diff --git a/src/merge_mining_client_tari.cpp b/src/merge_mining_client_tari.cpp index bda0429..02b428d 100644 --- a/src/merge_mining_client_tari.cpp +++ b/src/merge_mining_client_tari.cpp @@ -33,6 +33,7 @@ MergeMiningClientTari::MergeMiningClientTari(p2pool* pool, std::string host, con , m_pool(pool) , m_server(new TariServer(pool->params().m_socks5Proxy)) , m_hostStr(host) + , m_workerStop(0) { if (host.find(TARI_PREFIX) != 0) { LOGERR(1, "Invalid host " << host << " - \"" << TARI_PREFIX << "\" prefix not found"); @@ -65,7 +66,7 @@ MergeMiningClientTari::MergeMiningClientTari(p2pool* pool, std::string host, con throw std::exception(); } - uv_rwlock_init_checked(&m_lock); + uv_rwlock_init_checked(&m_chainParamsLock); if (!m_server->start()) { throw std::exception(); @@ -77,22 +78,40 @@ MergeMiningClientTari::MergeMiningClientTari(p2pool* pool, std::string host, con m_TariNode = new BaseNode::Stub(grpc::CreateChannel(buf, grpc::InsecureChannelCredentials())); - merge_mining_get_chain_id(); + uv_mutex_init_checked(&m_workerLock); + uv_cond_init_checked(&m_workerCond); + + const int err = uv_thread_create(&m_worker, run_wrapper, this); + if (err) { + LOGERR(1, "failed to start worker thread, error " << uv_err_name(err)); + throw std::exception(); + } } MergeMiningClientTari::~MergeMiningClientTari() { + LOGINFO(1, "stopping"); + + m_workerStop.exchange(1); + uv_cond_signal(&m_workerCond); + uv_thread_join(&m_worker); + m_server->shutdown_tcp(); delete m_server; delete m_TariNode; + uv_rwlock_destroy(&m_chainParamsLock); + + uv_mutex_destroy(&m_workerLock); + uv_cond_destroy(&m_workerCond); + LOGINFO(1, "stopped"); } bool MergeMiningClientTari::get_params(ChainParameters& out_params) const { - ReadLock lock(m_lock); + ReadLock lock(m_chainParamsLock); if (m_chainParams.aux_id.empty() || m_chainParams.aux_diff.empty()) { return false; @@ -108,58 +127,59 @@ void MergeMiningClientTari::submit_solution(const std::vector& blob, co (void)merkle_proof; } -void MergeMiningClientTari::merge_mining_get_chain_id() +void MergeMiningClientTari::run_wrapper(void* arg) { - struct Work - { - uv_work_t req; - MergeMiningClientTari* client; - }; + reinterpret_cast(arg)->run(); + LOGINFO(1, "worker thread stopped"); +} - Work* work = new Work{}; - work->req.data = work; - work->client = this; +void MergeMiningClientTari::run() +{ + LOGINFO(1, "worker thread ready"); - uv_queue_work(m_server->get_loop(), &work->req, - [](uv_work_t* req) + do { + MutexLock lock(m_workerLock); + + LOGINFO(6, "Getting new block template from Tari node"); + + grpc::Status status; + + NewBlockTemplateRequest request; + PowAlgo* algo = new PowAlgo(); + algo->set_pow_algo(PowAlgo_PowAlgos_POW_ALGOS_RANDOMX); + request.clear_algo(); + request.set_allocated_algo(algo); + request.set_max_weight(1); + + grpc::ClientContext ctx; + NewBlockTemplateResponse response; + status = m_TariNode->GetNewBlockTemplate(&ctx, request, &response); + + grpc::ClientContext ctx2; + GetNewBlockResult response2; + status = m_TariNode->GetNewBlock(&ctx2, response.new_block_template(), &response2); + + bool aux_id_empty; { - BACKGROUND_JOB_START(MergeMiningClientTari::merge_mining_get_chain_id); - - MergeMiningClientTari* client = reinterpret_cast(req->data)->client; - - grpc::Status status; - - NewBlockTemplateRequest request; - PowAlgo* algo = new PowAlgo(); - algo->set_pow_algo(PowAlgo_PowAlgos_POW_ALGOS_RANDOMX); - request.clear_algo(); - request.set_allocated_algo(algo); - request.set_max_weight(1); - - grpc::ClientContext ctx; - NewBlockTemplateResponse response; - status = client->m_TariNode->GetNewBlockTemplate(&ctx, request, &response); - - grpc::ClientContext ctx2; - GetNewBlockResult response2; - status = client->m_TariNode->GetNewBlock(&ctx2, response.new_block_template(), &response2); + ReadLock lock2(m_chainParamsLock); + aux_id_empty = m_chainParams.aux_id.empty(); + } + if (aux_id_empty) { const std::string& id = response2.tari_unique_id(); - LOGINFO(1, client->m_hostStr << " uses chain_id " << log::LightCyan() << log::hex_buf(id.data(), id.size())); + LOGINFO(1, m_hostStr << " uses chain_id " << log::LightCyan() << log::hex_buf(id.data(), id.size())); if (id.size() == HASH_SIZE) { - WriteLock lock(client->m_lock); - std::copy(id.begin(), id.end(), client->m_chainParams.aux_id.h); + WriteLock lock2(m_chainParamsLock); + std::copy(id.begin(), id.end(), m_chainParams.aux_id.h); } else { LOGERR(1, "Tari unique_id has invalid size (" << id.size() << ')'); } - }, - [](uv_work_t* req, int /*status*/) - { - delete reinterpret_cast(req->data); - BACKGROUND_JOB_STOP(MergeMiningClientTari::merge_mining_get_chain_id); - }); + } + + LOGINFO(6, "Tari height = " << response2.block().header().height()); + } while ((uv_cond_timedwait(&m_workerCond, &m_workerLock, 500'000'000) == UV_ETIMEDOUT) && (m_workerStop.load() == 0)); } // TariServer and TariClient are simply a proxy from a localhost TCP port to the external Tari node @@ -232,9 +252,6 @@ bool MergeMiningClientTari::TariServer::connect_upstream(TariClient* downstream) upstream->m_pairedClient = downstream; upstream->m_pairedClientSavedResetCounter = downstream->m_resetCounter; - downstream->m_pairedClient = upstream; - downstream->m_pairedClientSavedResetCounter = upstream->m_resetCounter; - return true; } @@ -275,6 +292,29 @@ bool MergeMiningClientTari::TariClient::on_connect() if (m_isIncoming) { return server->connect_upstream(this); } + else { + TariClient* downstream = m_pairedClient; + downstream->m_pairedClient = this; + downstream->m_pairedClientSavedResetCounter = m_resetCounter; + + const std::vector& v = downstream->m_pendingData; + + if (!v.empty()) { + const bool result = server->send(this, + [&v](uint8_t* buf, size_t buf_size) -> size_t + { + if (v.size() > buf_size) { + return 0U; + } + + std::copy(v.begin(), v.end(), buf); + return v.size(); + }); + + downstream->m_pendingData.clear(); + return result; + } + } return true; } @@ -287,7 +327,9 @@ bool MergeMiningClientTari::TariClient::on_read(char* data, uint32_t size) } if (!is_paired()) { - return false; + LOGWARN(5, "Read " << size << " bytes from " << static_cast(m_addrString) << " but it's not paired yet. Buffering it."); + m_pendingData.insert(m_pendingData.end(), data, data + size); + return true; } return server->send(m_pairedClient, @@ -297,7 +339,7 @@ bool MergeMiningClientTari::TariClient::on_read(char* data, uint32_t size) return 0U; } - memcpy(buf, data, size); + std::copy(data, data + size, buf); return size; }); } diff --git a/src/merge_mining_client_tari.h b/src/merge_mining_client_tari.h index 1fdbe72..752a9d6 100644 --- a/src/merge_mining_client_tari.h +++ b/src/merge_mining_client_tari.h @@ -36,9 +36,7 @@ public: static constexpr char TARI_PREFIX[] = "tari://"; private: - void merge_mining_get_chain_id(); - - mutable uv_rwlock_t m_lock; + mutable uv_rwlock_t m_chainParamsLock; ChainParameters m_chainParams; std::string m_auxWallet; @@ -85,12 +83,22 @@ private: [[nodiscard]] bool on_read(char* data, uint32_t size) override; char m_buf[BUF_SIZE]; + std::vector m_pendingData; bool is_paired() const { return m_pairedClient && (m_pairedClient->m_resetCounter == m_pairedClientSavedResetCounter); } TariClient* m_pairedClient; uint32_t m_pairedClientSavedResetCounter; }; + + uv_thread_t m_worker; + + uv_mutex_t m_workerLock; + uv_cond_t m_workerCond; + std::atomic m_workerStop; + + static void run_wrapper(void* arg); + void run(); }; } // namespace p2pool