From 1c908c261d7b1dbe51378e75f595c3c40ea7bccd Mon Sep 17 00:00:00 2001 From: SChernykh Date: Fri, 17 Feb 2023 08:47:52 +0100 Subject: [PATCH] TCPServer: refactored memory allocation --- src/common.h | 10 ++++ src/p2p_server.h | 1 + src/stratum_server.h | 1 + src/tcp_server.h | 9 +++ src/tcp_server.inl | 139 ++++++++++++++++++++++++++----------------- 5 files changed, 106 insertions(+), 54 deletions(-) diff --git a/src/common.h b/src/common.h index 31e0fd6..c412dfd 100644 --- a/src/common.h +++ b/src/common.h @@ -89,6 +89,16 @@ #define P2POOL_DEBUGGING 1 #endif +#if __has_feature(address_sanitizer) || defined(__SANITIZE_ADDRESS__) +#define ASAN_POISON_MEMORY_REGION(addr, size) __asan_poison_memory_region((addr), (size)) +#define ASAN_UNPOISON_MEMORY_REGION(addr, size) __asan_unpoison_memory_region((addr), (size)) +extern "C" void __asan_poison_memory_region(void const volatile* addr, size_t size); +extern "C" void __asan_unpoison_memory_region(void const volatile* addr, size_t size); +#else +#define ASAN_POISON_MEMORY_REGION(addr, size) +#define ASAN_UNPOISON_MEMORY_REGION(addr, size) +#endif + namespace p2pool { constexpr size_t HASH_SIZE = 32; diff --git a/src/p2p_server.h b/src/p2p_server.h index 0321e74..7c4620e 100644 --- a/src/p2p_server.h +++ b/src/p2p_server.h @@ -68,6 +68,7 @@ public: ~P2PClient(); static Client* allocate() { return new P2PClient(); } + virtual size_t size() const override { return sizeof(P2PClient); } void reset() override; bool on_connect() override; diff --git a/src/stratum_server.h b/src/stratum_server.h index 0f71be1..e281006 100644 --- a/src/stratum_server.h +++ b/src/stratum_server.h @@ -42,6 +42,7 @@ public: FORCEINLINE ~StratumClient() {} static Client* allocate() { return new StratumClient(); } + virtual size_t size() const override { return sizeof(StratumClient); } void reset() override; bool on_connect() override; diff --git a/src/tcp_server.h b/src/tcp_server.h index 76eeb64..dd6b8c0 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -55,6 +55,8 @@ public: Client(); virtual ~Client() {} + virtual size_t size() const = 0; + virtual void reset(); virtual bool on_connect() = 0; virtual bool on_read(char* data, uint32_t size) = 0; @@ -110,6 +112,9 @@ public: std::vector m_writeBuffers; + WriteBuf* get_write_buffer(); + void return_write_buffer(WriteBuf* buf); + struct SendCallbackBase { virtual ~SendCallbackBase() {} @@ -167,6 +172,10 @@ protected: uv_mutex_t m_clientsListLock; std::vector m_preallocatedClients; + + Client* get_client(); + void return_client(Client* c); + Client* m_connectedClientsList; std::atomic m_numConnections; std::atomic m_numIncomingConnections; diff --git a/src/tcp_server.inl b/src/tcp_server.inl index e64fbe4..e38cbbb 100644 --- a/src/tcp_server.inl +++ b/src/tcp_server.inl @@ -226,23 +226,13 @@ bool TCPServer::connect_to_peer(bool is_v6, const return false; } - Client* client; - - if (!m_preallocatedClients.empty()) { - client = m_preallocatedClients.back(); - m_preallocatedClients.pop_back(); - client->reset(); - } - else { - client = m_allocateNewClient(); - } - + Client* client = get_client(); client->m_owner = this; client->m_port = port; client->m_isV6 = is_v6; if (!str_to_ip(is_v6, ip, client->m_addr)) { - m_preallocatedClients.push_back(client); + return_client(client); return false; } @@ -264,17 +254,7 @@ bool TCPServer::connect_to_peer(bool is_v6, const return false; } - Client* client; - - if (!m_preallocatedClients.empty()) { - client = m_preallocatedClients.back(); - m_preallocatedClients.pop_back(); - client->reset(); - } - else { - client = m_allocateNewClient(); - } - + Client* client = get_client(); client->m_owner = this; client->m_addr = ip; client->m_port = port; @@ -312,20 +292,20 @@ bool TCPServer::connect_to_peer(Client* client) { if (is_banned(client->m_addr)) { LOGINFO(5, "peer " << log::Gray() << static_cast(client->m_addrString) << log::NoColor() << " is banned, not connecting to it"); - m_preallocatedClients.push_back(client); + return_client(client); return false; } if (!m_pendingConnections.insert(client->m_addr).second) { LOGINFO(6, "there is already a pending connection to this IP, not connecting to " << log::Gray() << static_cast(client->m_addrString)); - m_preallocatedClients.push_back(client); + return_client(client); return false; } int err = uv_tcp_init(&m_loop, &client->m_socket); if (err) { LOGERR(1, "failed to create tcp client handle, error " << uv_err_name(err)); - m_preallocatedClients.push_back(client); + return_client(client); return false; } client->m_socket.data = client; @@ -493,15 +473,7 @@ bool TCPServer::send_internal(Client* client, Sen return true; } - WriteBuf* buf; - - if (!m_writeBuffers.empty()) { - buf = m_writeBuffers.back(); - m_writeBuffers.pop_back(); - } - else { - buf = new WriteBuf(); - } + WriteBuf* buf = get_write_buffer(); // callback_buf is used in only 1 thread, so it's safe static uint8_t callback_buf[WRITE_BUF_SIZE]; @@ -514,7 +486,7 @@ bool TCPServer::send_internal(Client* client, Sen if (bytes_written == 0) { LOGWARN(1, "send callback wrote 0 bytes, nothing to do"); - m_writeBuffers.push_back(buf); + return_write_buffer(buf); return true; } @@ -539,7 +511,7 @@ bool TCPServer::send_internal(Client* client, Sen const int err = uv_write(&buf->m_write, reinterpret_cast(&client->m_socket), bufs, 1, Client::on_write); if (err) { LOGWARN(1, "failed to start writing data to client connection " << static_cast(client->m_addrString) << ", error " << uv_err_name(err)); - m_writeBuffers.push_back(buf); + return_write_buffer(buf); return false; } @@ -556,8 +528,12 @@ void TCPServer::loop(void* data) server->m_writeBuffers.resize(DEFAULT_BACKLOG); server->m_preallocatedClients.reserve(DEFAULT_BACKLOG); for (size_t i = 0; i < DEFAULT_BACKLOG; ++i) { - server->m_writeBuffers[i] = new WriteBuf(); - server->m_preallocatedClients.emplace_back(server->m_allocateNewClient()); + WriteBuf* wb = new WriteBuf(); + Client* c = server->m_allocateNewClient(); + ASAN_POISON_MEMORY_REGION(wb, sizeof(WriteBuf)); + ASAN_POISON_MEMORY_REGION(c, c->size()); + server->m_writeBuffers[i] = wb; + server->m_preallocatedClients.emplace_back(c); } int err = uv_run(&server->m_loop, UV_RUN_DEFAULT); @@ -571,12 +547,18 @@ void TCPServer::loop(void* data) } for (WriteBuf* buf : server->m_writeBuffers) { - free_hook(buf->m_data); + ASAN_UNPOISON_MEMORY_REGION(buf, sizeof(WriteBuf)); + if (buf->m_data) { + ASAN_UNPOISON_MEMORY_REGION(buf->m_data, buf->m_dataCapacity); + free_hook(buf->m_data); + } delete buf; } server->m_writeBuffers.clear(); for (Client* c : server->m_preallocatedClients) { + ASAN_UNPOISON_MEMORY_REGION(c, sizeof(Client)); + ASAN_UNPOISON_MEMORY_REGION(c, c->size()); delete c; } server->m_preallocatedClients.clear(); @@ -622,7 +604,7 @@ void TCPServer::on_connection_close(uv_handle_t* prev_in_list->m_next = next_in_list; next_in_list->m_prev = prev_in_list; - owner->m_preallocatedClients.push_back(client); + owner->return_client(client); --owner->m_numConnections; if (is_incoming) { @@ -638,7 +620,7 @@ template void TCPServer::on_connection_error(uv_handle_t* handle) { Client* client = reinterpret_cast(handle->data); - client->m_owner->m_preallocatedClients.push_back(client); + client->m_owner->return_client(client); } template @@ -675,21 +657,12 @@ void TCPServer::on_new_client(uv_stream_t* server return; } - Client* client; - - if (!m_preallocatedClients.empty()) { - client = m_preallocatedClients.back(); - m_preallocatedClients.pop_back(); - client->reset(); - } - else { - client = m_allocateNewClient(); - } + Client* client = get_client(); int err = uv_tcp_init(&m_loop, &client->m_socket); if (err) { LOGERR(1, "failed to create tcp client handle, error " << uv_err_name(err)); - m_preallocatedClients.push_back(client); + return_client(client); return; } client->m_socket.data = client; @@ -868,6 +841,64 @@ void TCPServer::on_shutdown(uv_async_t* async) }); } +template +typename TCPServer::WriteBuf* TCPServer::get_write_buffer() +{ + WriteBuf* buf; + + if (!m_writeBuffers.empty()) { + buf = m_writeBuffers.back(); + m_writeBuffers.pop_back(); + + ASAN_UNPOISON_MEMORY_REGION(buf, sizeof(WriteBuf)); + if (buf->m_data) { + ASAN_UNPOISON_MEMORY_REGION(buf->m_data, buf->m_dataCapacity); + } + } + else { + buf = new WriteBuf(); + } + + return buf; +} + +template +void TCPServer::return_write_buffer(WriteBuf* buf) +{ + if (buf->m_data) { + ASAN_POISON_MEMORY_REGION(buf->m_data, buf->m_dataCapacity); + } + ASAN_POISON_MEMORY_REGION(buf, sizeof(WriteBuf)); + + m_writeBuffers.push_back(buf); +} + +template +typename TCPServer::Client* TCPServer::get_client() +{ + Client* c; + + if (!m_preallocatedClients.empty()) { + c = m_preallocatedClients.back(); + m_preallocatedClients.pop_back(); + ASAN_UNPOISON_MEMORY_REGION(c, sizeof(Client)); + ASAN_UNPOISON_MEMORY_REGION(c, c->size()); + c->reset(); + } + else { + c = m_allocateNewClient(); + } + + return c; +} + +template +void TCPServer::return_client(Client* c) +{ + ASAN_POISON_MEMORY_REGION(c, c->size()); + m_preallocatedClients.push_back(c); +} + template TCPServer::Client::Client() : m_owner(nullptr) @@ -1101,7 +1132,7 @@ void TCPServer::Client::on_write(uv_write_t* req, TCPServer* server = client->m_owner; if (server) { - server->m_writeBuffers.push_back(buf); + server->return_write_buffer(buf); } if (status != 0) {