From 30861bbf912c37a32b002d44e012f1efbbf4dc93 Mon Sep 17 00:00:00 2001 From: SChernykh Date: Mon, 15 Aug 2022 11:16:00 +0200 Subject: [PATCH] Speedup SideChain::get_outputs_blob() --- src/block_cache.cpp | 2 +- src/block_template.cpp | 4 +- src/p2p_server.cpp | 2 +- src/pool_block.h | 2 +- src/pool_block_parser.inl | 4 +- src/side_chain.cpp | 72 +++++++++++++++++++++++++++++++++- src/side_chain.h | 2 +- tests/src/pool_block_tests.cpp | 4 +- 8 files changed, 81 insertions(+), 11 deletions(-) diff --git a/src/block_cache.cpp b/src/block_cache.cpp index 5f5daf8..a1bed33 100644 --- a/src/block_cache.cpp +++ b/src/block_cache.cpp @@ -198,7 +198,7 @@ void BlockCache::load_all(SideChain& side_chain, P2PServer& server) continue; } - if (block.deserialize(data + sizeof(uint32_t), n, side_chain) == 0) { + if (block.deserialize(data + sizeof(uint32_t), n, side_chain, uv_default_loop_checked()) == 0) { server.add_cached_block(block); ++blocks_loaded; } diff --git a/src/block_template.cpp b/src/block_template.cpp index 1f20bd0..530303d 100644 --- a/src/block_template.cpp +++ b/src/block_template.cpp @@ -548,7 +548,7 @@ void BlockTemplate::update(const MinerData& data, const Mempool& mempool, Wallet buf.insert(buf.end(), m_poolBlockTemplate->m_sideChainData.begin(), m_poolBlockTemplate->m_sideChainData.end()); PoolBlock check; - const int result = check.deserialize(buf.data(), buf.size(), m_pool->side_chain()); + const int result = check.deserialize(buf.data(), buf.size(), m_pool->side_chain(), nullptr); if (result != 0) { LOGERR(1, "pool block blob generation and/or parsing is broken, error " << result); } @@ -1077,7 +1077,7 @@ void BlockTemplate::submit_sidechain_block(uint32_t template_id, uint32_t nonce, buf.insert(buf.end(), m_poolBlockTemplate->m_sideChainData.begin(), m_poolBlockTemplate->m_sideChainData.end()); PoolBlock check; - const int result = check.deserialize(buf.data(), buf.size(), side_chain); + const int result = check.deserialize(buf.data(), buf.size(), side_chain, nullptr); if (result != 0) { LOGERR(1, "pool block blob generation and/or parsing is broken, error " << result); } diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index 966365a..1400443 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -900,7 +900,7 @@ int P2PServer::deserialize_block(const uint8_t* buf, uint32_t size) result = m_blockDeserializeResult; } else { - result = m_block->deserialize(buf, size, m_pool->side_chain()); + result = m_block->deserialize(buf, size, m_pool->side_chain(), &m_loop); m_blockDeserializeBuf.assign(buf, buf + size); m_blockDeserializeResult = result; m_lookForMissingBlocks = true; diff --git a/src/pool_block.h b/src/pool_block.h index ba50ce9..d865b67 100644 --- a/src/pool_block.h +++ b/src/pool_block.h @@ -139,7 +139,7 @@ struct PoolBlock void serialize_mainchain_data(uint32_t nonce, uint32_t extra_nonce, const hash& sidechain_hash); void serialize_sidechain_data(); - int deserialize(const uint8_t* data, size_t size, const SideChain& sidechain); + int deserialize(const uint8_t* data, size_t size, const SideChain& sidechain, uv_loop_t* loop); void reset_offchain_data(); bool get_pow_hash(RandomX_Hasher_Base* hasher, uint64_t height, const hash& seed_hash, hash& pow_hash); diff --git a/src/pool_block_parser.inl b/src/pool_block_parser.inl index f34d949..98f358a 100644 --- a/src/pool_block_parser.inl +++ b/src/pool_block_parser.inl @@ -23,7 +23,7 @@ namespace p2pool { // Since data here can come from external and possibly malicious sources, check everything // Only the syntax (i.e. the serialized block binary format) and the keccak hash are checked here // Semantics must also be checked elsewhere before accepting the block (PoW, reward split between miners, difficulty calculation and so on) -int PoolBlock::deserialize(const uint8_t* data, size_t size, const SideChain& sidechain) +int PoolBlock::deserialize(const uint8_t* data, size_t size, const SideChain& sidechain, uv_loop_t* loop) { try { // Sanity check @@ -272,7 +272,7 @@ int PoolBlock::deserialize(const uint8_t* data, size_t size, const SideChain& si return __LINE__; } - if ((num_outputs == 0) && !sidechain.get_outputs_blob(this, total_reward, outputs_blob)) { + if ((num_outputs == 0) && !sidechain.get_outputs_blob(this, total_reward, outputs_blob, loop)) { return __LINE__; } diff --git a/src/side_chain.cpp b/src/side_chain.cpp index 368b5f2..bd766cc 100644 --- a/src/side_chain.cpp +++ b/src/side_chain.cpp @@ -682,7 +682,7 @@ bool SideChain::get_block_blob(const hash& id, std::vector& blob) const return true; } -bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::vector& blob) const +bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::vector& blob, uv_loop_t* loop) const { blob.clear(); @@ -719,6 +719,60 @@ bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::v const size_t n = tmpShares.size(); + // Helper jobs call get_eph_public_key with indices in descending order + // Current thread will process indices in ascending order so when they meet, everything will be cached + + std::atomic counter{ 0 }; + std::atomic num_helper_jobs_finished{ 0 }; + int num_helper_jobs_started = 0; + + if (loop) { + constexpr size_t HELPER_JOBS_COUNT = 4; + + struct Work + { + uv_work_t req; + const std::vector& tmpShares; + const hash& txkeySec; + std::atomic& counter; + std::atomic& num_helper_jobs_finished; + + // Fix MSVC warnings + Work() = delete; + Work& operator=(Work&&) = delete; + }; + + counter = static_cast(n) - 1; + num_helper_jobs_started = HELPER_JOBS_COUNT; + + for (size_t i = 0; i < HELPER_JOBS_COUNT; ++i) { + Work* w = new Work{ {}, tmpShares, block->m_txkeySec, counter, num_helper_jobs_finished }; + w->req.data = w; + + const int err = uv_queue_work(loop, &w->req, + [](uv_work_t* req) + { + Work* work = reinterpret_cast(req->data); + hash eph_public_key; + + int index; + while ((index = work->counter.fetch_sub(1)) >= 0) { + uint8_t view_tag; + work->tmpShares[index].m_wallet->get_eph_public_key(work->txkeySec, static_cast(index), eph_public_key, view_tag); + } + + ++work->num_helper_jobs_finished; + delete work; + }, nullptr); + + if (err) { + LOGERR(1, "get_outputs_blob: uv_queue_work failed, error " << uv_err_name(err)); + --num_helper_jobs_started; + delete w; + } + } + } + blob.reserve(n * 39 + 64); writeVarint(n, blob); @@ -730,6 +784,13 @@ bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::v hash eph_public_key; for (size_t i = 0; i < n; ++i) { + // stop helper jobs when they meet with current thread + const int c = counter.load(); + if ((c >= 0) && (static_cast(i) >= c)) { + // this will cause all helper jobs to finish immediately + counter = -1; + } + writeVarint(tmpRewards[i], blob); blob.emplace_back(tx_type); @@ -747,6 +808,15 @@ bool SideChain::get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::v block->m_outputs.emplace_back(tmpRewards[i], eph_public_key, tx_type, view_tag); } + if (loop) { + // this will cause all helper jobs to finish immediately + counter = -1; + + while (num_helper_jobs_finished < num_helper_jobs_started) { + std::this_thread::yield(); + } + } + return true; } diff --git a/src/side_chain.h b/src/side_chain.h index c8f99f1..0118504 100644 --- a/src/side_chain.h +++ b/src/side_chain.h @@ -56,7 +56,7 @@ public: void watch_mainchain_block(const ChainMain& data, const hash& possible_id); bool get_block_blob(const hash& id, std::vector& blob) const; - bool get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::vector& blob) const; + bool get_outputs_blob(PoolBlock* block, uint64_t total_reward, std::vector& blob, uv_loop_t* loop) const; void print_status() const; double get_reward_share(const Wallet& w) const; diff --git a/tests/src/pool_block_tests.cpp b/tests/src/pool_block_tests.cpp index 196761d..39a4f4d 100644 --- a/tests/src/pool_block_tests.cpp +++ b/tests/src/pool_block_tests.cpp @@ -49,7 +49,7 @@ TEST(pool_block, deserialize) f.read(reinterpret_cast(buf.data()), buf.size()); ASSERT_EQ(f.good(), true); - ASSERT_EQ(b.deserialize(buf.data(), buf.size(), sidechain), 0); + ASSERT_EQ(b.deserialize(buf.data(), buf.size(), sidechain, nullptr), 0); ASSERT_EQ(b.m_mainChainData.size(), 5607); ASSERT_EQ(b.m_mainChainHeaderSize, 43); @@ -121,7 +121,7 @@ TEST(pool_block, verify) p += sizeof(uint32_t); ASSERT_TRUE(p + n <= e); - ASSERT_EQ(b.deserialize(p, n, sidechain), 0); + ASSERT_EQ(b.deserialize(p, n, sidechain, nullptr), 0); p += n; sidechain.add_block(b);