From aba3bc50b8384a817a13082cc9499df445cf70c9 Mon Sep 17 00:00:00 2001 From: SChernykh Date: Tue, 24 Aug 2021 11:42:41 +0200 Subject: [PATCH] Block cache WIP and other fixes - Block cache is implemented only on Windows for now - Tracking of background jobs - More robust sidechain syncing --- CMakeLists.txt | 2 + src/block_cache.cpp | 161 +++++++++++++++++++++++++++++++++++++++++ src/block_cache.h | 41 +++++++++++ src/log.cpp | 51 +++++++------ src/log.h | 2 + src/p2p_server.cpp | 114 +++++++++++++++++++++++++---- src/p2p_server.h | 13 +++- src/p2pool.cpp | 12 +-- src/pow_hash.cpp | 8 +- src/side_chain.cpp | 7 +- src/stratum_server.cpp | 4 +- src/util.cpp | 86 +++++++++++++++++++++- src/util.h | 17 ++++- 13 files changed, 461 insertions(+), 57 deletions(-) create mode 100644 src/block_cache.cpp create mode 100644 src/block_cache.h diff --git a/CMakeLists.txt b/CMakeLists.txt index e80e1be..c335b07 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,6 +13,7 @@ include(cmake/flags.cmake) set(HEADERS external/src/cryptonote/crypto-ops.h external/src/llhttp/llhttp.h + src/block_cache.h src/block_template.h src/common.h src/console_commands.h @@ -44,6 +45,7 @@ set(SOURCES external/src/llhttp/api.c external/src/llhttp/http.c external/src/llhttp/llhttp.c + src/block_cache.cpp src/block_template.cpp src/console_commands.cpp src/crypto.cpp diff --git a/src/block_cache.cpp b/src/block_cache.cpp new file mode 100644 index 0000000..0b8ae7e --- /dev/null +++ b/src/block_cache.cpp @@ -0,0 +1,161 @@ +/* + * This file is part of the Monero P2Pool + * Copyright (c) 2021 SChernykh + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include "common.h" +#include "block_cache.h" +#include "pool_block.h" +#include "p2p_server.h" + +static constexpr char log_category_prefix[] = "BlockCache "; + +static constexpr uint32_t BLOCK_SIZE = 96 * 1024; +static constexpr uint32_t NUM_BLOCKS = 5120; +static constexpr uint32_t CACHE_SIZE = BLOCK_SIZE * NUM_BLOCKS; +static constexpr char cache_name[] = "p2pool.cache"; + +namespace p2pool { + +struct BlockCache::Impl +{ +#ifdef _WIN32 + Impl() + { + m_file = CreateFile(cache_name, GENERIC_ALL, FILE_SHARE_READ, NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_HIDDEN, NULL); + if (m_file == INVALID_HANDLE_VALUE) { + LOGERR(1, "couldn't open " << cache_name << ", error " << static_cast(GetLastError())); + return; + } + + if (SetFilePointer(m_file, CACHE_SIZE, NULL, FILE_BEGIN) == INVALID_SET_FILE_POINTER) { + LOGERR(1, "SetFilePointer failed, error " << static_cast(GetLastError())); + CloseHandle(m_file); + m_file = INVALID_HANDLE_VALUE; + return; + } + + if (!SetEndOfFile(m_file)) { + LOGERR(1, "SetEndOfFile failed, error " << static_cast(GetLastError())); + CloseHandle(m_file); + m_file = INVALID_HANDLE_VALUE; + return; + } + + m_map = CreateFileMapping(m_file, NULL, PAGE_READWRITE, 0, CACHE_SIZE, NULL); + if (!m_map) { + LOGERR(1, "CreateFileMapping failed, error " << static_cast(GetLastError())); + CloseHandle(m_file); + m_file = INVALID_HANDLE_VALUE; + return; + } + + m_data = reinterpret_cast(MapViewOfFile(m_map, FILE_MAP_ALL_ACCESS, 0, 0, 0)); + if (!m_data) { + LOGERR(1, "MapViewOfFile failed, error " << static_cast(GetLastError())); + CloseHandle(m_map); + CloseHandle(m_file); + m_map = 0; + m_file = INVALID_HANDLE_VALUE; + } + } + + ~Impl() + { + if (m_data) UnmapViewOfFile(m_data); + if (m_map) CloseHandle(m_map); + if (m_file != INVALID_HANDLE_VALUE) CloseHandle(m_file); + } + + void flush() + { + if (m_data && (m_flushRunning.exchange(1) == 0)) { + FlushViewOfFile(m_data, 0); + FlushFileBuffers(m_file); + m_flushRunning.store(0); + } + } + + HANDLE m_file = INVALID_HANDLE_VALUE; + HANDLE m_map = 0; + +#else + // TODO: Linux version is not implemented yet + void flush() {} +#endif + + uint8_t* m_data = nullptr; + std::atomic m_flushRunning{ 0 }; +}; + +BlockCache::BlockCache() + : m_impl(new Impl()) +{ +} + +BlockCache::~BlockCache() +{ + delete m_impl; +} + +void BlockCache::store(const PoolBlock& block) +{ + if (!m_impl->m_data) { + return; + } + + uint8_t* data = m_impl->m_data + (static_cast(block.m_sidechainHeight % NUM_BLOCKS) * BLOCK_SIZE); + + const size_t n1 = block.m_mainChainData.size(); + const size_t n2 = block.m_sideChainData.size(); + + *reinterpret_cast(data) = static_cast(n1 + n2); + memcpy(data + sizeof(uint32_t), block.m_mainChainData.data(), n1); + memcpy(data + sizeof(uint32_t) + n1, block.m_sideChainData.data(), n2); +} + +void BlockCache::load_all(SideChain& side_chain, P2PServer& server) +{ + if (!m_impl->m_data) { + return; + } + + LOGINFO(1, "loading cached blocks"); + + PoolBlock block; + uint32_t blocks_loaded = 0; + + for (uint64_t i = 0; i < NUM_BLOCKS; ++i) { + const uint8_t* data = m_impl->m_data + i * BLOCK_SIZE; + const uint32_t n = *reinterpret_cast(data); + + if (!n || (n + sizeof(uint32_t) > BLOCK_SIZE)) { + continue; + } + + block.deserialize(data + sizeof(uint32_t), n, side_chain); + server.add_cached_block(block); + ++blocks_loaded; + } + + LOGINFO(1, "loaded " << blocks_loaded << " cached blocks"); +} + +void BlockCache::flush() +{ + m_impl->flush(); +} + +} // namespace p2pool diff --git a/src/block_cache.h b/src/block_cache.h new file mode 100644 index 0000000..341e03b --- /dev/null +++ b/src/block_cache.h @@ -0,0 +1,41 @@ +/* + * This file is part of the Monero P2Pool + * Copyright (c) 2021 SChernykh + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, version 3. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#pragma once + +namespace p2pool { + +struct PoolBlock; +class SideChain; +class P2PServer; + +class BlockCache +{ +public: + BlockCache(); + ~BlockCache(); + + void store(const PoolBlock& block); + void load_all(SideChain& side_chain, P2PServer& server); + void flush(); + +private: + struct Impl; + Impl* m_impl; +}; + +} // namespace p2pool diff --git a/src/log.cpp b/src/log.cpp index 9142bcc..55b4da0 100644 --- a/src/log.cpp +++ b/src/log.cpp @@ -44,7 +44,7 @@ public: enum params : int { SLOT_SIZE = 1024, - BUF_SIZE = SLOT_SIZE * 1024, + BUF_SIZE = SLOT_SIZE * 16384, }; FORCEINLINE Worker() @@ -240,27 +240,9 @@ NOINLINE Writer::Writer(Severity severity) : Stream(m_stackBuf) m_buf[0] = static_cast(severity); m_pos = 3; - using namespace std::chrono; - - const system_clock::time_point now = system_clock::now(); - const time_t t0 = system_clock::to_time_t(now); - - tm t; - -#ifdef _WIN32 - localtime_s(&t, &t0); -#else - localtime_r(&t0, &t); -#endif - - m_numberWidth = 2; - *this << Cyan() << (t.tm_year + 1900) << '-' << (t.tm_mon + 1) << '-' << t.tm_mday << ' ' << t.tm_hour << ':' << t.tm_min << ':' << t.tm_sec << '.'; - - const int32_t mcs = time_point_cast(now).time_since_epoch().count() % 1000000; - - m_numberWidth = 4; - *this << (mcs / 100) << NoColor() << ' '; - m_numberWidth = 1; + *this << Cyan(); + writeCurrentTime(); + *this << NoColor() << ' '; } NOINLINE Writer::~Writer() @@ -283,6 +265,31 @@ void stop() worker.stop(); } +NOINLINE void Stream::writeCurrentTime() +{ + using namespace std::chrono; + + const system_clock::time_point now = system_clock::now(); + const time_t t0 = system_clock::to_time_t(now); + + tm t; + +#ifdef _WIN32 + localtime_s(&t, &t0); +#else + localtime_r(&t0, &t); +#endif + + m_numberWidth = 2; + *this << (t.tm_year + 1900) << '-' << (t.tm_mon + 1) << '-' << t.tm_mday << ' ' << t.tm_hour << ':' << t.tm_min << ':' << t.tm_sec << '.'; + + const int32_t mcs = time_point_cast(now).time_since_epoch().count() % 1000000; + + m_numberWidth = 4; + *this << (mcs / 100); + m_numberWidth = 1; +} + } // namespace log } // namespace p2pool diff --git a/src/log.h b/src/log.h index b09a355..7099d20 100644 --- a/src/log.h +++ b/src/log.h @@ -98,6 +98,8 @@ struct Stream FORCEINLINE int getNumberWidth() const { return m_numberWidth; } FORCEINLINE void setNumberWidth(int width) { m_numberWidth = width; } + NOINLINE void writeCurrentTime(); + int m_pos; int m_numberWidth; char* m_buf; diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index 0879bee..b73500f 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -22,6 +22,7 @@ #include "keccak.h" #include "side_chain.h" #include "pool_block.h" +#include "block_cache.h" #include #include @@ -38,6 +39,8 @@ namespace p2pool { P2PServer::P2PServer(p2pool* pool) : TCPServer(P2PClient::allocate, pool->params().m_p2pAddresses) , m_pool(pool) + , m_cache(new BlockCache()) + , m_cacheLoaded(false) , m_rd{} , m_rng(m_rd()) , m_block(new PoolBlock()) @@ -49,6 +52,7 @@ P2PServer::P2PServer(p2pool* pool) uv_mutex_init_checked(&m_blockLock); uv_mutex_init_checked(&m_peerListLock); uv_mutex_init_checked(&m_broadcastLock); + uv_rwlock_init_checked(&m_cachedBlocksLock); int err = uv_async_init(&m_loop, &m_broadcastAsync, on_broadcast); if (err) { @@ -64,6 +68,12 @@ P2PServer::P2PServer(p2pool* pool) panic(); } + if (m_cache) { + WriteLock lock(m_cachedBlocksLock); + m_cache->load_all(m_pool->side_chain(), *this); + m_cacheLoaded = true; + } + m_timer.data = this; err = uv_timer_start(&m_timer, on_timer, 10000, 2000); if (err) { @@ -86,8 +96,33 @@ P2PServer::~P2PServer() uv_mutex_destroy(&m_blockLock); uv_mutex_destroy(&m_peerListLock); uv_mutex_destroy(&m_broadcastLock); + uv_rwlock_destroy(&m_cachedBlocksLock); delete m_block; + + for (auto it : m_cachedBlocks) { + delete it.second; + } + + delete m_cache; +} + +void P2PServer::add_cached_block(const PoolBlock& block) +{ + if (m_cacheLoaded) { + LOGERR(1, "add_cached_block can only be called on startup. Fix the code!"); + return; + } + + PoolBlock* new_block = new PoolBlock(block); + m_cachedBlocks.insert({ new_block->m_sidechainId, new_block }); +} + +void P2PServer::store_in_cache(const PoolBlock& block) +{ + if (m_cache) { + m_cache->store(block); + } } void P2PServer::connect_to_peers(const std::string& peer_list) @@ -200,13 +235,13 @@ void P2PServer::save_peer_list_async() const int err = uv_queue_work(&m_loop, &work->req, [](uv_work_t* req) { - num_running_jobs.fetch_add(1); + bkg_jobs_tracker.start("P2PServer::save_peer_list_async"); reinterpret_cast(req->data)->server->save_peer_list(); }, [](uv_work_t* req, int /*status*/) { delete reinterpret_cast(req->data); - num_running_jobs.fetch_sub(1); + bkg_jobs_tracker.stop("P2PServer::save_peer_list_async"); }); if (err) { @@ -384,7 +419,7 @@ void P2PServer::broadcast(const PoolBlock& block) data->ancestor_hashes = block.m_uncles; data->ancestor_hashes.push_back(block.m_parent); - LOGINFO(5, "Broadcasting block " << block.m_sidechainId << ": " << data->pruned_blob.size() << '/' << data->blob.size() << " bytes (pruned/full)"); + LOGINFO(5, "Broadcasting block " << block.m_sidechainId << " (height " << block.m_sidechainHeight << "): " << data->pruned_blob.size() << '/' << data->blob.size() << " bytes (pruned/full)"); { MutexLock lock(m_broadcastLock); @@ -504,12 +539,46 @@ void P2PServer::print_status() void P2PServer::on_timer() { + flush_cache(); download_missing_blocks(); update_peer_connections(); update_peer_list(); save_peer_list_async(); } +void P2PServer::flush_cache() +{ + if (!m_cache) { + return; + } + + struct Work + { + uv_work_t req; + BlockCache* cache; + }; + + Work* work = new Work{}; + work->req.data = work; + work->cache = m_cache; + + const int err = uv_queue_work(uv_default_loop(), &work->req, + [](uv_work_t* req) + { + bkg_jobs_tracker.start("P2PServer::flush_cache"); + reinterpret_cast(req->data)->cache->flush(); + }, + [](uv_work_t* req, int) + { + delete reinterpret_cast(req->data); + bkg_jobs_tracker.stop("P2PServer::flush_cache"); + }); + + if (err) { + LOGERR(1, "flush_cache: uv_queue_work failed, error " << uv_err_name(err)); + } +} + void P2PServer::download_missing_blocks() { std::vector missing_blocks; @@ -822,7 +891,7 @@ void P2PServer::P2PClient::send_handshake_solution(const uint8_t (&challenge)[CH const int err = uv_queue_work(&server->m_loop, &work->req, [](uv_work_t* req) { - num_running_jobs.fetch_add(1); + bkg_jobs_tracker.start("P2PServer::send_handshake_solution"); Work* work = reinterpret_cast(req->data); const std::vector& consensus_id = work->server->m_pool->side_chain().consensus_id(); @@ -879,7 +948,7 @@ void P2PServer::P2PClient::send_handshake_solution(const uint8_t (&challenge)[CH [work]() { delete work; - num_running_jobs.fetch_sub(1); + bkg_jobs_tracker.stop("P2PServer::send_handshake_solution"); }); // We might've been disconnected while working on the challenge, do nothing in this case @@ -1118,7 +1187,7 @@ bool P2PServer::P2PClient::on_block_response(const uint8_t* buf, uint32_t size) return false; } - return handle_incoming_block_async(); + return handle_incoming_block_async(server->m_block); } bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size) @@ -1145,13 +1214,13 @@ bool P2PServer::P2PClient::on_block_broadcast(const uint8_t* buf, uint32_t size) if ((server->m_block->m_prevId != server->m_pool->miner_data().prev_id) && (server->m_block->m_txinGenHeight < server->m_pool->miner_data().height)){ - LOGINFO(4, "peer " << static_cast(m_addrString) << " broadcasted a stale block, ignoring it"); + LOGINFO(4, "peer " << static_cast(m_addrString) << " broadcasted a stale block (mainchain height " << server->m_block->m_txinGenHeight << ", expected >= " << server->m_pool->miner_data().height << "), ignoring it"); return true; } server->m_block->m_wantBroadcast = true; - return handle_incoming_block_async(); + return handle_incoming_block_async(server->m_block); } bool P2PServer::P2PClient::on_peer_list_request(const uint8_t*) @@ -1245,15 +1314,17 @@ bool P2PServer::P2PClient::on_peer_list_response(const uint8_t* buf) const return true; } -bool P2PServer::P2PClient::handle_incoming_block_async() +bool P2PServer::P2PClient::handle_incoming_block_async(PoolBlock* block) { P2PServer* server = static_cast(m_owner); - if (server->m_pool->side_chain().block_seen(*server->m_block)) { - LOGINFO(5, "block " << server->m_block->m_sidechainId << " was received before, skipping it"); + if (server->m_pool->side_chain().block_seen(*block)) { + LOGINFO(5, "block " << block->m_sidechainId << " was received before, skipping it"); return true; } + server->store_in_cache(*block); + struct Work { uv_work_t req; @@ -1264,13 +1335,13 @@ bool P2PServer::P2PClient::handle_incoming_block_async() std::vector missing_blocks; }; - Work* work = new Work{ {}, *server->m_block, this, server, m_resetCounter.load(), {} }; + Work* work = new Work{ {}, *block, this, server, m_resetCounter.load(), {} }; work->req.data = work; const int err = uv_queue_work(&server->m_loop, &work->req, [](uv_work_t* req) { - num_running_jobs.fetch_add(1); + bkg_jobs_tracker.start("P2PServer::handle_incoming_block_async"); Work* work = reinterpret_cast(req->data); work->client->handle_incoming_block(work->server->m_pool, work->block, work->client_reset_counter, work->missing_blocks); }, @@ -1279,7 +1350,7 @@ bool P2PServer::P2PClient::handle_incoming_block_async() Work* work = reinterpret_cast(req->data); work->client->post_handle_incoming_block(work->client_reset_counter, work->missing_blocks); delete work; - num_running_jobs.fetch_sub(1); + bkg_jobs_tracker.stop("P2PServer::handle_incoming_block_async"); }); if (err != 0) { @@ -1311,7 +1382,22 @@ void P2PServer::P2PClient::post_handle_incoming_block(const uint32_t reset_count return; } + if (missing_blocks.empty()) { + return; + } + + P2PServer* server = static_cast(m_owner); + + ReadLock lock(server->m_cachedBlocksLock); + for (const hash& id : missing_blocks) { + auto it = server->m_cachedBlocks.find(id); + if (it != server->m_cachedBlocks.end()) { + LOGINFO(5, "using cached block for id = " << id); + handle_incoming_block_async(it->second); + continue; + } + const bool result = m_owner->send(this, [this, &id](void* buf) { diff --git a/src/p2p_server.h b/src/p2p_server.h index 8cb73b8..980b1bd 100644 --- a/src/p2p_server.h +++ b/src/p2p_server.h @@ -19,11 +19,13 @@ #include "tcp_server.h" #include +#include namespace p2pool { class p2pool; struct PoolBlock; +class BlockCache; static constexpr size_t P2P_BUF_SIZE = 128 * 1024; static constexpr size_t PEER_LIST_RESPONSE_MAX_PEERS = 16; @@ -45,6 +47,9 @@ public: explicit P2PServer(p2pool *pool); ~P2PServer(); + void add_cached_block(const PoolBlock& block); + void store_in_cache(const PoolBlock& block); + void connect_to_peers(const std::string& peer_list); void on_connect_failed(bool is_v6, const raw_ip& ip, int port) override; @@ -89,7 +94,7 @@ public: bool on_peer_list_request(const uint8_t* buf); bool on_peer_list_response(const uint8_t* buf) const; - bool handle_incoming_block_async(); + bool handle_incoming_block_async(PoolBlock* block); void handle_incoming_block(p2pool* pool, PoolBlock& block, const uint32_t reset_counter, std::vector& missing_blocks); void post_handle_incoming_block(const uint32_t reset_counter, std::vector& missing_blocks); @@ -113,11 +118,17 @@ public: private: p2pool* m_pool; + BlockCache* m_cache; + bool m_cacheLoaded; + + uv_rwlock_t m_cachedBlocksLock; + std::unordered_map m_cachedBlocks; private: static void on_timer(uv_timer_t* timer) { reinterpret_cast(timer->data)->on_timer(); } void on_timer(); + void flush_cache(); void download_missing_blocks(); void update_peer_connections(); void update_peer_list(); diff --git a/src/p2pool.cpp b/src/p2pool.cpp index 78536c7..4e40a89 100644 --- a/src/p2pool.cpp +++ b/src/p2pool.cpp @@ -320,7 +320,7 @@ void p2pool::update_block_template_async() const int err = uv_queue_work(uv_default_loop(), req, [](uv_work_t* req) { - num_running_jobs.fetch_add(1); + bkg_jobs_tracker.start("p2pool::update_block_template_async"); p2pool* pool = reinterpret_cast(req->data); pool->m_blockTemplate->update(pool->m_minerData, *pool->m_mempool, &pool->m_params->m_wallet); @@ -330,7 +330,7 @@ void p2pool::update_block_template_async() { delete req; - num_running_jobs.fetch_sub(1); + bkg_jobs_tracker.stop("p2pool::update_block_template_async"); }); if (err) { @@ -685,13 +685,7 @@ int p2pool::run() m_stopped = true; - const int32_t k = num_running_jobs.load(); - if (k != 0) { - LOGINFO(1, "waiting for " << k << " background jobs to finish"); - while (num_running_jobs != 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - } + bkg_jobs_tracker.wait(); delete m_stratumServer; delete m_p2pServer; diff --git a/src/pow_hash.cpp b/src/pow_hash.cpp index 28c88fa..d2d9bd4 100644 --- a/src/pow_hash.cpp +++ b/src/pow_hash.cpp @@ -133,7 +133,7 @@ void RandomX_Hasher::set_seed_async(const hash& seed) uv_queue_work(uv_default_loop(), &work->req, [](uv_work_t* req) { - num_running_jobs.fetch_add(1); + bkg_jobs_tracker.start("RandomX_Hasher::set_seed_async"); Work* work = reinterpret_cast(req->data); if (!work->pool->stopped()) { work->hasher->set_seed(work->seed); @@ -142,7 +142,7 @@ void RandomX_Hasher::set_seed_async(const hash& seed) [](uv_work_t* req, int) { delete reinterpret_cast(req->data); - num_running_jobs.fetch_sub(1); + bkg_jobs_tracker.stop("RandomX_Hasher::set_seed_async"); } ); } @@ -166,7 +166,7 @@ void RandomX_Hasher::set_old_seed_async(const hash& seed) uv_queue_work(uv_default_loop(), &work->req, [](uv_work_t* req) { - num_running_jobs.fetch_add(1); + bkg_jobs_tracker.start("RandomX_Hasher::set_old_seed_async"); Work* work = reinterpret_cast(req->data); if (!work->pool->stopped()) { work->hasher->set_old_seed(work->seed); @@ -175,7 +175,7 @@ void RandomX_Hasher::set_old_seed_async(const hash& seed) [](uv_work_t* req, int) { delete reinterpret_cast(req->data); - num_running_jobs.fetch_sub(1); + bkg_jobs_tracker.stop("RandomX_Hasher::set_old_seed_async"); } ); } diff --git a/src/side_chain.cpp b/src/side_chain.cpp index 70385dd..79e87c6 100644 --- a/src/side_chain.cpp +++ b/src/side_chain.cpp @@ -621,8 +621,6 @@ void SideChain::print_status() "\nYour shares = " << our_blocks_in_window << " blocks (+" << our_uncles_in_window << " uncles, " << our_orphans << " orphans)" "\nNext payout = " << log::XMRAmount(m_pool->block_template().next_payout()) ); - - LOGINFO(0, "background jobs running: " << num_running_jobs.load()); } bool SideChain::split_reward(uint64_t reward, const std::vector& shares, std::vector& rewards) @@ -1323,6 +1321,11 @@ void SideChain::update_depths(PoolBlock* block) block = blocks_to_update.back(); blocks_to_update.pop_back(); + // Verify this block and possibly other blocks on top of it when we're sure it will get verified + if (!block->m_verified && ((block->m_depth >= m_chainWindowSize * 2) || (block->m_sidechainHeight == 0))) { + verify_loop(block); + } + auto it = m_blocksById.find(block->m_parent); if (it != m_blocksById.end()) { if (it->second->m_sidechainHeight + 1 != block->m_sidechainHeight) { diff --git a/src/stratum_server.cpp b/src/stratum_server.cpp index 7cf38ee..b944330 100644 --- a/src/stratum_server.cpp +++ b/src/stratum_server.cpp @@ -357,7 +357,7 @@ void StratumServer::on_blobs_ready() void StratumServer::on_share_found(uv_work_t* req) { - num_running_jobs.fetch_add(1); + bkg_jobs_tracker.start("StratumServer::on_share_found"); SubmittedShare* share = reinterpret_cast(req->data); StratumClient* client = share->m_client; @@ -427,7 +427,7 @@ void StratumServer::on_after_share_found(uv_work_t* req, int /*status*/) ON_SCOPE_LEAVE( [share]() { - num_running_jobs.fetch_sub(1); + bkg_jobs_tracker.stop("StratumServer::on_share_found"); MutexLock lock(share->m_server->m_submittedSharesPoolLock); share->m_server->m_submittedSharesPool.push_back(share); diff --git a/src/util.cpp b/src/util.cpp index d687759..29b6aea 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -18,6 +18,9 @@ #include "common.h" #include "util.h" #include "uv_util.h" +#include +#include +#include #ifndef _WIN32 #include @@ -27,8 +30,6 @@ static constexpr char log_category_prefix[] = "Util "; namespace p2pool { -std::atomic num_running_jobs{ 0 }; - MinerCallbackHandler::~MinerCallbackHandler() {} void panic() @@ -131,4 +132,85 @@ void uv_rwlock_init_checked(uv_rwlock_t* lock) } } +struct BackgroundJobTracker::Impl +{ + Impl() { uv_mutex_init_checked(&m_lock); } + ~Impl() { uv_mutex_destroy(&m_lock); } + + void start(const char* name) + { + MutexLock lock(m_lock); + + auto it = m_jobs.insert({ name, 1 }); + if (!it.second) { + ++it.first->second; + } + } + + void stop(const char* name) + { + MutexLock lock(m_lock); + + auto it = m_jobs.find(name); + if (it == m_jobs.end()) { + LOGWARN(1, "background job " << name << " is not running, but stop() was called"); + return; + } + + --it->second; + if (it->second <= 0) { + m_jobs.erase(it); + } + } + + void wait() + { + do { + bool is_empty = true; + { + MutexLock lock(m_lock); + is_empty = m_jobs.empty(); + for (const auto& job : m_jobs) { + LOGINFO(1, "waiting for " << job.second << " \"" << job.first << "\" jobs to finish"); + } + } + + if (is_empty) { + return; + } + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } while (1); + } + + uv_mutex_t m_lock; + std::map m_jobs; +}; + +BackgroundJobTracker::BackgroundJobTracker() : m_impl(new Impl()) +{ +} + +BackgroundJobTracker::~BackgroundJobTracker() +{ + delete m_impl; +} + +void BackgroundJobTracker::start(const char* name) +{ + m_impl->start(name); +} + +void BackgroundJobTracker::stop(const char* name) +{ + m_impl->stop(name); +} + +void BackgroundJobTracker::wait() +{ + m_impl->wait(); +} + +BackgroundJobTracker bkg_jobs_tracker; + } // namespace p2pool diff --git a/src/util.h b/src/util.h index a04d315..fe0f96a 100644 --- a/src/util.h +++ b/src/util.h @@ -98,7 +98,22 @@ template FORCEINLINE constexpr size_t array_size(T(&)[N]) void make_thread_background(); -extern std::atomic num_running_jobs; +class BackgroundJobTracker +{ +public: + BackgroundJobTracker(); + ~BackgroundJobTracker(); + + void start(const char* name); + void stop(const char* name); + void wait(); + +private: + struct Impl; + Impl* m_impl; +}; + +extern BackgroundJobTracker bkg_jobs_tracker; } // namespace p2pool