diff --git a/src/json_rpc_request.cpp b/src/json_rpc_request.cpp index d20c605..78b4f42 100644 --- a/src/json_rpc_request.cpp +++ b/src/json_rpc_request.cpp @@ -78,6 +78,9 @@ struct CurlContext std::string m_error; curl_slist* m_headers; + + uint64_t m_startTime; + uint64_t m_connectedTime; }; CurlContext::CurlContext(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop) @@ -90,6 +93,8 @@ CurlContext::CurlContext(const std::string& address, int port, const std::string , m_handle(nullptr) , m_req(req) , m_headers(nullptr) + , m_startTime(0) + , m_connectedTime(0) { m_pollHandles.reserve(2); @@ -214,12 +219,17 @@ CurlContext::CurlContext(const std::string& address, int port, const std::string uv_close(reinterpret_cast(&m_timer), nullptr); throw std::runtime_error("curl_multi_add_handle failed"); } + + m_startTime = microseconds_since_epoch(); } CurlContext::~CurlContext() { + double tcp_ping = 0.0; + if (m_error.empty() && !m_response.empty()) { - (*m_callback)(m_response.data(), m_response.size()); + tcp_ping = static_cast(m_connectedTime - m_startTime) / 1000.0; + (*m_callback)(m_response.data(), m_response.size(), tcp_ping); } delete m_callback; @@ -232,7 +242,7 @@ CurlContext::~CurlContext() } } - (*m_closeCallback)(m_error.c_str(), m_error.length()); + (*m_closeCallback)(m_error.c_str(), m_error.length(), tcp_ping); delete m_closeCallback; curl_slist_free_all(m_headers); @@ -369,6 +379,8 @@ size_t CurlContext::on_write(const void* buffer, size_t size, size_t count) void CurlContext::curl_perform(uv_poll_t* req, int status, int events) { + CurlContext* ctx = reinterpret_cast(req->data); + int flags = 0; if (status < 0) { flags |= CURL_CSELECT_ERR; @@ -376,11 +388,14 @@ void CurlContext::curl_perform(uv_poll_t* req, int status, int events) } else { if (events & UV_READABLE) flags |= CURL_CSELECT_IN; - if (events & UV_WRITABLE) flags |= CURL_CSELECT_OUT; + if (events & UV_WRITABLE) { + flags |= CURL_CSELECT_OUT; + if (!ctx->m_connectedTime) { + ctx->m_connectedTime = microseconds_since_epoch(); + } + } } - CurlContext* ctx = reinterpret_cast(req->data); - int running_handles = 0; auto it = std::find_if(ctx->m_pollHandles.begin(), ctx->m_pollHandles.end(), [req](const auto& value) { return value.second == req; }); if (it != ctx->m_pollHandles.end()) { @@ -471,7 +486,7 @@ void Call(const std::string& address, int port, const std::string& req, const st } catch (const std::exception& e) { const char* msg = e.what(); - (*close_cb)(msg, strlen(msg)); + (*close_cb)(msg, strlen(msg), 0.0); } }); diff --git a/src/json_rpc_request.h b/src/json_rpc_request.h index 7c88a28..2062ebf 100644 --- a/src/json_rpc_request.h +++ b/src/json_rpc_request.h @@ -20,15 +20,15 @@ namespace p2pool { namespace JSONRPCRequest { -typedef Callback::Base CallbackBase; +typedef Callback::Base CallbackBase; void Call(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, CallbackBase* cb, CallbackBase* close_cb, uv_loop_t* loop); template FORCEINLINE void call(const std::string& address, int port, const std::string& req, const std::string& auth, const std::string& proxy, T&& cb, U&& close_cb, uv_loop_t* loop = nullptr) { - typedef Callback::Derived CallbackT; - typedef Callback::Derived CallbackU; + typedef Callback::Derived CallbackT; + typedef Callback::Derived CallbackU; Call(address, port, req, auth, proxy, new CallbackT(std::move(cb)), new CallbackU(std::move(close_cb)), loop); } diff --git a/src/p2p_server.cpp b/src/p2p_server.cpp index 03ecddd..632861a 100644 --- a/src/p2p_server.cpp +++ b/src/p2p_server.cpp @@ -618,7 +618,7 @@ void P2PServer::load_monerod_peer_list() const Params::Host host = m_pool->current_host(); JSONRPCRequest::call(host.m_address, host.m_rpcPort, "/get_peer_list", host.m_rpcLogin, m_socks5Proxy, - [this](const char* data, size_t size) + [this](const char* data, size_t size, double) { #define ERR_STR "/get_peer_list RPC request returned invalid JSON " @@ -685,7 +685,7 @@ void P2PServer::load_monerod_peer_list() LOGINFO(4, "monerod peer list loaded (" << m_peerListMonero.size() << " peers)"); }, - [](const char* data, size_t size) + [](const char* data, size_t size, double) { if (size > 0) { LOGWARN(4, "/get_peer_list RPC request failed: error " << log::const_buf(data, size)); diff --git a/src/p2pool.cpp b/src/p2pool.cpp index 2af3fbe..0fc8f83 100644 --- a/src/p2pool.cpp +++ b/src/p2pool.cpp @@ -78,6 +78,8 @@ p2pool::p2pool(int argc, char* argv[]) m_currentHost = p->m_hosts.front(); m_currentHostIndex = 0; + m_hostPing.resize(p->m_hosts.size()); + hash pub, sec, eph_public_key; generate_keys(pub, sec); @@ -202,16 +204,43 @@ p2pool::~p2pool() delete m_params; } +void p2pool::update_host_ping(const std::string& display_name, double ping) +{ + if (ping < 100) { + LOGINFO(1, display_name << " ping is " << ping << " ms"); + } + else { + LOGWARN(1, display_name << " ping is " << ping << " ms, this is too high for an efficient mining. Try to use a different node, or your own local node."); + } + + const std::vector& v = m_params->m_hosts; + + for (size_t i = 0, n = v.size(); i < n; ++i) { + if (v[i].m_displayName == display_name) { + m_hostPing[i] = ping; + return; + } + } +} + void p2pool::print_hosts() const { const Params::Host host = current_host(); - for (const Params::Host& h : m_params->m_hosts) { + for (size_t i = 0, n = m_params->m_hosts.size(); i < n; ++i) { + const Params::Host& h = m_params->m_hosts[i]; + + char buf[64] = {}; + if (m_hostPing[i] > 0.0) { + log::Stream s(buf); + s << " (" << m_hostPing[i] << " ms)"; + } + if (h.m_displayName == host.m_displayName) { - LOGINFO(0, log::LightCyan() << "-> " << h.m_displayName); + LOGINFO(0, log::LightCyan() << "-> " << h.m_displayName << buf); } else { - LOGINFO(0, " " << h.m_displayName); + LOGINFO(0, " " << h.m_displayName << buf); } } } @@ -368,14 +397,14 @@ void p2pool::handle_miner_data(MinerData& data) s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_header_by_height\",\"params\":{\"height\":" << h << "}}\0"; JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, m_params->m_socks5Proxy, - [this, h](const char* data, size_t size) + [this, h](const char* data, size_t size, double) { ChainMain block; if (!parse_block_header(data, size, block)) { LOGERR(1, "couldn't download block header for height " << h); } }, - [h](const char* data, size_t size) + [h](const char* data, size_t size, double) { if (size > 0) { LOGERR(1, "couldn't download block header for height " << h << ", error " << log::const_buf(data, size)); @@ -603,7 +632,7 @@ void p2pool::submit_block() const const Params::Host host = current_host(); JSONRPCRequest::call(host.m_address, host.m_rpcPort, request, host.m_rpcLogin, m_params->m_socks5Proxy, - [height, diff, template_id, nonce, extra_nonce, sidechain_id, is_external](const char* data, size_t size) + [height, diff, template_id, nonce, extra_nonce, sidechain_id, is_external](const char* data, size_t size, double) { rapidjson::Document doc; if (doc.Parse(data, size).HasParseError() || !doc.IsObject()) { @@ -647,7 +676,7 @@ void p2pool::submit_block() const LOGWARN(0, "submit_block: daemon sent unrecognizable reply: " << log::const_buf(data, size)); }, - [is_external](const char* data, size_t size) + [is_external](const char* data, size_t size, double) { if (size > 0) { if (is_external) { @@ -722,7 +751,7 @@ void p2pool::download_block_headers(uint64_t current_height) s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_header_by_height\",\"params\":{\"height\":" << height << "}}\0"; JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, m_params->m_socks5Proxy, - [this, prev_seed_height, height](const char* data, size_t size) + [this, prev_seed_height, height](const char* data, size_t size, double) { ChainMain block; if (parse_block_header(data, size, block)) { @@ -736,7 +765,7 @@ void p2pool::download_block_headers(uint64_t current_height) PANIC_STOP(); } }, - [height](const char* data, size_t size) + [height](const char* data, size_t size, double) { if (size > 0) { LOGERR(1, "fatal error: couldn't download block header for seed height " << height << ", error " << log::const_buf(data, size)); @@ -751,7 +780,7 @@ void p2pool::download_block_headers(uint64_t current_height) s << "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_block_headers_range\",\"params\":{\"start_height\":" << start_height << ",\"end_height\":" << current_height - 1 << "}}\0"; JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, m_params->m_socks5Proxy, - [this, start_height, current_height, host](const char* data, size_t size) + [this, start_height, current_height, host](const char* data, size_t size, double) { if (parse_block_headers_range(data, size) == current_height - start_height) { update_median_timestamp(); @@ -779,6 +808,16 @@ void p2pool::download_block_headers(uint64_t current_height) api_update_network_stats(); get_miner_data(); + // Get ping times for all other hosts + for (const Params::Host& h : m_params->m_hosts) { + const std::string& name = h.m_displayName; + if (name != host.m_displayName) { + JSONRPCRequest::call(h.m_address, h.m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_version\"}", h.m_rpcLogin, m_params->m_socks5Proxy, + [this, name](const char*, size_t, double tcp_ping) { update_host_ping(name, tcp_ping); }, + [](const char*, size_t, double) {}); + } + } + m_startupFinished = true; } } @@ -787,7 +826,7 @@ void p2pool::download_block_headers(uint64_t current_height) download_block_headers(current_height); } }, - [this, start_height, current_height](const char* data, size_t size) + [this, start_height, current_height](const char* data, size_t size, double) { if (size > 0) { LOGERR(1, "Couldn't download block headers for heights " << start_height << " - " << current_height - 1 << ", error " << log::const_buf(data, size)); @@ -869,11 +908,11 @@ void p2pool::get_info() const Params::Host host = current_host(); JSONRPCRequest::call(host.m_address, host.m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_info\"}", host.m_rpcLogin, m_params->m_socks5Proxy, - [this](const char* data, size_t size) + [this](const char* data, size_t size, double) { parse_get_info_rpc(data, size); }, - [this, host](const char* data, size_t size) + [this, host](const char* data, size_t size, double) { if (size > 0) { LOGWARN(1, "get_info RPC request to " << host.m_displayName << " failed: error " << log::const_buf(data, size) << ", trying again in 1 second"); @@ -984,21 +1023,12 @@ void p2pool::get_version() { const Params::Host host = current_host(); - const uint64_t t1 = microseconds_since_epoch(); - JSONRPCRequest::call(host.m_address, host.m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_version\"}", host.m_rpcLogin, m_params->m_socks5Proxy, - [this, t1, host](const char* data, size_t size) + [this, host](const char* data, size_t size, double) { - const double node_ping = static_cast(microseconds_since_epoch() - t1) / 1e3; - if (node_ping < 100) { - LOGINFO(1, host.m_displayName << " ping time is " << node_ping << " ms"); - } - else { - LOGWARN(1, host.m_displayName << " ping time is " << node_ping << " ms, this is too high for an efficient mining. Try to use a different node, or your own local node."); - } parse_get_version_rpc(data, size); }, - [this](const char* data, size_t size) + [this](const char* data, size_t size, double) { if (size > 0) { LOGWARN(1, "get_version RPC request failed: error " << log::const_buf(data, size) << ", trying again in 1 second"); @@ -1071,11 +1101,12 @@ void p2pool::get_miner_data(bool retry) const Params::Host host = current_host(); JSONRPCRequest::call(host.m_address, host.m_rpcPort, "{\"jsonrpc\":\"2.0\",\"id\":\"0\",\"method\":\"get_miner_data\"}", host.m_rpcLogin, m_params->m_socks5Proxy, - [this](const char* data, size_t size) + [this, host](const char* data, size_t size, double tcp_ping) { parse_get_miner_data_rpc(data, size); + update_host_ping(host.m_displayName, tcp_ping); }, - [this, host, retry](const char* data, size_t size) + [this, host, retry](const char* data, size_t size, double) { if (size > 0) { LOGWARN(1, "get_miner_data RPC request to " << host.m_displayName << " failed: error " << log::const_buf(data, size) << (retry ? ", trying again in 1 second" : "")); diff --git a/src/p2pool.h b/src/p2pool.h index cac8848..5e7b3de 100644 --- a/src/p2pool.h +++ b/src/p2pool.h @@ -56,6 +56,7 @@ public: return m_currentHost; } + void update_host_ping(const std::string& display_name, double ping); void print_hosts() const; FORCEINLINE MinerData miner_data() const @@ -125,6 +126,7 @@ private: std::atomic m_stopped; const Params* m_params; + std::vector m_hostPing; mutable uv_rwlock_t m_currentHostLock; Params::Host m_currentHost; diff --git a/src/pow_hash.cpp b/src/pow_hash.cpp index 2221c16..a9c22ea 100644 --- a/src/pow_hash.cpp +++ b/src/pow_hash.cpp @@ -450,7 +450,7 @@ bool RandomX_Hasher_RPC::calculate(const void* data_ptr, size_t size, uint64_t h const Params::Host host = m_pool->current_host(); JSONRPCRequest::call(host.m_address, host.m_rpcPort, buf, host.m_rpcLogin, params.m_socks5Proxy, - [&result, &h](const char* data, size_t size) + [&result, &h](const char* data, size_t size, double) { rapidjson::Document doc; if (doc.Parse(data, size).HasParseError() || !parseValue(doc, "result", h)) { @@ -460,7 +460,7 @@ bool RandomX_Hasher_RPC::calculate(const void* data_ptr, size_t size, uint64_t h } result = 1; }, - [this, &result, &done](const char* data, size_t size) + [this, &result, &done](const char* data, size_t size, double) { if (size > 0) { LOGWARN(3, "RPC calc_pow: server returned error " << log::const_buf(data, size));