Added Tari gRPC client stub

merge-mining
SChernykh 2024-02-06 17:20:58 +01:00
parent 9b15e8179f
commit fdb2ab19ab
11 changed files with 292 additions and 20 deletions

View File

@ -86,6 +86,7 @@ set(HEADERS
src/mempool.h
src/merge_mining_client.h
src/merge_mining_client_json_rpc.h
src/merge_mining_client_tari.h
src/merkle.h
src/p2p_server.h
src/p2pool.h
@ -120,6 +121,7 @@ set(SOURCES
src/mempool.cpp
src/merge_mining_client.cpp
src/merge_mining_client_json_rpc.cpp
src/merge_mining_client_tari.cpp
src/merkle.cpp
src/p2p_server.cpp
src/p2pool.cpp

View File

@ -35,7 +35,7 @@ static constexpr int DEFAULT_BACKLOG = 1;
namespace p2pool {
ConsoleCommands::ConsoleCommands(p2pool* pool)
: TCPServer(DEFAULT_BACKLOG, ConsoleClient::allocate)
: TCPServer(DEFAULT_BACKLOG, ConsoleClient::allocate, std::string())
, m_pool(pool)
, m_tty{}
, m_stdin_pipe{}

View File

@ -18,13 +18,19 @@
#include "common.h"
#include "merge_mining_client.h"
#include "merge_mining_client_json_rpc.h"
#include "merge_mining_client_tari.h"
namespace p2pool {
IMergeMiningClient* IMergeMiningClient::create(p2pool* pool, const std::string& host, const std::string& wallet) noexcept
{
try {
return new MergeMiningClientJSON_RPC(pool, host, wallet);
if (host.find(MergeMiningClientTari::TARI_PREFIX) == 0) {
return new MergeMiningClientTari(pool, host, wallet);
}
else {
return new MergeMiningClientJSON_RPC(pool, host, wallet);
}
}
catch (...) {
}

View File

@ -36,7 +36,7 @@ public:
static IMergeMiningClient* create(p2pool* pool, const std::string& host, const std::string& wallet) noexcept;
virtual ~IMergeMiningClient() {}
virtual bool get_params(ChainParameters& out_params) const = 0;
[[nodiscard]] virtual bool get_params(ChainParameters& out_params) const = 0;
virtual void submit_solution(const std::vector<uint8_t>& blob, const std::vector<hash>& merkle_proof) = 0;
};

View File

@ -0,0 +1,180 @@
/*
* This file is part of the Monero P2Pool <https://github.com/SChernykh/p2pool>
* Copyright (c) 2021-2024 SChernykh <https://github.com/SChernykh>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "common.h"
#include "merge_mining_client.h"
#include "merge_mining_client_tari.h"
#include "p2pool.h"
#include "params.h"
#include "Tari/proto.h"
LOG_CATEGORY(MergeMiningClientTari)
namespace p2pool {
MergeMiningClientTari::MergeMiningClientTari(p2pool* pool, const std::string& host, const std::string& wallet)
: m_server(new gRPC_Server(pool->params().m_socks5Proxy))
, m_host(host)
, m_port(0)
, m_auxWallet(wallet)
, m_pool(pool)
{
if (host.find(TARI_PREFIX) != 0) {
LOGERR(1, "Invalid host " << host << " - \"" << TARI_PREFIX << "\" prefix not found");
throw std::exception();
}
const size_t k = host.find_last_of(':');
if (k != std::string::npos) {
m_host = host.substr(sizeof(TARI_PREFIX) - 1, k - (sizeof(TARI_PREFIX) - 1));
m_port = std::stoul(host.substr(k + 1), nullptr, 10);
}
if (m_host.empty() || (m_port == 0) || (m_port >= 65536)) {
LOGERR(1, "Invalid host " << host);
throw std::exception();
}
uv_rwlock_init_checked(&m_lock);
if (!m_server->start(m_pool->params().m_dns, m_host, m_port)) {
throw std::exception();
}
}
MergeMiningClientTari::~MergeMiningClientTari()
{
m_server->shutdown_tcp();
delete m_server;
LOGINFO(1, "stopped");
}
bool MergeMiningClientTari::get_params(ChainParameters& out_params) const
{
ReadLock lock(m_lock);
if (m_chainParams.aux_id.empty() || m_chainParams.aux_diff.empty()) {
return false;
}
out_params = m_chainParams;
return true;
}
void MergeMiningClientTari::submit_solution(const std::vector<uint8_t>& blob, const std::vector<hash>& merkle_proof)
{
(void)blob;
(void)merkle_proof;
}
MergeMiningClientTari::gRPC_Client::gRPC_Client()
: Client(m_buf, sizeof(m_buf))
{
m_buf[0] = '\0';
}
void MergeMiningClientTari::gRPC_Client::reset()
{
m_data.clear();
}
bool MergeMiningClientTari::gRPC_Client::on_connect()
{
const MergeMiningClientTari::gRPC_Server* server = static_cast<MergeMiningClientTari::gRPC_Server*>(m_owner);
if (server) {
LOGINFO(4, "Connected to " << server->m_host << ':' << server->m_port);
}
return true;
}
bool MergeMiningClientTari::gRPC_Client::on_read(char* data, uint32_t size)
{
const MergeMiningClientTari::gRPC_Server* server = static_cast<MergeMiningClientTari::gRPC_Server*>(m_owner);
if (server) {
LOGINFO(4, "Read " << size << " bytes from " << server->m_host << ':' << server->m_port);
LOGINFO(4, log::hex_buf(data, size));
}
m_data.insert(m_data.end(), data, data + size);
return true;
}
void MergeMiningClientTari::gRPC_Client::on_read_failed(int err)
{
const MergeMiningClientTari::gRPC_Server* server = static_cast<MergeMiningClientTari::gRPC_Server*>(m_owner);
if (server) {
LOGERR(1, "Read from " << server->m_host << ':' << server->m_port << "failed, error " << err);
}
}
void MergeMiningClientTari::gRPC_Client::on_disconnected()
{
const MergeMiningClientTari::gRPC_Server* server = static_cast<MergeMiningClientTari::gRPC_Server*>(m_owner);
if (server) {
LOGINFO(4, "Disconnected from " << server->m_host << ':' << server->m_port);
}
}
MergeMiningClientTari::gRPC_Server::gRPC_Server(const std::string& socks5Proxy)
: TCPServer(1, MergeMiningClientTari::gRPC_Client::allocate, socks5Proxy)
, m_port(0)
{
}
MergeMiningClientTari::gRPC_Server::~gRPC_Server()
{
}
bool MergeMiningClientTari::gRPC_Server::start(bool use_dns, const std::string& host, int port)
{
const int err = uv_thread_create(&m_loopThread, loop, this);
if (err) {
LOGERR(1, "failed to start event loop thread, error " << uv_err_name(err));
return false;
}
m_loopThreadCreated = true;
m_host = host;
m_port = port;
std::string ip = host;
bool is_v6 = host.find_first_of(':') != std::string::npos;
if (!use_dns || resolve_host(ip, is_v6)) {
if (!connect_to_peer(is_v6, ip.c_str(), port)) {
LOGERR(1, "Failed to connect to " << host << ':' << port);
return false;
}
}
return true;
}
void MergeMiningClientTari::gRPC_Server::on_shutdown()
{
}
const char* MergeMiningClientTari::gRPC_Server::get_log_category() const
{
return log_category_prefix;
}
} // namespace p2pool

View File

@ -0,0 +1,82 @@
/*
* This file is part of the Monero P2Pool <https://github.com/SChernykh/p2pool>
* Copyright (c) 2021-2024 SChernykh <https://github.com/SChernykh>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "tcp_server.h"
namespace p2pool {
class p2pool;
class MergeMiningClientTari : public IMergeMiningClient, public nocopy_nomove
{
public:
MergeMiningClientTari(p2pool* pool, const std::string& host, const std::string& wallet);
~MergeMiningClientTari();
bool get_params(ChainParameters& out_params) const override;
void submit_solution(const std::vector<uint8_t>& blob, const std::vector<hash>& merkle_proof) override;
static constexpr char TARI_PREFIX[] = "tari://";
private:
struct gRPC_Server : public TCPServer
{
explicit gRPC_Server(const std::string& socks5Proxy);
~gRPC_Server();
[[nodiscard]] bool start(bool use_dns, const std::string& host, int port);
void on_shutdown() override;
[[nodiscard]] const char* get_log_category() const override;
std::string m_host;
int m_port;
} *m_server;
struct gRPC_Client : public TCPServer::Client
{
gRPC_Client();
~gRPC_Client() {}
static Client* allocate() { return new gRPC_Client(); }
virtual size_t size() const override { return sizeof(gRPC_Client); }
void reset() override;
[[nodiscard]] bool on_connect() override;
[[nodiscard]] bool on_read(char* data, uint32_t size) override;
void on_read_failed(int err) override;
void on_disconnected() override;
char m_buf[1024];
std::vector<char> m_data;
};
std::string m_host;
uint32_t m_port;
mutable uv_rwlock_t m_lock;
ChainParameters m_chainParams;
std::string m_auxWallet;
p2pool* m_pool;
};
} // namespace p2pool

View File

@ -44,7 +44,7 @@ static constexpr uint64_t PEER_REQUEST_DELAY = 60;
namespace p2pool {
P2PServer::P2PServer(p2pool* pool)
: TCPServer(DEFAULT_BACKLOG, P2PClient::allocate)
: TCPServer(DEFAULT_BACKLOG, P2PClient::allocate, pool->params().m_socks5Proxy)
, m_pool(pool)
, m_cache(pool->params().m_blockCache ? new BlockCache() : nullptr)
, m_cacheLoaded(false)
@ -71,19 +71,6 @@ P2PServer::P2PServer(p2pool* pool)
const Params& params = pool->params();
if (!params.m_socks5Proxy.empty()) {
parse_address_list(params.m_socks5Proxy,
[this](bool is_v6, const std::string& /*address*/, const std::string& ip, int port)
{
if (!str_to_ip(is_v6, ip.c_str(), m_socks5ProxyIP)) {
PANIC_STOP();
}
m_socks5ProxyV6 = is_v6;
m_socks5ProxyPort = port;
});
m_socks5Proxy = params.m_socks5Proxy;
}
set_max_outgoing_peers(params.m_maxOutgoingPeers);
set_max_incoming_peers(params.m_maxIncomingPeers);

View File

@ -40,7 +40,7 @@ static constexpr int32_t BAN_THRESHOLD_POINTS = -15;
namespace p2pool {
StratumServer::StratumServer(p2pool* pool)
: TCPServer(DEFAULT_BACKLOG, StratumClient::allocate)
: TCPServer(DEFAULT_BACKLOG, StratumClient::allocate, std::string())
, m_pool(pool)
, m_autoDiff(pool->params().m_autoDiff)
, m_rng(RandomDeviceSeed::instance)

View File

@ -23,7 +23,7 @@ static thread_local const char* log_category_prefix = "TCPServer ";
namespace p2pool {
TCPServer::TCPServer(int default_backlog, allocate_client_callback allocate_new_client)
TCPServer::TCPServer(int default_backlog, allocate_client_callback allocate_new_client, const std::string& socks5Proxy)
: m_allocateNewClient(allocate_new_client)
, m_defaultBacklog(default_backlog)
, m_loopThread{}
@ -31,6 +31,7 @@ TCPServer::TCPServer(int default_backlog, allocate_client_callback allocate_new_
#ifdef WITH_UPNP
, m_portMapping(0)
#endif
, m_socks5Proxy(socks5Proxy)
, m_socks5ProxyV6(false)
, m_socks5ProxyIP{}
, m_socks5ProxyPort(-1)
@ -72,6 +73,18 @@ TCPServer::TCPServer(int default_backlog, allocate_client_callback allocate_new_
m_connectedClientsList = m_allocateNewClient();
m_connectedClientsList->m_next = m_connectedClientsList;
m_connectedClientsList->m_prev = m_connectedClientsList;
if (!m_socks5Proxy.empty()) {
parse_address_list(m_socks5Proxy,
[this](bool is_v6, const std::string& /*address*/, const std::string& ip, int port)
{
if (!str_to_ip(is_v6, ip.c_str(), m_socks5ProxyIP)) {
PANIC_STOP();
}
m_socks5ProxyV6 = is_v6;
m_socks5ProxyPort = port;
});
}
}
TCPServer::~TCPServer()

View File

@ -28,7 +28,7 @@ public:
struct Client;
typedef Client* (*allocate_client_callback)();
TCPServer(int default_backlog, allocate_client_callback allocate_new_client);
TCPServer(int default_backlog, allocate_client_callback allocate_new_client, const std::string& socks5Proxy);
virtual ~TCPServer();
[[nodiscard]] bool connect_to_peer(bool is_v6, const char* ip, int port);

View File

@ -55,6 +55,7 @@ set(SOURCES
../src/mempool.cpp
../src/merge_mining_client.cpp
../src/merge_mining_client_json_rpc.cpp
../src/merge_mining_client_tari.cpp
../src/merkle.cpp
../src/miner.cpp
../src/p2p_server.cpp
@ -96,6 +97,7 @@ include_directories(../external/src/cryptonote)
include_directories(${UV_INCLUDE_DIR})
include_directories(../external/src/cppzmq)
include_directories(${ZMQ_INCLUDE_DIR})
include_directories(../external/src/protobuf-c)
include_directories(../external/src/RandomX/src)
include_directories(../external/src/rapidjson/include)
include_directories(../external/src/robin-hood-hashing/src/include)