From 4fce76576fbdd1d391a9f5336d97beeac6bef550 Mon Sep 17 00:00:00 2001 From: SChernykh Date: Mon, 27 Sep 2021 11:32:49 +0200 Subject: [PATCH] ZMQ reader: abort if connect to monerod failed --- src/zmq_reader.cpp | 40 ++++++++++++++++++++++++++++++++++++++-- src/zmq_reader.h | 1 + 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/src/zmq_reader.cpp b/src/zmq_reader.cpp index 0a52025..906e5cf 100644 --- a/src/zmq_reader.cpp +++ b/src/zmq_reader.cpp @@ -82,10 +82,14 @@ void ZMQReader::run() char addr[32]; snprintf(addr, sizeof(addr), "tcp://%s:%u", m_address, m_zmqPort); - m_subscriber.connect(addr); + if (!connect(addr, m_zmqPort)) { + throw zmq::error_t(); + } snprintf(addr, sizeof(addr), "tcp://127.0.0.1:%u", m_publisherPort); - m_subscriber.connect(addr); + if (!connect(addr, m_publisherPort)) { + throw zmq::error_t(); + } m_subscriber.set(zmq::sockopt::subscribe, "json-full-chain_main"); m_subscriber.set(zmq::sockopt::subscribe, "json-full-miner_data"); @@ -121,6 +125,38 @@ void ZMQReader::run() LOGINFO(1, "worker thread stopped"); } +bool ZMQReader::connect(const char* address, uint32_t id) +{ + struct ConnectMonitor : public zmq::monitor_t + { + void on_event_connected(const zmq_event_t&, const char* address) ZMQ_OVERRIDE + { + LOGINFO(1, "connected to " << address); + connected = true; + } + + bool connected = false; + } monitor; + + char buf[32]; + snprintf(buf, sizeof(buf), "inproc://connect-mon-%u", id); + monitor.init(m_subscriber, buf); + m_subscriber.connect(address); + + using namespace std::chrono; + const system_clock::time_point start_time = system_clock::now(); + + while (!monitor.connected && monitor.check_event(-1)) { + const int64_t elapsed_time = duration_cast(system_clock::now() - start_time).count(); + if (elapsed_time >= 3000) { + LOGERR(1, "failed to connect to " << address); + return false; + } + } + + return true; +} + void ZMQReader::parse(char* data, size_t size) { char* value = data; diff --git a/src/zmq_reader.h b/src/zmq_reader.h index e690e18..6a56bda 100644 --- a/src/zmq_reader.h +++ b/src/zmq_reader.h @@ -30,6 +30,7 @@ public: private: static void run_wrapper(void* arg) { reinterpret_cast(arg)->run(); } void run(); + bool connect(const char* address, uint32_t id); void parse(char* data, size_t size);