From 12a011a9ff54b7204391614ad120997405fc6a19 Mon Sep 17 00:00:00 2001 From: SChernykh Date: Mon, 27 Feb 2023 15:38:30 +0100 Subject: [PATCH] TCPServer: removed unnecessary mutex --- src/p2p_server.cpp | 138 +++++++++++++++---------------- src/p2p_server.h | 2 +- src/stratum_server.cpp | 179 ++++++++++++++++++++--------------------- src/tcp_server.h | 3 +- src/tcp_server.inl | 35 ++++---- 5 files changed, 172 insertions(+), 185 deletions(-) diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index 76a906b..11d8d64 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -261,6 +261,8 @@ void P2PServer::on_connect_failed(bool is_v6, const raw_ip& ip, int port) void P2PServer::update_peer_connections() { + check_event_loop_thread(__func__); + const uint64_t cur_time = seconds_since_epoch(); const uint64_t last_updated = m_pool->side_chain().last_updated(); @@ -268,40 +270,37 @@ void P2PServer::update_peer_connections() m_fastestPeer = nullptr; unordered_set connected_clients; - { - MutexLock lock(m_clientsListLock); - connected_clients.reserve(m_numConnections); - for (P2PClient* client = static_cast(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast(client->m_next)) { - const int timeout = client->m_handshakeComplete ? 300 : 10; - if ((cur_time >= client->m_lastAlive + timeout) && (client->m_socks5ProxyState == Client::Socks5ProxyState::Default)) { - const uint64_t idle_time = static_cast(cur_time - client->m_lastAlive); - LOGWARN(5, "peer " << static_cast(client->m_addrString) << " has been idle for " << idle_time << " seconds, disconnecting"); + connected_clients.reserve(m_numConnections); + for (P2PClient* client = static_cast(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast(client->m_next)) { + const int timeout = client->m_handshakeComplete ? 300 : 10; + if ((cur_time >= client->m_lastAlive + timeout) && (client->m_socks5ProxyState == Client::Socks5ProxyState::Default)) { + const uint64_t idle_time = static_cast(cur_time - client->m_lastAlive); + LOGWARN(5, "peer " << static_cast(client->m_addrString) << " has been idle for " << idle_time << " seconds, disconnecting"); + client->close(); + continue; + } + + if (client->m_handshakeComplete && client->m_lastBroadcastTimestamp) { + // - Side chain is at least 15 minutes newer (last_updated >= client->m_lastBroadcastTimestamp + 900) + // - It's been at least 10 seconds since side chain updated (cur_time >= last_updated + 10) + // - It's been at least 10 seconds since the last block request (peer is not syncing) + // - Peer should have sent a broadcast by now + if (last_updated && (cur_time >= std::max(last_updated, client->m_lastBlockrequestTimestamp) + 10) && (last_updated >= client->m_lastBroadcastTimestamp + 900)) { + const uint64_t dt = last_updated - client->m_lastBroadcastTimestamp; + LOGWARN(5, "peer " << static_cast(client->m_addrString) << " is not broadcasting blocks (last update " << dt << " seconds ago)"); + client->ban(DEFAULT_BAN_TIME); + remove_peer_from_list(client); client->close(); continue; } + } - if (client->m_handshakeComplete && client->m_lastBroadcastTimestamp) { - // - Side chain is at least 15 minutes newer (last_updated >= client->m_lastBroadcastTimestamp + 900) - // - It's been at least 10 seconds since side chain updated (cur_time >= last_updated + 10) - // - It's been at least 10 seconds since the last block request (peer is not syncing) - // - Peer should have sent a broadcast by now - if (last_updated && (cur_time >= std::max(last_updated, client->m_lastBlockrequestTimestamp) + 10) && (last_updated >= client->m_lastBroadcastTimestamp + 900)) { - const uint64_t dt = last_updated - client->m_lastBroadcastTimestamp; - LOGWARN(5, "peer " << static_cast(client->m_addrString) << " is not broadcasting blocks (last update " << dt << " seconds ago)"); - client->ban(DEFAULT_BAN_TIME); - remove_peer_from_list(client); - client->close(); - continue; - } - } - - connected_clients.insert(client->m_addr); - if (client->is_good()) { - has_good_peers = true; - if ((client->m_pingTime >= 0) && (!m_fastestPeer || (m_fastestPeer->m_pingTime > client->m_pingTime))) { - m_fastestPeer = client; - } + connected_clients.insert(client->m_addr); + if (client->is_good()) { + has_good_peers = true; + if ((client->m_pingTime >= 0) && (!m_fastestPeer || (m_fastestPeer->m_pingTime > client->m_pingTime))) { + m_fastestPeer = client; } } } @@ -363,7 +362,7 @@ void P2PServer::update_peer_connections() void P2PServer::update_peer_list() { - MutexLock lock(m_clientsListLock); + check_event_loop_thread(__func__); const uint64_t cur_time = seconds_since_epoch(); for (P2PClient* client = static_cast(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast(client->m_next)) { @@ -843,6 +842,8 @@ void P2PServer::broadcast(const PoolBlock& block, const PoolBlock* parent) void P2PServer::on_broadcast() { + check_event_loop_thread(__func__); + std::vector broadcast_queue; broadcast_queue.reserve(2); @@ -863,8 +864,6 @@ void P2PServer::on_broadcast() } }); - MutexLock lock(m_clientsListLock); - for (P2PClient* client = static_cast(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast(client->m_next)) { if (!client->is_good()) { continue; @@ -941,9 +940,7 @@ void P2PServer::on_broadcast() uint64_t P2PServer::get_random64() { - if (!server_event_loop_thread) { - LOGERR(1, "get_random64() was called from another thread, this is not thread safe"); - } + check_event_loop_thread(__func__); return m_rng(); } @@ -965,9 +962,9 @@ void P2PServer::show_peers_async() } } -void P2PServer::show_peers() +void P2PServer::show_peers() const { - MutexLock lock(m_clientsListLock); + check_event_loop_thread(__func__); const uint64_t cur_time = seconds_since_epoch(); size_t n = 0; @@ -1070,6 +1067,8 @@ void P2PServer::flush_cache() void P2PServer::download_missing_blocks() { + check_event_loop_thread(__func__); + if (!m_lookForMissingBlocks) { return; } @@ -1083,8 +1082,6 @@ void P2PServer::download_missing_blocks() return; } - MutexLock lock(m_clientsListLock); - if (m_numConnections == 0) { return; } @@ -1271,7 +1268,6 @@ bool P2PServer::P2PClient::on_connect() } // Don't allow multiple connections to/from the same IP (except localhost) - // server->m_clientsListLock is already locked here if (!m_addr.is_localhost()) { for (P2PClient* client = static_cast(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast(client->m_next)) { if ((client != this) && (client->m_addr == m_addr)) { @@ -1757,6 +1753,8 @@ bool P2PServer::P2PClient::check_handshake_solution(const hash& solution, const bool P2PServer::P2PClient::on_handshake_challenge(const uint8_t* buf) { + check_event_loop_thread(__func__); + P2PServer* server = static_cast(m_owner); uint8_t challenge[CHALLENGE_SIZE]; @@ -1772,23 +1770,14 @@ bool P2PServer::P2PClient::on_handshake_challenge(const uint8_t* buf) m_peerId = peer_id; - bool same_peer = false; - { - MutexLock lock(server->m_clientsListLock); - for (const P2PClient* client = static_cast(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast(client->m_next)) { - if ((client != this) && (client->m_peerId == peer_id)) { - LOGWARN(5, "tried to connect to the same peer twice: current connection " << static_cast(client->m_addrString) << ", new connection " << static_cast(m_addrString)); - same_peer = true; - break; - } + for (const P2PClient* client = static_cast(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast(client->m_next)) { + if ((client != this) && (client->m_peerId == peer_id)) { + LOGWARN(5, "tried to connect to the same peer twice: current connection " << static_cast(client->m_addrString) << ", new connection " << static_cast(m_addrString)); + close(); + return true; } } - if (same_peer) { - close(); - return true; - } - send_handshake_solution(challenge); return true; } @@ -2033,6 +2022,8 @@ bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size, bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*) { + check_event_loop_thread(__func__); + P2PServer* server = static_cast(m_owner); const uint64_t cur_time = seconds_since_epoch(); const bool first = (m_prevIncomingPeerListRequest == 0); @@ -2050,33 +2041,30 @@ bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*) Peer peers[PEER_LIST_RESPONSE_MAX_PEERS]; uint32_t num_selected_peers = 0; - { - MutexLock lock(server->m_clientsListLock); - // Send every 4th peer on average, selected at random - const uint32_t peers_to_send_target = std::min(PEER_LIST_RESPONSE_MAX_PEERS, std::max(1, server->m_numConnections / 4)); - uint32_t n = 0; + // Send every 4th peer on average, selected at random + const uint32_t peers_to_send_target = std::min(PEER_LIST_RESPONSE_MAX_PEERS, std::max(1, server->m_numConnections / 4)); + uint32_t n = 0; - for (P2PClient* client = static_cast(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast(client->m_next)) { - if (!client->is_good() || (client->m_addr == m_addr)) { - continue; - } + for (P2PClient* client = static_cast(server->m_connectedClientsList->m_next); client != server->m_connectedClientsList; client = static_cast(client->m_next)) { + if (!client->is_good() || (client->m_addr == m_addr)) { + continue; + } - const Peer p{ client->m_isV6, client->m_addr, client->m_listenPort, 0, 0 }; - ++n; + const Peer p{ client->m_isV6, client->m_addr, client->m_listenPort, 0, 0 }; + ++n; - // Use https://en.wikipedia.org/wiki/Reservoir_sampling algorithm - if (num_selected_peers < peers_to_send_target) { - peers[num_selected_peers++] = p; - continue; - } + // Use https://en.wikipedia.org/wiki/Reservoir_sampling algorithm + if (num_selected_peers < peers_to_send_target) { + peers[num_selected_peers++] = p; + continue; + } - uint64_t k; - umul128(server->get_random64(), n, &k); + uint64_t k; + umul128(server->get_random64(), n, &k); - if (k < peers_to_send_target) { - peers[k] = p; - } + if (k < peers_to_send_target) { + peers[k] = p; } } diff --git a/src/p2p_server.h b/src/p2p_server.h index 7c4620e..9f00446 100644 --- a/src/p2p_server.h +++ b/src/p2p_server.h @@ -253,7 +253,7 @@ private: uv_async_t m_showPeersAsync; static void on_show_peers(uv_async_t* handle) { reinterpret_cast(handle->data)->show_peers(); } - void show_peers(); + void show_peers() const; void on_shutdown() override; }; diff --git a/src/stratum_server.cpp b/src/stratum_server.cpp index 880d888..936bb88 100644 --- a/src/stratum_server.cpp +++ b/src/stratum_server.cpp @@ -484,11 +484,11 @@ void StratumServer::show_workers_async() void StratumServer::show_workers() { + check_event_loop_thread(__func__); + const uint64_t cur_time = seconds_since_epoch(); const difficulty_type pool_diff = m_pool->side_chain().difficulty(); - MutexLock lock(m_clientsListLock); - int addr_len = 0; for (const StratumClient* c = static_cast(m_connectedClientsList->m_next); c != m_connectedClientsList; c = static_cast(c->m_next)) { addr_len = std::max(addr_len, static_cast(strlen(c->m_addrString))); @@ -671,6 +671,8 @@ void StratumServer::update_auto_diff(StratumClient* client, const uint64_t times void StratumServer::on_blobs_ready() { + check_event_loop_thread(__func__); + std::vector blobs_queue; blobs_queue.reserve(2); @@ -699,101 +701,98 @@ void StratumServer::on_blobs_ready() uint32_t num_sent = 0; const uint64_t cur_time = seconds_since_epoch(); - { - MutexLock lock2(m_clientsListLock); - for (StratumClient* client = static_cast(m_connectedClientsList->m_prev); client != m_connectedClientsList; client = static_cast(client->m_prev)) { - ++numClientsProcessed; + for (StratumClient* client = static_cast(m_connectedClientsList->m_prev); client != m_connectedClientsList; client = static_cast(client->m_prev)) { + ++numClientsProcessed; - if (!client->m_rpcId) { - // Not logged in yet, on_login() will send the job to this client. Also close inactive connections. - if (cur_time >= client->m_connectedTime + 10) { - LOGWARN(4, "client " << static_cast(client->m_addrString) << " didn't send login data"); - client->ban(DEFAULT_BAN_TIME); - client->close(); - } - continue; - } - - if (num_sent >= data->m_numClientsExpected) { - // We don't have any more extra_nonce values available - continue; - } - - uint8_t* hashing_blob = data->m_blobs.data() + num_sent * data->m_blobSize; - - uint64_t target = data->m_target; - if (client->m_customDiff.lo) { - target = std::max(target, client->m_customDiff.target()); - } - else if (m_autoDiff) { - // Limit autodiff to 4000000 for maximum compatibility - target = std::max(target, TARGET_4_BYTES_LIMIT); - - if (client->m_autoDiff.lo) { - const uint32_t k = client->m_autoDiffIndex; - const uint16_t elapsed_time = static_cast(cur_time) - client->m_autoDiffData[(k - 1) % StratumClient::AUTO_DIFF_SIZE].m_timestamp; - if (elapsed_time > AUTO_DIFF_TARGET_TIME * 5) { - // More than 500% effort, reduce the auto diff by 1/8 every time until the share is found - client->m_autoDiff.lo = std::max(client->m_autoDiff.lo - client->m_autoDiff.lo / 8, MIN_DIFF); - } - target = std::max(target, client->m_autoDiff.target()); - } - else { - // Not enough shares from the client yet, cut diff in half every 16 seconds - const uint64_t num_halvings = (cur_time - client->m_connectedTime) / 16; - constexpr uint64_t max_target = (std::numeric_limits::max() / MIN_DIFF) + 1; - for (uint64_t i = 0; (i < num_halvings) && (target < max_target); ++i) { - target *= 2; - } - target = std::min(target, max_target); - } - } - - uint32_t job_id; - { - job_id = ++client->m_perConnectionJobId; - - StratumClient::SavedJob& saved_job = client->m_jobs[job_id % StratumClient::JOBS_SIZE]; - saved_job.job_id = job_id; - saved_job.extra_nonce = extra_nonce_start + num_sent; - saved_job.template_id = data->m_templateId; - saved_job.target = target; - } - client->m_lastJobTarget = target; - - const bool result = send(client, - [data, target, hashing_blob, job_id](void* buf, size_t buf_size) - { - log::hex_buf target_hex(reinterpret_cast(&target), sizeof(uint64_t)); - - if (target >= TARGET_4_BYTES_LIMIT) { - target_hex.m_data += sizeof(uint32_t); - target_hex.m_size -= sizeof(uint32_t); - } - - log::Stream s(buf, buf_size); - s << "{\"jsonrpc\":\"2.0\",\"method\":\"job\",\"params\":{\"blob\":\""; - s << log::hex_buf(hashing_blob, data->m_blobSize) << "\",\"job_id\":\""; - s << log::Hex(job_id) << "\",\"target\":\""; - s << target_hex << "\",\"algo\":\"rx/0\",\"height\":"; - s << data->m_height << ",\"seed_hash\":\""; - s << data->m_seedHash << "\"}}\n"; - return s.m_pos; - }); - - if (result) { - ++num_sent; - } - else { + if (!client->m_rpcId) { + // Not logged in yet, on_login() will send the job to this client. Also close inactive connections. + if (cur_time >= client->m_connectedTime + 10) { + LOGWARN(4, "client " << static_cast(client->m_addrString) << " didn't send login data"); + client->ban(DEFAULT_BAN_TIME); client->close(); } + continue; } - const uint32_t num_connections = m_numConnections; - if (numClientsProcessed != num_connections) { - LOGWARN(1, "client list is broken, expected " << num_connections << ", got " << numClientsProcessed << " clients"); + if (num_sent >= data->m_numClientsExpected) { + // We don't have any more extra_nonce values available + continue; } + + uint8_t* hashing_blob = data->m_blobs.data() + num_sent * data->m_blobSize; + + uint64_t target = data->m_target; + if (client->m_customDiff.lo) { + target = std::max(target, client->m_customDiff.target()); + } + else if (m_autoDiff) { + // Limit autodiff to 4000000 for maximum compatibility + target = std::max(target, TARGET_4_BYTES_LIMIT); + + if (client->m_autoDiff.lo) { + const uint32_t k = client->m_autoDiffIndex; + const uint16_t elapsed_time = static_cast(cur_time) - client->m_autoDiffData[(k - 1) % StratumClient::AUTO_DIFF_SIZE].m_timestamp; + if (elapsed_time > AUTO_DIFF_TARGET_TIME * 5) { + // More than 500% effort, reduce the auto diff by 1/8 every time until the share is found + client->m_autoDiff.lo = std::max(client->m_autoDiff.lo - client->m_autoDiff.lo / 8, MIN_DIFF); + } + target = std::max(target, client->m_autoDiff.target()); + } + else { + // Not enough shares from the client yet, cut diff in half every 16 seconds + const uint64_t num_halvings = (cur_time - client->m_connectedTime) / 16; + constexpr uint64_t max_target = (std::numeric_limits::max() / MIN_DIFF) + 1; + for (uint64_t i = 0; (i < num_halvings) && (target < max_target); ++i) { + target *= 2; + } + target = std::min(target, max_target); + } + } + + uint32_t job_id; + { + job_id = ++client->m_perConnectionJobId; + + StratumClient::SavedJob& saved_job = client->m_jobs[job_id % StratumClient::JOBS_SIZE]; + saved_job.job_id = job_id; + saved_job.extra_nonce = extra_nonce_start + num_sent; + saved_job.template_id = data->m_templateId; + saved_job.target = target; + } + client->m_lastJobTarget = target; + + const bool result = send(client, + [data, target, hashing_blob, job_id](void* buf, size_t buf_size) + { + log::hex_buf target_hex(reinterpret_cast(&target), sizeof(uint64_t)); + + if (target >= TARGET_4_BYTES_LIMIT) { + target_hex.m_data += sizeof(uint32_t); + target_hex.m_size -= sizeof(uint32_t); + } + + log::Stream s(buf, buf_size); + s << "{\"jsonrpc\":\"2.0\",\"method\":\"job\",\"params\":{\"blob\":\""; + s << log::hex_buf(hashing_blob, data->m_blobSize) << "\",\"job_id\":\""; + s << log::Hex(job_id) << "\",\"target\":\""; + s << target_hex << "\",\"algo\":\"rx/0\",\"height\":"; + s << data->m_height << ",\"seed_hash\":\""; + s << data->m_seedHash << "\"}}\n"; + return s.m_pos; + }); + + if (result) { + ++num_sent; + } + else { + client->close(); + } + } + + const uint32_t num_connections = m_numConnections; + if (numClientsProcessed != num_connections) { + LOGWARN(1, "client list is broken, expected " << num_connections << ", got " << numClientsProcessed << " clients"); } LOGINFO(3, "sent new job to " << num_sent << '/' << numClientsProcessed << " clients"); diff --git a/src/tcp_server.h b/src/tcp_server.h index dd6b8c0..bc37512 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -170,7 +170,8 @@ protected: uv_loop_t m_loop; - uv_mutex_t m_clientsListLock; + static void check_event_loop_thread(const char *func); + std::vector m_preallocatedClients; Client* get_client(); diff --git a/src/tcp_server.inl b/src/tcp_server.inl index e38cbbb..f83c55d 100644 --- a/src/tcp_server.inl +++ b/src/tcp_server.inl @@ -61,7 +61,6 @@ TCPServer::TCPServer(allocate_client_callback all } m_shutdownAsync.data = this; - uv_mutex_init_checked(&m_clientsListLock); uv_mutex_init_checked(&m_bansLock); m_connectedClientsList = m_allocateNewClient(); @@ -369,11 +368,17 @@ bool TCPServer::connect_to_peer(Client* client) } template -void TCPServer::close_sockets(bool listen_sockets) +void TCPServer::check_event_loop_thread(const char* func) { if (!server_event_loop_thread) { - LOGERR(1, "closing sockets from another thread, this is not thread safe"); + LOGERR(1, func << " called from another thread, this is not thread safe"); } +} + +template +void TCPServer::close_sockets(bool listen_sockets) +{ + check_event_loop_thread(__func__); if (listen_sockets) { for (uv_tcp_t* s : m_listenSockets6) { @@ -391,15 +396,12 @@ void TCPServer::close_sockets(bool listen_sockets } size_t numClosed = 0; - { - MutexLock lock(m_clientsListLock); - for (Client* c = m_connectedClientsList->m_next; c != m_connectedClientsList; c = c->m_next) { - uv_handle_t* h = reinterpret_cast(&c->m_socket); - if (!uv_is_closing(h)) { - uv_close(h, on_connection_close); - ++numClosed; - } + for (Client* c = m_connectedClientsList->m_next; c != m_connectedClientsList; c = c->m_next) { + uv_handle_t* h = reinterpret_cast(&c->m_socket); + if (!uv_is_closing(h)) { + uv_close(h, on_connection_close); + ++numClosed; } } @@ -418,7 +420,6 @@ void TCPServer::shutdown_tcp() uv_async_send(&m_shutdownAsync); uv_thread_join(&m_loopThread); - uv_mutex_destroy(&m_clientsListLock); uv_mutex_destroy(&m_bansLock); LOGINFO(1, "stopped"); @@ -464,9 +465,7 @@ void TCPServer::print_bans() template bool TCPServer::send_internal(Client* client, SendCallbackBase&& callback) { - if (!server_event_loop_thread) { - LOGERR(1, "sending data from another thread, this is not thread safe"); - } + check_event_loop_thread(__func__); if (client->m_isClosing) { LOGWARN(5, "client " << static_cast(client->m_addrString) << " is being disconnected, can't send any more data"); @@ -586,14 +585,14 @@ void TCPServer::on_new_connection(uv_stream_t* se template void TCPServer::on_connection_close(uv_handle_t* handle) { + check_event_loop_thread(__func__); + Client* client = static_cast(handle->data); TCPServer* owner = client->m_owner; LOGINFO(5, "peer " << log::Gray() << static_cast(client->m_addrString) << log::NoColor() << " disconnected"); if (owner) { - MutexLock lock(owner->m_clientsListLock); - Client* prev_in_list = client->m_prev; Client* next_in_list = client->m_next; @@ -688,7 +687,7 @@ void TCPServer::on_new_client(uv_stream_t* server template void TCPServer::on_new_client(uv_stream_t* server, Client* client) { - MutexLock lock(m_clientsListLock); + check_event_loop_thread(__func__); client->m_prev = m_connectedClientsList; client->m_next = m_connectedClientsList->m_next;