diff --git a/src/json_rpc_request.cpp b/src/json_rpc_request.cpp index b6d20d9..6f25a7e 100644 --- a/src/json_rpc_request.cpp +++ b/src/json_rpc_request.cpp @@ -33,7 +33,7 @@ JSONRPCRequest::JSONRPCRequest(const char* address, int port, const char* req, C { m_readBuf[0] = '\0'; - uv_tcp_init(uv_default_loop(), &m_socket); + uv_tcp_init(uv_default_loop_checked(), &m_socket); uv_tcp_nodelay(&m_socket, 1); sockaddr_in dest; diff --git a/src/log.cpp b/src/log.cpp index d2fb5ed..fe4aa21 100644 --- a/src/log.cpp +++ b/src/log.cpp @@ -51,6 +51,8 @@ public: : m_writePos(0) , m_readPos(0) { + is_main_thread = true; + m_logFile.open(log_file_name, std::ios::app | std::ios::binary); m_buf.resize(BUF_SIZE); diff --git a/src/p2pool.cpp b/src/p2pool.cpp index 0e7978e..5f50f09 100644 --- a/src/p2pool.cpp +++ b/src/p2pool.cpp @@ -43,6 +43,7 @@ namespace p2pool { p2pool::p2pool(int argc, char* argv[]) : m_stopped(false) , m_params(new Params(argc, argv)) + , m_updateSeed(true) { if (!m_params->m_wallet.valid()) { LOGERR(1, "Invalid wallet address. Try \"p2pool --help\"."); @@ -58,6 +59,20 @@ p2pool::p2pool(int argc, char* argv[]) LOGWARN(1, "Mining to a stagenet wallet address"); } + int err = uv_async_init(uv_default_loop_checked(), &m_blockTemplateAsync, on_update_block_template); + if (err) { + LOGERR(1, "uv_async_init failed, error " << uv_err_name(err)); + panic(); + } + m_blockTemplateAsync.data = this; + + err = uv_async_init(uv_default_loop_checked(), &m_stopAsync, on_stop); + if (err) { + LOGERR(1, "uv_async_init failed, error " << uv_err_name(err)); + panic(); + } + m_stopAsync.data = this; + uv_rwlock_init_checked(&m_mainchainLock); MinerData d; @@ -71,6 +86,8 @@ p2pool::p2pool(int argc, char* argv[]) p2pool::~p2pool() { + uv_close(reinterpret_cast(&m_blockTemplateAsync), nullptr); + uv_close(reinterpret_cast(&m_stopAsync), nullptr); uv_rwlock_destroy(&m_mainchainLock); delete m_sideChain; @@ -152,6 +169,7 @@ void p2pool::handle_miner_data(MinerData& data) data.tx_backlog.clear(); m_minerData = data; + m_updateSeed = true; update_median_timestamp(); LOGINFO(2, @@ -167,10 +185,12 @@ void p2pool::handle_miner_data(MinerData& data) "\n---------------------------------------------------------------------------------------------------------------" ); - m_hasher->set_seed_async(m_minerData.seed_hash); - - m_blockTemplate->update(m_minerData, *m_mempool, &m_params->m_wallet); - stratum_on_block(); + if (!is_main_thread) { + update_block_template_async(); + } + else { + update_block_template(); + } } static constexpr char BLOCK_FOUND[] = "\n\ @@ -314,30 +334,22 @@ void p2pool::submit_sidechain_block(uint32_t template_id, uint32_t nonce, uint32 void p2pool::update_block_template_async() { - uv_work_t* req = new uv_work_t{}; - req->data = this; - - const int err = uv_queue_work(uv_default_loop(), req, - [](uv_work_t* req) - { - bkg_jobs_tracker.start("p2pool::update_block_template_async"); - - p2pool* pool = reinterpret_cast(req->data); - pool->m_blockTemplate->update(pool->m_minerData, *pool->m_mempool, &pool->m_params->m_wallet); - pool->stratum_on_block(); - }, - [](uv_work_t* req, int /*status*/) - { - delete req; - - bkg_jobs_tracker.stop("p2pool::update_block_template_async"); - }); - + const int err = uv_async_send(&m_blockTemplateAsync); if (err) { - LOGERR(1, "uv_queue_work failed, error " << uv_err_name(err)); + LOGERR(1, "uv_async_send failed, error " << uv_err_name(err)); } } +void p2pool::update_block_template() +{ + if (m_updateSeed) { + m_hasher->set_seed_async(m_minerData.seed_hash); + m_updateSeed = false; + } + m_blockTemplate->update(m_minerData, *m_mempool, &m_params->m_wallet); + stratum_on_block(); +} + void p2pool::download_block_headers(uint64_t current_height) { const uint64_t seed_height = get_seed_height(current_height); @@ -622,13 +634,13 @@ static void on_signal(uv_signal_t* handle, int signum) LOGINFO(1, "stopping"); uv_signal_stop(handle); - uv_stop(uv_default_loop()); + uv_stop(uv_default_loop_checked()); } static bool init_uv_threadpool() { static uv_work_t dummy; - return (uv_queue_work(uv_default_loop(), &dummy, [](uv_work_t*) {}, nullptr) == 0); + return (uv_queue_work(uv_default_loop_checked(), &dummy, [](uv_work_t*) {}, nullptr) == 0); } static bool init_signals() @@ -648,7 +660,7 @@ static bool init_signals() static uv_signal_t signals[array_size(signal_names)]; for (size_t i = 0; i < array_size(signal_names); ++i) { - uv_signal_init(uv_default_loop(), &signals[i]); + uv_signal_init(uv_default_loop_checked(), &signals[i]); const int rc = uv_signal_start(&signals[i], on_signal, signal_names[i]); if (rc != 0) { LOGERR(1, "failed to initialize signal, error " << rc); @@ -661,13 +673,8 @@ static bool init_signals() void p2pool::stop() { - uv_async_t asy; - uv_loop_t *loop = uv_default_loop(); - - /* use async handle to make sure event loops wake up and stop */ - uv_async_init(loop, &asy, NULL); - uv_stop(loop); - uv_async_send(&asy); + uv_stop(uv_default_loop()); + uv_async_send(&m_stopAsync); } int p2pool::run() @@ -690,7 +697,7 @@ int p2pool::run() { ZMQReader z(m_params->m_host, m_params->m_rpcPort, m_params->m_zmqPort, this); get_miner_data(); - const int rc = uv_run(uv_default_loop(), UV_RUN_DEFAULT); + const int rc = uv_run(uv_default_loop_checked(), UV_RUN_DEFAULT); LOGINFO(1, "uv_run exited, result = " << rc); } diff --git a/src/p2pool.h b/src/p2pool.h index 27cc2a9..c349bdb 100644 --- a/src/p2pool.h +++ b/src/p2pool.h @@ -62,7 +62,9 @@ public: void submit_block(uint32_t template_id, uint32_t nonce, uint32_t extra_nonce) const; void submit_sidechain_block(uint32_t template_id, uint32_t nonce, uint32_t extra_nonce); + void update_block_template_async(); + void update_block_template(); void download_block_headers(uint64_t current_height); @@ -72,6 +74,9 @@ private: p2pool(const p2pool&) = delete; p2pool(p2pool&&) = delete; + static void on_update_block_template(uv_async_t* async) { reinterpret_cast(async->data)->update_block_template(); } + static void on_stop(uv_async_t*) {} + bool m_stopped; Params* m_params; @@ -80,6 +85,7 @@ private: RandomX_Hasher* m_hasher; BlockTemplate* m_blockTemplate; MinerData m_minerData; + bool m_updateSeed; Mempool* m_mempool; mutable uv_rwlock_t m_mainchainLock; @@ -103,6 +109,9 @@ private: P2PServer* m_p2pServer = nullptr; ConsoleCommands* m_consoleCommands; + + uv_async_t m_blockTemplateAsync; + uv_async_t m_stopAsync; }; } // namespace p2pool diff --git a/src/pow_hash.cpp b/src/pow_hash.cpp index d2d9bd4..08ce0aa 100644 --- a/src/pow_hash.cpp +++ b/src/pow_hash.cpp @@ -130,7 +130,7 @@ void RandomX_Hasher::set_seed_async(const hash& seed) work->seed = seed; work->req.data = work; - uv_queue_work(uv_default_loop(), &work->req, + uv_queue_work(uv_default_loop_checked(), &work->req, [](uv_work_t* req) { bkg_jobs_tracker.start("RandomX_Hasher::set_seed_async"); @@ -163,7 +163,7 @@ void RandomX_Hasher::set_old_seed_async(const hash& seed) work->seed = seed; work->req.data = work; - uv_queue_work(uv_default_loop(), &work->req, + uv_queue_work(uv_default_loop_checked(), &work->req, [](uv_work_t* req) { bkg_jobs_tracker.start("RandomX_Hasher::set_old_seed_async"); diff --git a/src/util.cpp b/src/util.cpp index 011bd76..89d0f73 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -132,6 +132,19 @@ void uv_rwlock_init_checked(uv_rwlock_t* lock) } } +uv_loop_t* uv_default_loop_checked() +{ + if (!is_main_thread) { + LOGERR(1, "uv_default_loop() can only be used by the main thread. Fix the code!"); +#ifdef _WIN32 + if (IsDebuggerPresent()) { + __debugbreak(); + } +#endif + } + return uv_default_loop(); +} + struct BackgroundJobTracker::Impl { Impl() { uv_mutex_init_checked(&m_lock); } @@ -235,5 +248,6 @@ void BackgroundJobTracker::print_status() } BackgroundJobTracker bkg_jobs_tracker; +thread_local bool is_main_thread = false; } // namespace p2pool diff --git a/src/util.h b/src/util.h index bf807c5..c04edcc 100644 --- a/src/util.h +++ b/src/util.h @@ -115,6 +115,7 @@ private: }; extern BackgroundJobTracker bkg_jobs_tracker; +extern thread_local bool is_main_thread; } // namespace p2pool diff --git a/src/uv_util.h b/src/uv_util.h index 41740d4..3d11d13 100644 --- a/src/uv_util.h +++ b/src/uv_util.h @@ -56,5 +56,6 @@ typedef RWLock WriteLock; void uv_mutex_init_checked(uv_mutex_t* mutex); void uv_rwlock_init_checked(uv_rwlock_t* lock); +uv_loop_t* uv_default_loop_checked(); } // namespace p2pool