diff --git a/src/json_rpc_request.cpp b/src/json_rpc_request.cpp index 3babb8e..ebdd2c5 100644 --- a/src/json_rpc_request.cpp +++ b/src/json_rpc_request.cpp @@ -72,7 +72,7 @@ JSONRPCRequest::JSONRPCRequest(const char* address, int port, const char* req, C m_request.reserve(std::max(len + 128, log::Stream::BUF_SIZE + 1)); m_request.resize(log::Stream::BUF_SIZE + 1); - log::Stream s(m_request.data()); + log::Stream s(m_request.data(), m_request.size()); s << "POST " << uri << " HTTP/1.1\nContent-Type: application/json\nContent-Length: " << len << "\n\n"; m_request.resize(s.m_pos); diff --git a/src/log.h b/src/log.h index 9e22d40..1554953 100644 --- a/src/log.h +++ b/src/log.h @@ -35,8 +35,10 @@ struct Stream { enum params : int { BUF_SIZE = 1024 - 1 }; - explicit FORCEINLINE Stream(char* buf) : m_pos(0), m_numberWidth(1), m_buf(buf), m_bufSize(BUF_SIZE) {} - FORCEINLINE Stream(char* buf, size_t size) : m_pos(0), m_numberWidth(1), m_buf(buf), m_bufSize(static_cast(size) - 1) {} + template + explicit FORCEINLINE Stream(char (&buf)[N]) : m_pos(0), m_numberWidth(1), m_buf(buf), m_bufSize(N - 1) {} + + FORCEINLINE Stream(void* buf, size_t size) : m_pos(0), m_numberWidth(1), m_buf(reinterpret_cast(buf)), m_bufSize(static_cast(size) - 1) {} template struct Entry diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index 40bc43b..5d25a9e 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -37,6 +37,8 @@ static const char* seed_nodes_mini[] = { "seeds-mini.p2pool.io", "" }; static constexpr int DEFAULT_BACKLOG = 16; static constexpr uint64_t DEFAULT_BAN_TIME = 600; +static constexpr size_t SEND_BUF_MIN_SIZE = 256; + #include "tcp_server.inl" namespace p2pool { @@ -310,9 +312,14 @@ void P2PServer::update_peer_list() client->m_nextOutgoingPeerListRequest = cur_time + (60 + (get_random64() % 61)); const bool result = send(client, - [](void* buf) + [](void* buf, size_t buf_size) { LOGINFO(5, "sending PEER_LIST_REQUEST"); + + if (buf_size < SEND_BUF_MIN_SIZE) { + return 0; + } + *reinterpret_cast(buf) = static_cast(MessageId::PEER_LIST_REQUEST); return 1; }); @@ -780,7 +787,8 @@ void P2PServer::on_broadcast() } for (Broadcast* data : broadcast_queue) { - send(client, [client, data](void* buf) { + send(client, [client, data](void* buf, size_t buf_size) -> size_t + { uint8_t* p0 = reinterpret_cast(buf); uint8_t* p = p0; @@ -798,9 +806,14 @@ void P2PServer::on_broadcast() if (send_pruned) { LOGINFO(6, "sending BLOCK_BROADCAST (pruned) to " << log::Gray() << static_cast(client->m_addrString)); - *(p++) = static_cast(MessageId::BLOCK_BROADCAST); const uint32_t len = static_cast(data->pruned_blob.size()); + if (buf_size < SEND_BUF_MIN_SIZE + 1 + sizeof(uint32_t) + len) { + return 0; + } + + *(p++) = static_cast(MessageId::BLOCK_BROADCAST); + memcpy(p, &len, sizeof(uint32_t)); p += sizeof(uint32_t); @@ -811,9 +824,14 @@ void P2PServer::on_broadcast() } else { LOGINFO(5, "sending BLOCK_BROADCAST (full) to " << log::Gray() << static_cast(client->m_addrString)); - *(p++) = static_cast(MessageId::BLOCK_BROADCAST); const uint32_t len = static_cast(data->blob.size()); + if (buf_size < SEND_BUF_MIN_SIZE + 1 + sizeof(uint32_t) + len) { + return 0; + } + + *(p++) = static_cast(MessageId::BLOCK_BROADCAST); + memcpy(p, &len, sizeof(uint32_t)); p += sizeof(uint32_t); @@ -980,12 +998,17 @@ void P2PServer::download_missing_blocks() } const bool result = send(client, - [&id](void* buf) + [&id](void* buf, size_t buf_size) -> size_t { + LOGINFO(5, "sending BLOCK_REQUEST for id = " << id); + + if (buf_size < SEND_BUF_MIN_SIZE) { + return 0; + } + uint8_t* p0 = reinterpret_cast(buf); uint8_t* p = p0; - LOGINFO(5, "sending BLOCK_REQUEST for id = " << id); *(p++) = static_cast(MessageId::BLOCK_REQUEST); memcpy(p, id.h, HASH_SIZE); @@ -1348,12 +1371,17 @@ bool P2PServer::P2PClient::send_handshake_challenge() m_handshakeChallenge = owner->get_random64(); return owner->send(this, - [this, owner](void* buf) + [this, owner](void* buf, size_t buf_size) -> size_t { + LOGINFO(5, "sending HANDSHAKE_CHALLENGE"); + + if (buf_size < SEND_BUF_MIN_SIZE) { + return 0; + } + uint8_t* p0 = reinterpret_cast(buf); uint8_t* p = p0; - LOGINFO(5, "sending HANDSHAKE_CHALLENGE"); *(p++) = static_cast(MessageId::HANDSHAKE_CHALLENGE); uint64_t k = m_handshakeChallenge; @@ -1465,12 +1493,17 @@ void P2PServer::P2PClient::send_handshake_solution(const uint8_t (&challenge)[CH } const bool result = work->server->send(work->client, - [work](void* buf) + [work](void* buf, size_t buf_size) -> size_t { + LOGINFO(5, "sending HANDSHAKE_SOLUTION"); + + if (buf_size < SEND_BUF_MIN_SIZE) { + return 0; + } + uint8_t* p0 = reinterpret_cast(buf); uint8_t* p = p0; - LOGINFO(5, "sending HANDSHAKE_SOLUTION"); *(p++) = static_cast(MessageId::HANDSHAKE_SOLUTION); memcpy(p, work->solution.h, HASH_SIZE); @@ -1615,8 +1648,14 @@ bool P2PServer::P2PClient::on_handshake_solution(const uint8_t* buf) } return m_owner->send(this, - [this](void* buf) + [this](void* buf, size_t buf_size) -> size_t { + LOGINFO(5, "sending LISTEN_PORT and BLOCK_REQUEST for the chain tip"); + + if (buf_size < SEND_BUF_MIN_SIZE) { + return 0; + } + uint8_t* p0 = reinterpret_cast(buf); uint8_t* p = p0; on_after_handshake(p); @@ -1679,15 +1718,21 @@ bool P2PServer::P2PClient::on_block_request(const uint8_t* buf) } return server->send(this, - [&blob](void* buf) + [&blob](void* buf, size_t buf_size) -> size_t { + LOGINFO(5, "sending BLOCK_RESPONSE"); + + const uint32_t len = static_cast(blob.size()); + + if (buf_size < SEND_BUF_MIN_SIZE + 1 + sizeof(uint32_t) + len) { + return 0; + } + uint8_t* p0 = reinterpret_cast(buf); uint8_t* p = p0; - LOGINFO(5, "sending BLOCK_RESPONSE"); *(p++) = static_cast(MessageId::BLOCK_RESPONSE); - const uint32_t len = static_cast(blob.size()); memcpy(p, &len, sizeof(uint32_t)); p += sizeof(uint32_t); @@ -1841,12 +1886,17 @@ bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*) } return server->send(this, - [&peers, num_selected_peers](void* buf) + [&peers, num_selected_peers](void* buf, size_t buf_size) -> size_t { + LOGINFO(5, "sending PEER_LIST_RESPONSE"); + + if (buf_size < SEND_BUF_MIN_SIZE + 2 + num_selected_peers * 19) { + return 0; + } + uint8_t* p0 = reinterpret_cast(buf); uint8_t* p = p0; - LOGINFO(5, "sending PEER_LIST_RESPONSE"); *(p++) = static_cast(MessageId::PEER_LIST_RESPONSE); *(p++) = static_cast(num_selected_peers); @@ -2002,12 +2052,17 @@ void P2PServer::P2PClient::post_handle_incoming_block(const uint32_t reset_count } const bool result = m_owner->send(this, - [&id](void* buf) + [&id](void* buf, size_t buf_size) -> size_t { + LOGINFO(5, "sending BLOCK_REQUEST for id = " << id); + + if (buf_size < SEND_BUF_MIN_SIZE + 1 + HASH_SIZE) { + return 0; + } + uint8_t* p0 = reinterpret_cast(buf); uint8_t* p = p0; - LOGINFO(5, "sending BLOCK_REQUEST for id = " << id); *(p++) = static_cast(MessageId::BLOCK_REQUEST); memcpy(p, id.h, HASH_SIZE); diff --git a/src/stratum_server.cpp b/src/stratum_server.cpp index 37ee939..5ed00e2 100644 --- a/src/stratum_server.cpp +++ b/src/stratum_server.cpp @@ -267,7 +267,7 @@ bool StratumServer::on_login(StratumClient* client, uint32_t id, const char* log } const bool result = send(client, - [client, id, &hashing_blob, job_id, blob_size, target, height, &seed_hash](void* buf) + [client, id, &hashing_blob, job_id, blob_size, target, height, &seed_hash](void* buf, size_t buf_size) { do { client->m_rpcId = static_cast(static_cast(client->m_owner)->get_random64()); @@ -280,7 +280,7 @@ bool StratumServer::on_login(StratumClient* client, uint32_t id, const char* log target_hex.m_size -= sizeof(uint32_t); } - log::Stream s(reinterpret_cast(buf)); + log::Stream s(buf, buf_size); s << "{\"id\":" << id << ",\"jsonrpc\":\"2.0\",\"result\":{\"id\":\""; s << log::Hex(client->m_rpcId) << "\",\"job\":{\"blob\":\""; s << log::hex_buf(hashing_blob, blob_size) << "\",\"job_id\":\""; @@ -353,9 +353,9 @@ bool StratumServer::on_submit(StratumClient* client, uint32_t id, const char* jo if (!block.get_difficulties(template_id, mainchain_diff, sidechain_diff)) { LOGWARN(4, "client " << static_cast(client->m_addrString) << " got a stale share"); return send(client, - [id](void* buf) + [id](void* buf, size_t buf_size) { - log::Stream s(reinterpret_cast(buf)); + log::Stream s(buf, buf_size); s << "{\"id\":" << id << ",\"jsonrpc\":\"2.0\",\"error\":{\"message\":\"Stale share\"}}\n"; return s.m_pos; }); @@ -419,9 +419,9 @@ bool StratumServer::on_submit(StratumClient* client, uint32_t id, const char* jo LOGWARN(4, "client " << static_cast(client->m_addrString) << " got a share with invalid job id"); const bool result = send(client, - [id](void* buf) + [id](void* buf, size_t buf_size) { - log::Stream s(reinterpret_cast(buf)); + log::Stream s(buf, buf_size); s << "{\"id\":" << id << ",\"jsonrpc\":\"2.0\",\"error\":{\"message\":\"Invalid job id\"}}\n"; return s.m_pos; }); @@ -570,7 +570,7 @@ void StratumServer::on_blobs_ready() } const bool result = send(client, - [data, target, hashing_blob, &job_id](void* buf) + [data, target, hashing_blob, &job_id](void* buf, size_t buf_size) { log::hex_buf target_hex(reinterpret_cast(&target), sizeof(uint64_t)); @@ -579,7 +579,7 @@ void StratumServer::on_blobs_ready() target_hex.m_size -= sizeof(uint32_t); } - log::Stream s(reinterpret_cast(buf)); + log::Stream s(buf, buf_size); s << "{\"jsonrpc\":\"2.0\",\"method\":\"job\",\"params\":{\"blob\":\""; s << log::hex_buf(hashing_blob, data->m_blobSize) << "\",\"job_id\":\""; s << log::Hex(job_id) << "\",\"target\":\""; @@ -739,9 +739,9 @@ void StratumServer::on_after_share_found(uv_work_t* req, int /*status*/) if ((client->m_resetCounter.load() == share->m_clientResetCounter) && (client->m_rpcId == share->m_rpcId)) { const bool result = server->send(client, - [share](void* buf) + [share](void* buf, size_t buf_size) { - log::Stream s(reinterpret_cast(buf)); + log::Stream s(buf, buf_size); switch (share->m_result) { case SubmittedShare::Result::STALE: s << "{\"id\":" << share->m_id << ",\"jsonrpc\":\"2.0\",\"error\":{\"message\":\"Stale share\"}}\n"; diff --git a/src/tcp_server.h b/src/tcp_server.h index 294d11f..d902d91 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -105,14 +105,14 @@ public: struct SendCallbackBase { virtual ~SendCallbackBase() {} - virtual size_t operator()(void*) = 0; + virtual size_t operator()(void*, size_t) = 0; }; template struct SendCallback : public SendCallbackBase { explicit FORCEINLINE SendCallback(T&& callback) : m_callback(std::move(callback)) {} - size_t operator()(void* buf) override { return m_callback(buf); } + size_t operator()(void* buf, size_t buf_size) override { return m_callback(buf, buf_size); } private: SendCallback& operator=(SendCallback&&) = delete; diff --git a/src/tcp_server.inl b/src/tcp_server.inl index f73b334..88d4276 100644 --- a/src/tcp_server.inl +++ b/src/tcp_server.inl @@ -556,7 +556,7 @@ bool TCPServer::send_internal(Client* client, Sen // callback_buf is used in only 1 thread, so it's safe static uint8_t callback_buf[WRITE_BUF_SIZE]; - const size_t bytes_written = callback(callback_buf); + const size_t bytes_written = callback(callback_buf, sizeof(callback_buf)); if (bytes_written > WRITE_BUF_SIZE) { LOGERR(0, "send callback wrote " << bytes_written << " bytes, expected no more than " << WRITE_BUF_SIZE << " bytes");