diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index d15ffb3..4aa49bb 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -678,7 +678,7 @@ void P2PServer::remove_peer_from_list(const raw_ip& ip) void P2PServer::broadcast(const PoolBlock& block) { - const MinerData& miner_data = m_pool->miner_data(); + MinerData miner_data = m_pool->miner_data(); if (block.m_txinGenHeight + 2 < miner_data.height) { LOGWARN(3, "Trying to broadcast a stale block " << block.m_sidechainId << " (mainchain height " << block.m_txinGenHeight << ", current height is " << miner_data.height << ')'); @@ -1736,7 +1736,7 @@ bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size) m_broadcastedHashes[m_broadcastedHashesIndex.fetch_add(1) % array_size(&P2PClient::m_broadcastedHashes)] = server->m_block->m_sidechainId; - const MinerData& miner_data = server->m_pool->miner_data(); + MinerData miner_data = server->m_pool->miner_data(); if (server->m_block->m_prevId != miner_data.prev_id) { // This peer is mining on top of a different Monero block, investigate it diff --git a/src/p2pool.cpp b/src/p2pool.cpp index dcc1cb0..d46dc0c 100644 --- a/src/p2pool.cpp +++ b/src/p2pool.cpp @@ -108,6 +108,7 @@ p2pool::p2pool(int argc, char* argv[]) m_stopAsync.data = this; uv_rwlock_init_checked(&m_mainchainLock); + uv_rwlock_init_checked(&m_minerDataLock); uv_mutex_init_checked(&m_foundBlocksLock); uv_mutex_init_checked(&m_submitBlockDataLock); @@ -149,6 +150,7 @@ p2pool::p2pool(int argc, char* argv[]) p2pool::~p2pool() { uv_rwlock_destroy(&m_mainchainLock); + uv_rwlock_destroy(&m_minerDataLock); uv_mutex_destroy(&m_foundBlocksLock); uv_mutex_destroy(&m_submitBlockDataLock); @@ -203,7 +205,7 @@ void p2pool::handle_tx(TxMempoolData& tx) ", fee = " << log::Gray() << static_cast(tx.fee) / 1e6 << " um"); #if TEST_MEMPOOL_PICKING_ALGORITHM - m_blockTemplate->update(m_minerData, *m_mempool, &m_params->m_wallet); + m_blockTemplate->update(miner_data(), *m_mempool, &m_params->m_wallet); #endif m_zmqLastActive = seconds_since_epoch(); @@ -239,7 +241,10 @@ void p2pool::handle_miner_data(MinerData& data) data.tx_backlog.clear(); data.time_received = std::chrono::high_resolution_clock::now(); - m_minerData = data; + { + WriteLock lock(m_minerDataLock); + m_minerData = data; + } m_updateSeed = true; update_median_timestamp(); @@ -549,11 +554,13 @@ void p2pool::update_block_template_async() void p2pool::update_block_template() { + MinerData data = miner_data(); + if (m_updateSeed) { - m_hasher->set_seed_async(m_minerData.seed_hash); + m_hasher->set_seed_async(data.seed_hash); m_updateSeed = false; } - m_blockTemplate->update(m_minerData, *m_mempool, &m_params->m_wallet); + m_blockTemplate->update(data, *m_mempool, &m_params->m_wallet); stratum_on_block(); api_update_pool_stats(); } @@ -666,6 +673,7 @@ void p2pool::update_median_timestamp() uint64_t timestamps[TIMESTAMP_WINDOW]; if (!get_timestamps(timestamps)) { + WriteLock lock(m_minerDataLock); m_minerData.median_timestamp = 0; return; } @@ -673,8 +681,11 @@ void p2pool::update_median_timestamp() std::sort(timestamps, timestamps + TIMESTAMP_WINDOW); // Shift it +1 block compared to Monero's code because we don't have the latest block yet when we receive new miner data - m_minerData.median_timestamp = (timestamps[TIMESTAMP_WINDOW / 2] + timestamps[TIMESTAMP_WINDOW / 2 + 1]) / 2; - LOGINFO(4, "median timestamp updated to " << log::Gray() << m_minerData.median_timestamp); + const uint64_t ts = (timestamps[TIMESTAMP_WINDOW / 2] + timestamps[TIMESTAMP_WINDOW / 2 + 1]) / 2; + LOGINFO(4, "median timestamp updated to " << log::Gray() << ts); + + WriteLock lock(m_minerDataLock); + m_minerData.median_timestamp = ts; } void p2pool::stratum_on_block() @@ -1021,10 +1032,16 @@ void p2pool::api_update_network_stats() return; } + hash prev_id; + { + ReadLock lock(m_minerDataLock); + prev_id = m_minerData.prev_id; + } + ChainMain mainnet_tip; { ReadLock lock(m_mainchainLock); - mainnet_tip = m_mainchainByHash[m_minerData.prev_id]; + mainnet_tip = m_mainchainByHash[prev_id]; } m_api->set(p2pool_api::Category::NETWORK, "stats", @@ -1086,10 +1103,16 @@ void p2pool::api_update_stats_mod() return; } + hash prev_id; + { + ReadLock lock(m_minerDataLock); + prev_id = m_minerData.prev_id; + } + ChainMain mainnet_tip; { ReadLock lock(m_mainchainLock); - mainnet_tip = m_mainchainByHash[m_minerData.prev_id]; + mainnet_tip = m_mainchainByHash[prev_id]; } time_t last_block_found_time = 0; diff --git a/src/p2pool.h b/src/p2pool.h index 0aec13d..7b0a7b3 100644 --- a/src/p2pool.h +++ b/src/p2pool.h @@ -48,7 +48,12 @@ public: const Params& params() const { return *m_params; } BlockTemplate& block_template() { return *m_blockTemplate; } SideChain& side_chain() { return *m_sideChain; } - const MinerData& miner_data() const { return m_minerData; } + + FORCEINLINE MinerData miner_data() const + { + ReadLock lock(m_minerDataLock); + return m_minerData; + } p2pool_api* api() const { return m_api; } @@ -108,7 +113,6 @@ private: SideChain* m_sideChain; RandomX_Hasher_Base* m_hasher; BlockTemplate* m_blockTemplate; - MinerData m_minerData; bool m_updateSeed; Mempool* m_mempool; @@ -116,6 +120,9 @@ private: std::map m_mainchainByHeight; unordered_map m_mainchainByHash; + mutable uv_rwlock_t m_minerDataLock; + MinerData m_minerData; + enum { TIMESTAMP_WINDOW = 60 }; bool get_timestamps(uint64_t (×tamps)[TIMESTAMP_WINDOW]) const; void update_median_timestamp(); @@ -185,7 +192,7 @@ private: uv_async_t m_blockTemplateAsync; uv_async_t m_stopAsync; - uint64_t m_zmqLastActive; + std::atomic m_zmqLastActive; uint64_t m_startTime; ZMQReader* m_ZMQReader = nullptr; diff --git a/src/side_chain.cpp b/src/side_chain.cpp index 104e769..5364948 100644 --- a/src/side_chain.cpp +++ b/src/side_chain.cpp @@ -60,7 +60,7 @@ static constexpr uint8_t mini_consensus_id[HASH_SIZE] = { 57,130,201,26,149,174, SideChain::SideChain(p2pool* pool, NetworkType type, const char* pool_name) : m_pool(pool) , m_networkType(type) - , m_chainTip(nullptr) + , m_chainTip{ nullptr } , m_poolName(pool_name ? pool_name : "default") , m_targetBlockTime(10) , m_minDifficulty(MIN_DIFFICULTY, 0) @@ -177,7 +177,9 @@ void SideChain::fill_sidechain_data(PoolBlock& block, Wallet* w, const hash& txk block.m_txkeySec = txkeySec; block.m_uncles.clear(); - if (!m_chainTip) { + const PoolBlock* tip = m_chainTip; + + if (!tip) { block.m_parent = {}; block.m_sidechainHeight = 0; block.m_difficulty = m_minDifficulty; @@ -187,8 +189,8 @@ void SideChain::fill_sidechain_data(PoolBlock& block, Wallet* w, const hash& txk return; } - block.m_parent = m_chainTip->m_sidechainId; - block.m_sidechainHeight = m_chainTip->m_sidechainHeight + 1; + block.m_parent = tip->m_sidechainId; + block.m_sidechainHeight = tip->m_sidechainHeight + 1; // Collect uncles from 3 previous block heights @@ -196,15 +198,15 @@ void SideChain::fill_sidechain_data(PoolBlock& block, Wallet* w, const hash& txk std::vector mined_blocks; mined_blocks.reserve(UNCLE_BLOCK_DEPTH * 2 + 1); - PoolBlock* tmp = m_chainTip; - for (uint64_t i = 0, n = std::min(UNCLE_BLOCK_DEPTH, m_chainTip->m_sidechainHeight + 1); tmp && (i < n); ++i) { + const PoolBlock* tmp = tip; + for (uint64_t i = 0, n = std::min(UNCLE_BLOCK_DEPTH, tip->m_sidechainHeight + 1); tmp && (i < n); ++i) { mined_blocks.push_back(tmp->m_sidechainId); mined_blocks.insert(mined_blocks.end(), tmp->m_uncles.begin(), tmp->m_uncles.end()); tmp = get_parent(tmp); } - for (uint64_t i = 0, n = std::min(UNCLE_BLOCK_DEPTH, m_chainTip->m_sidechainHeight + 1); i < n; ++i) { - for (PoolBlock* uncle : m_blocksByHeight[m_chainTip->m_sidechainHeight - i]) { + for (uint64_t i = 0, n = std::min(UNCLE_BLOCK_DEPTH, tip->m_sidechainHeight + 1); i < n; ++i) { + for (PoolBlock* uncle : m_blocksByHeight[tip->m_sidechainHeight - i]) { // Only add verified and valid blocks if (!uncle || !uncle->m_verified || uncle->m_invalid) { continue; @@ -218,7 +220,7 @@ void SideChain::fill_sidechain_data(PoolBlock& block, Wallet* w, const hash& txk // Only add it if it's on the same chain bool same_chain = false; do { - tmp = m_chainTip; + tmp = tip; while (tmp->m_sidechainHeight > uncle->m_sidechainHeight) { tmp = get_parent(tmp); if (!tmp) { @@ -262,7 +264,7 @@ void SideChain::fill_sidechain_data(PoolBlock& block, Wallet* w, const hash& txk } block.m_difficulty = m_curDifficulty; - block.m_cumulativeDifficulty = m_chainTip->m_cumulativeDifficulty + block.m_difficulty; + block.m_cumulativeDifficulty = tip->m_cumulativeDifficulty + block.m_difficulty; for (const hash& uncle_id : block.m_uncles) { auto it = m_blocksById.find(uncle_id); @@ -367,6 +369,7 @@ bool SideChain::block_seen(const PoolBlock& block) { // Check if it's some old block const PoolBlock* tip = m_chainTip; + if (tip && tip->m_sidechainHeight > block.m_sidechainHeight + m_chainWindowSize * 2 && block.m_cumulativeDifficulty < tip->m_cumulativeDifficulty) { return true; @@ -406,7 +409,9 @@ bool SideChain::add_external_block(PoolBlock& block, std::vector& missing_ difficulty_type diff2 = block.m_difficulty; diff2 += block.m_difficulty; - for (PoolBlock* tmp = m_chainTip; tmp && (tmp->m_sidechainHeight + m_chainWindowSize > m_chainTip->m_sidechainHeight); tmp = get_parent(tmp)) { + const PoolBlock* tip = m_chainTip; + + for (const PoolBlock* tmp = tip; tmp && (tmp->m_sidechainHeight + m_chainWindowSize > tip->m_sidechainHeight); tmp = get_parent(tmp)) { if (diff2 >= tmp->m_difficulty) { too_low_diff = false; break; @@ -449,7 +454,7 @@ bool SideChain::add_external_block(PoolBlock& block, std::vector& missing_ } // Check if it has the correct parent and difficulty to go right to monerod for checking - const MinerData& miner_data = m_pool->miner_data(); + MinerData miner_data = m_pool->miner_data(); if ((block.m_prevId == miner_data.prev_id) && miner_data.difficulty.check_pow(pow_hash)) { LOGINFO(0, log::LightGreen() << "add_external_block: block " << block.m_sidechainId << " has enough PoW for Monero network, submitting it"); m_pool->submit_block_async(block.m_mainChainData); @@ -573,7 +578,7 @@ bool SideChain::get_block_blob(const hash& id, std::vector& blob) { MutexLock lock(m_sidechainLock); - PoolBlock* block = nullptr; + const PoolBlock* block = nullptr; // Empty hash means we return current sidechain tip if (id == hash()) { @@ -661,12 +666,14 @@ void SideChain::print_status() uint64_t rem; uint64_t pool_hashrate = udiv128(m_curDifficulty.hi, m_curDifficulty.lo, m_targetBlockTime, &rem); - const difficulty_type& network_diff = m_pool->miner_data().difficulty; + difficulty_type network_diff = m_pool->miner_data().difficulty; uint64_t network_hashrate = udiv128(network_diff.hi, network_diff.lo, 120, &rem); + const PoolBlock* tip = m_chainTip; + uint64_t block_depth = 0; - PoolBlock* cur = m_chainTip; - const uint64_t tip_height = m_chainTip ? m_chainTip->m_sidechainHeight : 0; + const PoolBlock* cur = tip; + const uint64_t tip_height = tip ? tip->m_sidechainHeight : 0; uint32_t total_blocks_in_window = 0; uint32_t total_uncles_in_window = 0; @@ -715,7 +722,7 @@ void SideChain::print_status() uint64_t your_reward = 0; uint64_t total_reward = 0; - if (m_chainTip) { + if (tip) { std::sort(blocks_in_window.begin(), blocks_in_window.end()); for (uint64_t i = 0; (i < m_chainWindowSize) && (i <= tip_height); ++i) { for (PoolBlock* block : m_blocksByHeight[tip_height - i]) { @@ -730,11 +737,11 @@ void SideChain::print_status() } Wallet w = m_pool->params().m_wallet; - const std::vector& outs = m_chainTip->m_outputs; + const std::vector& outs = tip->m_outputs; hash eph_public_key; for (size_t i = 0, n = outs.size(); i < n; ++i) { - if (w.get_eph_public_key(m_chainTip->m_txkeySec, i, eph_public_key) && (outs[i].m_ephPublicKey == eph_public_key)) { + if (w.get_eph_public_key(tip->m_txkeySec, i, eph_public_key) && (outs[i].m_ephPublicKey == eph_public_key)) { your_reward = outs[i].m_reward; } total_reward += outs[i].m_reward; @@ -785,7 +792,8 @@ void SideChain::print_status() difficulty_type SideChain::total_hashes() const { - return m_chainTip ? m_chainTip->m_cumulativeDifficulty : difficulty_type(); + const PoolBlock* tip = m_chainTip; + return tip ? tip->m_cumulativeDifficulty : difficulty_type(); } uint64_t SideChain::miner_count() @@ -809,7 +817,8 @@ uint64_t SideChain::miner_count() uint64_t SideChain::last_updated() const { - return m_chainTip ? m_chainTip->m_localTimestamp : 0; + const PoolBlock* tip = m_chainTip; + return tip ? tip->m_localTimestamp : 0; } bool SideChain::is_default() const @@ -1356,8 +1365,10 @@ void SideChain::update_chain_tip(PoolBlock* block) return; } + const PoolBlock* tip = m_chainTip; + bool is_alternative; - if (is_longer_chain(m_chainTip, block, is_alternative)) { + if (is_longer_chain(tip, block, is_alternative)) { difficulty_type diff; if (get_difficulty(block, m_difficultyData, diff)) { m_chainTip = block; @@ -1365,7 +1376,7 @@ void SideChain::update_chain_tip(PoolBlock* block) LOGINFO(2, "new chain tip: next height = " << log::Gray() << block->m_sidechainHeight + 1 << log::NoColor() << ", next difficulty = " << log::Gray() << m_curDifficulty << log::NoColor() << - ", main chain height = " << log::Gray() << m_chainTip->m_txinGenHeight); + ", main chain height = " << log::Gray() << block->m_txinGenHeight); block->m_wantBroadcast = true; if (m_pool) { @@ -1380,13 +1391,13 @@ void SideChain::update_chain_tip(PoolBlock* block) prune_old_blocks(); } } - else if (block->m_sidechainHeight > m_chainTip->m_sidechainHeight) { + else if (block->m_sidechainHeight > tip->m_sidechainHeight) { LOGINFO(4, "block " << block->m_sidechainId << ", height = " << block->m_sidechainHeight << - " is not a longer chain than " << m_chainTip->m_sidechainId << - ", height " << m_chainTip->m_sidechainHeight); + " is not a longer chain than " << tip->m_sidechainId << + ", height " << tip->m_sidechainHeight); } - else if (block->m_sidechainHeight + UNCLE_BLOCK_DEPTH > m_chainTip->m_sidechainHeight) { + else if (block->m_sidechainHeight + UNCLE_BLOCK_DEPTH > tip->m_sidechainHeight) { LOGINFO(4, "possible uncle block: id = " << log::Gray() << block->m_sidechainId << log::NoColor() << ", height = " << log::Gray() << block->m_sidechainHeight); m_pool->update_block_template_async(); @@ -1519,8 +1530,9 @@ bool SideChain::is_longer_chain(const PoolBlock* block, const PoolBlock* candida } // Final check: candidate chain must be built on top of recent mainchain blocks - if (candidate_mainchain_height + 10 < m_pool->miner_data().height) { - LOGWARN(3, "received a longer alternative chain but it's stale: height " << candidate_mainchain_height << ", current height " << m_pool->miner_data().height); + MinerData data = m_pool->miner_data(); + if (candidate_mainchain_height + 10 < data.height) { + LOGWARN(3, "received a longer alternative chain but it's stale: height " << candidate_mainchain_height << ", current height " << data.height); return false; } @@ -1604,11 +1616,13 @@ void SideChain::prune_old_blocks() const uint64_t cur_time = seconds_since_epoch(); const uint64_t prune_delay = m_chainWindowSize * 4 * m_targetBlockTime; - if (m_chainTip->m_sidechainHeight < prune_distance) { + const PoolBlock* tip = m_chainTip; + + if (tip->m_sidechainHeight < prune_distance) { return; } - const uint64_t h = m_chainTip->m_sidechainHeight - prune_distance; + const uint64_t h = tip->m_sidechainHeight - prune_distance; uint64_t num_blocks_pruned = 0; diff --git a/src/side_chain.h b/src/side_chain.h index 286f97c..4b05708 100644 --- a/src/side_chain.h +++ b/src/side_chain.h @@ -99,7 +99,7 @@ private: bool check_config(); mutable uv_mutex_t m_sidechainLock; - PoolBlock* m_chainTip; + std::atomic m_chainTip; std::map> m_blocksByHeight; unordered_map m_blocksById; unordered_map m_seenWallets; diff --git a/src/tcp_server.h b/src/tcp_server.h index d853ff1..294d11f 100644 --- a/src/tcp_server.h +++ b/src/tcp_server.h @@ -53,7 +53,7 @@ public: struct Client { Client(); - virtual ~Client(); + virtual ~Client() {} virtual void reset(); virtual bool on_connect() = 0; @@ -90,8 +90,6 @@ public: uint32_t m_numRead; std::atomic m_resetCounter{ 0 }; - - uv_mutex_t m_sendLock; }; struct WriteBuf @@ -152,7 +150,7 @@ protected: int m_listenPort; uv_loop_t m_loop; - volatile bool m_loopStopped; + std::atomic m_loopStopped; uv_mutex_t m_clientsListLock; std::vector m_preallocatedClients; diff --git a/src/tcp_server.inl b/src/tcp_server.inl index 7f60a1e..7c69c28 100644 --- a/src/tcp_server.inl +++ b/src/tcp_server.inl @@ -27,7 +27,7 @@ TCPServer::TCPServer(allocate_client_callback all , m_loopThread{} , m_finished(0) , m_listenPort(-1) - , m_loopStopped(false) + , m_loopStopped{false} , m_numConnections(0) , m_numIncomingConnections(0) { @@ -532,8 +532,6 @@ bool TCPServer::send_internal(Client* client, Sen LOGERR(1, "sending data from another thread, this is not thread safe"); } - MutexLock lock0(client->m_sendLock); - WriteBuf* buf = nullptr; { @@ -620,9 +618,11 @@ void TCPServer::on_new_connection(uv_stream_t* se template void TCPServer::on_connection_close(uv_handle_t* handle) { - Client* client = static_cast(handle->data); - MutexLock lock0(client->m_sendLock); + if (!server_event_loop_thread) { + LOGERR(1, "on_connection_close called from another thread, this is not thread safe"); + } + Client* client = static_cast(handle->data); TCPServer* owner = client->m_owner; LOGINFO(5, "peer " << log::Gray() << static_cast(client->m_addrString) << log::NoColor() << " disconnected"); @@ -811,17 +811,9 @@ TCPServer::Client::Client() { Client::reset(); - uv_mutex_init_checked(&m_sendLock); - m_readBuf[0] = '\0'; } -template -TCPServer::Client::~Client() -{ - uv_mutex_destroy(&m_sendLock); -} - template void TCPServer::Client::reset() {