diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index 955ba1a..2c96fa0 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -58,6 +58,7 @@ P2PServer::P2PServer(p2pool* pool) , m_timerInterval(2) , m_peerListLastSaved(0) , m_lookForMissingBlocks(true) + , m_fastestPeer(nullptr) { m_blockDeserializeBuf.reserve(131072); @@ -88,7 +89,6 @@ P2PServer::P2PServer(p2pool* pool) uv_mutex_init_checked(&m_blockLock); uv_mutex_init_checked(&m_peerListLock); uv_mutex_init_checked(&m_broadcastLock); - uv_mutex_init_checked(&m_missingBlockRequestsLock); uv_rwlock_init_checked(&m_cachedBlocksLock); uv_mutex_init_checked(&m_connectToPeersLock); @@ -151,7 +151,6 @@ P2PServer::~P2PServer() uv_mutex_destroy(&m_blockLock); uv_mutex_destroy(&m_peerListLock); uv_mutex_destroy(&m_broadcastLock); - uv_mutex_destroy(&m_missingBlockRequestsLock); clear_cached_blocks(); uv_rwlock_destroy(&m_cachedBlocksLock); @@ -273,6 +272,7 @@ void P2PServer::update_peer_connections() const uint64_t last_updated = m_pool->side_chain().last_updated(); bool has_good_peers = false; + m_fastestPeer = nullptr; unordered_set connected_clients; { @@ -306,6 +306,9 @@ void P2PServer::update_peer_connections() connected_clients.insert(client->m_addr); if (client->is_good()) { has_good_peers = true; + if ((client->m_pingTime >= 0) && (!m_fastestPeer || (m_fastestPeer->m_pingTime > client->m_pingTime))) { + m_fastestPeer = client; + } } } } @@ -373,30 +376,35 @@ void P2PServer::update_peer_list() for (P2PClient* client = static_cast(m_connectedClientsList->m_next); client != m_connectedClientsList; client = static_cast(client->m_next)) { if (client->is_good() && (cur_time >= client->m_nextOutgoingPeerListRequest)) { - // Send peer list requests at random intervals (60-120 seconds) - client->m_nextOutgoingPeerListRequest = cur_time + (60 + (get_random64() % 61)); - - const bool result = send(client, - [](void* buf, size_t buf_size) - { - LOGINFO(5, "sending PEER_LIST_REQUEST"); - - if (buf_size < SEND_BUF_MIN_SIZE) { - return 0; - } - - *reinterpret_cast(buf) = static_cast(MessageId::PEER_LIST_REQUEST); - return 1; - }); - - if (result) { - client->m_lastPeerListRequestTime = std::chrono::high_resolution_clock::now(); - ++client->m_peerListPendingRequests; - } + send_peer_list_request(client, cur_time); } } } +void P2PServer::send_peer_list_request(P2PClient* client, uint64_t cur_time) +{ + // Send peer list requests at random intervals (60-120 seconds) + client->m_nextOutgoingPeerListRequest = cur_time + (60 + (get_random64() % 61)); + + const bool result = send(client, + [](void* buf, size_t buf_size) + { + LOGINFO(5, "sending PEER_LIST_REQUEST"); + + if (buf_size < SEND_BUF_MIN_SIZE) { + return 0; + } + + *reinterpret_cast(buf) = static_cast(MessageId::PEER_LIST_REQUEST); + return 1; + }); + + if (result) { + client->m_lastPeerListRequestTime = std::chrono::high_resolution_clock::now(); + ++client->m_peerListPendingRequests; + } +} + void P2PServer::save_peer_list_async() { const uint64_t cur_time = seconds_since_epoch(); @@ -1006,8 +1014,6 @@ void P2PServer::download_missing_blocks() if (missing_blocks.empty()) { m_lookForMissingBlocks = false; - - MutexLock lock(m_missingBlockRequestsLock); m_missingBlockRequests.clear(); return; } @@ -1037,15 +1043,11 @@ void P2PServer::download_missing_blocks() for (const hash& id : missing_blocks) { P2PClient* client = clients[get_random64() % clients.size()]; - { - MutexLock lock3(m_missingBlockRequestsLock); - - const uint64_t truncated_block_id = *reinterpret_cast(id.h); - if (!m_missingBlockRequests.insert({ client->m_peerId, truncated_block_id }).second) { - // We already asked this peer about this block - // Don't try to ask another peer, leave it for another timer tick - continue; - } + const uint64_t truncated_block_id = *reinterpret_cast(id.h); + if (!m_missingBlockRequests.insert({ client->m_peerId, truncated_block_id }).second) { + // We already asked this peer about this block + // Don't try to ask another peer, leave it for another timer tick + continue; } if (m_cachedBlocks) { @@ -1112,7 +1114,7 @@ P2PServer::P2PClient::P2PClient() , m_nextOutgoingPeerListRequest(0) , m_lastPeerListRequestTime{} , m_peerListPendingRequests(0) - , m_pingTime(0) + , m_pingTime(-1) , m_blockPendingRequests(0) , m_chainTipBlockRequest(false) , m_lastAlive(0) @@ -1128,6 +1130,12 @@ P2PServer::P2PClient::~P2PClient() void P2PServer::P2PClient::reset() { + P2PServer* server = static_cast(m_owner); + + if (server && (server->m_fastestPeer == this)) { + server->m_fastestPeer = nullptr; + } + Client::reset(); m_peerId = 0; @@ -1142,7 +1150,7 @@ void P2PServer::P2PClient::reset() m_nextOutgoingPeerListRequest = 0; m_lastPeerListRequestTime = {}; m_peerListPendingRequests = 0; - m_pingTime = 0; + m_pingTime = -1; m_blockPendingRequests = 0; m_chainTipBlockRequest = false; m_lastAlive = 0; @@ -1379,7 +1387,7 @@ bool P2PServer::P2PClient::on_read(char* data, uint32_t size) bytes_read = 2u + num_peers * 19u; using namespace std::chrono; - m_pingTime = duration_cast(high_resolution_clock::now() - m_lastPeerListRequestTime).count(); + m_pingTime = std::max(duration_cast(high_resolution_clock::now() - m_lastPeerListRequestTime).count(), 0); --m_peerListPendingRequests; if (!on_peer_list_response(buf + 1)) { @@ -1417,11 +1425,16 @@ void P2PServer::P2PClient::on_read_failed(int /*err*/) void P2PServer::P2PClient::on_disconnected() { + P2PServer* server = static_cast(m_owner); + + if (server && (server->m_fastestPeer == this)) { + server->m_fastestPeer = nullptr; + } + if (!m_handshakeComplete) { LOGWARN(5, "peer " << static_cast(m_addrString) << " disconnected before finishing handshake"); ban(DEFAULT_BAN_TIME); - P2PServer* server = static_cast(m_owner); if (server) { server->remove_peer_from_list(this); } @@ -1835,6 +1848,8 @@ bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size) LOGWARN(4, "peer " << static_cast(m_addrString) << " is mining on top of a stale block (mainchain height " << peer_height << ", expected >= " << our_height << ')'); return false; } + + server->send_peer_list_request(this, seconds_since_epoch()); } return handle_incoming_block_async(server->get_block()); @@ -2104,6 +2119,12 @@ void P2PServer::P2PClient::post_handle_incoming_block(const uint32_t reset_count P2PServer* server = static_cast(m_owner); + // If the initial sync is not finished yet, try to ask the fastest peer too + P2PClient* c = server->m_fastestPeer; + if (c && (c != this) && !server->m_pool->side_chain().precalcFinished()) { + c->post_handle_incoming_block(c->m_resetCounter.load(), missing_blocks); + } + ReadLock lock(server->m_cachedBlocksLock); for (const hash& id : missing_blocks) { @@ -2116,7 +2137,7 @@ void P2PServer::P2PClient::post_handle_incoming_block(const uint32_t reset_count } } - const bool result = m_owner->send(this, + const bool result = server->send(this, [&id](void* buf, size_t buf_size) -> size_t { LOGINFO(5, "sending BLOCK_REQUEST for id = " << id); diff --git a/src/p2p_server.h b/src/p2p_server.h index f562257..d57c45d 100644 --- a/src/p2p_server.h +++ b/src/p2p_server.h @@ -167,6 +167,7 @@ private: void check_zmq(); void update_peer_connections(); void update_peer_list(); + void send_peer_list_request(P2PClient* client, uint64_t cur_time); void save_peer_list_async(); void save_peer_list(); void load_peer_list(); @@ -216,10 +217,10 @@ private: std::vector m_broadcastQueue; bool m_lookForMissingBlocks; - - uv_mutex_t m_missingBlockRequestsLock; unordered_set> m_missingBlockRequests; + P2PClient* m_fastestPeer; + static void on_broadcast(uv_async_t* handle) { reinterpret_cast(handle->data)->on_broadcast(); } void on_broadcast(); diff --git a/src/side_chain.h b/src/side_chain.h index 1f0e90d..1b14de6 100644 --- a/src/side_chain.h +++ b/src/side_chain.h @@ -76,6 +76,7 @@ public: bool is_mini() const; const PoolBlock* chainTip() const { return m_chainTip; } + bool precalcFinished() const { return m_precalcFinished.load(); } static bool split_reward(uint64_t reward, const std::vector& shares, std::vector& rewards);