TCPServer: refactored memory allocation

pull/238/head
SChernykh 2023-02-17 08:47:52 +01:00
parent 39216df8eb
commit 1c908c261d
5 changed files with 106 additions and 54 deletions

View File

@ -89,6 +89,16 @@
#define P2POOL_DEBUGGING 1
#endif
#if __has_feature(address_sanitizer) || defined(__SANITIZE_ADDRESS__)
#define ASAN_POISON_MEMORY_REGION(addr, size) __asan_poison_memory_region((addr), (size))
#define ASAN_UNPOISON_MEMORY_REGION(addr, size) __asan_unpoison_memory_region((addr), (size))
extern "C" void __asan_poison_memory_region(void const volatile* addr, size_t size);
extern "C" void __asan_unpoison_memory_region(void const volatile* addr, size_t size);
#else
#define ASAN_POISON_MEMORY_REGION(addr, size)
#define ASAN_UNPOISON_MEMORY_REGION(addr, size)
#endif
namespace p2pool {
constexpr size_t HASH_SIZE = 32;

View File

@ -68,6 +68,7 @@ public:
~P2PClient();
static Client* allocate() { return new P2PClient(); }
virtual size_t size() const override { return sizeof(P2PClient); }
void reset() override;
bool on_connect() override;

View File

@ -42,6 +42,7 @@ public:
FORCEINLINE ~StratumClient() {}
static Client* allocate() { return new StratumClient(); }
virtual size_t size() const override { return sizeof(StratumClient); }
void reset() override;
bool on_connect() override;

View File

@ -55,6 +55,8 @@ public:
Client();
virtual ~Client() {}
virtual size_t size() const = 0;
virtual void reset();
virtual bool on_connect() = 0;
virtual bool on_read(char* data, uint32_t size) = 0;
@ -110,6 +112,9 @@ public:
std::vector<WriteBuf*> m_writeBuffers;
WriteBuf* get_write_buffer();
void return_write_buffer(WriteBuf* buf);
struct SendCallbackBase
{
virtual ~SendCallbackBase() {}
@ -167,6 +172,10 @@ protected:
uv_mutex_t m_clientsListLock;
std::vector<Client*> m_preallocatedClients;
Client* get_client();
void return_client(Client* c);
Client* m_connectedClientsList;
std::atomic<uint32_t> m_numConnections;
std::atomic<uint32_t> m_numIncomingConnections;

View File

@ -226,23 +226,13 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(bool is_v6, const
return false;
}
Client* client;
if (!m_preallocatedClients.empty()) {
client = m_preallocatedClients.back();
m_preallocatedClients.pop_back();
client->reset();
}
else {
client = m_allocateNewClient();
}
Client* client = get_client();
client->m_owner = this;
client->m_port = port;
client->m_isV6 = is_v6;
if (!str_to_ip(is_v6, ip, client->m_addr)) {
m_preallocatedClients.push_back(client);
return_client(client);
return false;
}
@ -264,17 +254,7 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(bool is_v6, const
return false;
}
Client* client;
if (!m_preallocatedClients.empty()) {
client = m_preallocatedClients.back();
m_preallocatedClients.pop_back();
client->reset();
}
else {
client = m_allocateNewClient();
}
Client* client = get_client();
client->m_owner = this;
client->m_addr = ip;
client->m_port = port;
@ -312,20 +292,20 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::connect_to_peer(Client* client)
{
if (is_banned(client->m_addr)) {
LOGINFO(5, "peer " << log::Gray() << static_cast<char*>(client->m_addrString) << log::NoColor() << " is banned, not connecting to it");
m_preallocatedClients.push_back(client);
return_client(client);
return false;
}
if (!m_pendingConnections.insert(client->m_addr).second) {
LOGINFO(6, "there is already a pending connection to this IP, not connecting to " << log::Gray() << static_cast<char*>(client->m_addrString));
m_preallocatedClients.push_back(client);
return_client(client);
return false;
}
int err = uv_tcp_init(&m_loop, &client->m_socket);
if (err) {
LOGERR(1, "failed to create tcp client handle, error " << uv_err_name(err));
m_preallocatedClients.push_back(client);
return_client(client);
return false;
}
client->m_socket.data = client;
@ -493,15 +473,7 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, Sen
return true;
}
WriteBuf* buf;
if (!m_writeBuffers.empty()) {
buf = m_writeBuffers.back();
m_writeBuffers.pop_back();
}
else {
buf = new WriteBuf();
}
WriteBuf* buf = get_write_buffer();
// callback_buf is used in only 1 thread, so it's safe
static uint8_t callback_buf[WRITE_BUF_SIZE];
@ -514,7 +486,7 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, Sen
if (bytes_written == 0) {
LOGWARN(1, "send callback wrote 0 bytes, nothing to do");
m_writeBuffers.push_back(buf);
return_write_buffer(buf);
return true;
}
@ -539,7 +511,7 @@ bool TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::send_internal(Client* client, Sen
const int err = uv_write(&buf->m_write, reinterpret_cast<uv_stream_t*>(&client->m_socket), bufs, 1, Client::on_write);
if (err) {
LOGWARN(1, "failed to start writing data to client connection " << static_cast<const char*>(client->m_addrString) << ", error " << uv_err_name(err));
m_writeBuffers.push_back(buf);
return_write_buffer(buf);
return false;
}
@ -556,8 +528,12 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::loop(void* data)
server->m_writeBuffers.resize(DEFAULT_BACKLOG);
server->m_preallocatedClients.reserve(DEFAULT_BACKLOG);
for (size_t i = 0; i < DEFAULT_BACKLOG; ++i) {
server->m_writeBuffers[i] = new WriteBuf();
server->m_preallocatedClients.emplace_back(server->m_allocateNewClient());
WriteBuf* wb = new WriteBuf();
Client* c = server->m_allocateNewClient();
ASAN_POISON_MEMORY_REGION(wb, sizeof(WriteBuf));
ASAN_POISON_MEMORY_REGION(c, c->size());
server->m_writeBuffers[i] = wb;
server->m_preallocatedClients.emplace_back(c);
}
int err = uv_run(&server->m_loop, UV_RUN_DEFAULT);
@ -571,12 +547,18 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::loop(void* data)
}
for (WriteBuf* buf : server->m_writeBuffers) {
free_hook(buf->m_data);
ASAN_UNPOISON_MEMORY_REGION(buf, sizeof(WriteBuf));
if (buf->m_data) {
ASAN_UNPOISON_MEMORY_REGION(buf->m_data, buf->m_dataCapacity);
free_hook(buf->m_data);
}
delete buf;
}
server->m_writeBuffers.clear();
for (Client* c : server->m_preallocatedClients) {
ASAN_UNPOISON_MEMORY_REGION(c, sizeof(Client));
ASAN_UNPOISON_MEMORY_REGION(c, c->size());
delete c;
}
server->m_preallocatedClients.clear();
@ -622,7 +604,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connection_close(uv_handle_t*
prev_in_list->m_next = next_in_list;
next_in_list->m_prev = prev_in_list;
owner->m_preallocatedClients.push_back(client);
owner->return_client(client);
--owner->m_numConnections;
if (is_incoming) {
@ -638,7 +620,7 @@ template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_connection_error(uv_handle_t* handle)
{
Client* client = reinterpret_cast<Client*>(handle->data);
client->m_owner->m_preallocatedClients.push_back(client);
client->m_owner->return_client(client);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
@ -675,21 +657,12 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_new_client(uv_stream_t* server
return;
}
Client* client;
if (!m_preallocatedClients.empty()) {
client = m_preallocatedClients.back();
m_preallocatedClients.pop_back();
client->reset();
}
else {
client = m_allocateNewClient();
}
Client* client = get_client();
int err = uv_tcp_init(&m_loop, &client->m_socket);
if (err) {
LOGERR(1, "failed to create tcp client handle, error " << uv_err_name(err));
m_preallocatedClients.push_back(client);
return_client(client);
return;
}
client->m_socket.data = client;
@ -868,6 +841,64 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::on_shutdown(uv_async_t* async)
});
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
typename TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::WriteBuf* TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::get_write_buffer()
{
WriteBuf* buf;
if (!m_writeBuffers.empty()) {
buf = m_writeBuffers.back();
m_writeBuffers.pop_back();
ASAN_UNPOISON_MEMORY_REGION(buf, sizeof(WriteBuf));
if (buf->m_data) {
ASAN_UNPOISON_MEMORY_REGION(buf->m_data, buf->m_dataCapacity);
}
}
else {
buf = new WriteBuf();
}
return buf;
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::return_write_buffer(WriteBuf* buf)
{
if (buf->m_data) {
ASAN_POISON_MEMORY_REGION(buf->m_data, buf->m_dataCapacity);
}
ASAN_POISON_MEMORY_REGION(buf, sizeof(WriteBuf));
m_writeBuffers.push_back(buf);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
typename TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client* TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::get_client()
{
Client* c;
if (!m_preallocatedClients.empty()) {
c = m_preallocatedClients.back();
m_preallocatedClients.pop_back();
ASAN_UNPOISON_MEMORY_REGION(c, sizeof(Client));
ASAN_UNPOISON_MEMORY_REGION(c, c->size());
c->reset();
}
else {
c = m_allocateNewClient();
}
return c;
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::return_client(Client* c)
{
ASAN_POISON_MEMORY_REGION(c, c->size());
m_preallocatedClients.push_back(c);
}
template<size_t READ_BUF_SIZE, size_t WRITE_BUF_SIZE>
TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::Client()
: m_owner(nullptr)
@ -1101,7 +1132,7 @@ void TCPServer<READ_BUF_SIZE, WRITE_BUF_SIZE>::Client::on_write(uv_write_t* req,
TCPServer* server = client->m_owner;
if (server) {
server->m_writeBuffers.push_back(buf);
server->return_write_buffer(buf);
}
if (status != 0) {