From f2301a8769e8d6b6f4722af5f680e8e6f4f976d0 Mon Sep 17 00:00:00 2001 From: Stefan Dietel Date: Fri, 25 Aug 2023 14:18:32 +0200 Subject: [PATCH 1/7] split check_event made monitor_socket protected --- zmq.hpp | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/zmq.hpp b/zmq.hpp index d67498f..049daaf 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -2362,8 +2362,6 @@ class monitor_t { assert(_monitor_socket); - zmq::message_t eventMsg; - zmq::pollitem_t items[] = { {_monitor_socket.handle(), 0, ZMQ_POLLIN, 0}, }; @@ -2374,7 +2372,14 @@ class monitor_t zmq::poll(&items[0], 1, timeout); #endif - if (items[0].revents & ZMQ_POLLIN) { + return process_event(items[0].revents); + } + + bool process_event(short events) + { + zmq::message_t eventMsg; + + if (events & ZMQ_POLLIN) { int rc = zmq_msg_recv(eventMsg.handle(), _monitor_socket.handle(), 0); if (rc == -1 && zmq_errno() == ETERM) return false; From 224bfca5b7cdeae99c38fdf3965d4aedc8d5c8bc Mon Sep 17 00:00:00 2001 From: Stefan Dietel Date: Fri, 25 Aug 2023 14:18:58 +0200 Subject: [PATCH 2/7] moved process_event into protected area --- zmq.hpp | 216 ++++++++++++++++++++++++++++---------------------------- 1 file changed, 109 insertions(+), 107 deletions(-) diff --git a/zmq.hpp b/zmq.hpp index 049daaf..93314de 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -2374,112 +2374,6 @@ class monitor_t return process_event(items[0].revents); } - - bool process_event(short events) - { - zmq::message_t eventMsg; - - if (events & ZMQ_POLLIN) { - int rc = zmq_msg_recv(eventMsg.handle(), _monitor_socket.handle(), 0); - if (rc == -1 && zmq_errno() == ETERM) - return false; - assert(rc != -1); - - } else { - return false; - } - -#if ZMQ_VERSION_MAJOR >= 4 - const char *data = static_cast(eventMsg.data()); - zmq_event_t msgEvent; - memcpy(&msgEvent.event, data, sizeof(uint16_t)); - data += sizeof(uint16_t); - memcpy(&msgEvent.value, data, sizeof(int32_t)); - zmq_event_t *event = &msgEvent; -#else - zmq_event_t *event = static_cast(eventMsg.data()); -#endif - -#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT - zmq::message_t addrMsg; - int rc = zmq_msg_recv(addrMsg.handle(), _monitor_socket.handle(), 0); - if (rc == -1 && zmq_errno() == ETERM) { - return false; - } - - assert(rc != -1); - std::string address = addrMsg.to_string(); -#else - // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types. - std::string address = event->data.connected.addr; -#endif - -#ifdef ZMQ_EVENT_MONITOR_STOPPED - if (event->event == ZMQ_EVENT_MONITOR_STOPPED) { - return false; - } - -#endif - - switch (event->event) { - case ZMQ_EVENT_CONNECTED: - on_event_connected(*event, address.c_str()); - break; - case ZMQ_EVENT_CONNECT_DELAYED: - on_event_connect_delayed(*event, address.c_str()); - break; - case ZMQ_EVENT_CONNECT_RETRIED: - on_event_connect_retried(*event, address.c_str()); - break; - case ZMQ_EVENT_LISTENING: - on_event_listening(*event, address.c_str()); - break; - case ZMQ_EVENT_BIND_FAILED: - on_event_bind_failed(*event, address.c_str()); - break; - case ZMQ_EVENT_ACCEPTED: - on_event_accepted(*event, address.c_str()); - break; - case ZMQ_EVENT_ACCEPT_FAILED: - on_event_accept_failed(*event, address.c_str()); - break; - case ZMQ_EVENT_CLOSED: - on_event_closed(*event, address.c_str()); - break; - case ZMQ_EVENT_CLOSE_FAILED: - on_event_close_failed(*event, address.c_str()); - break; - case ZMQ_EVENT_DISCONNECTED: - on_event_disconnected(*event, address.c_str()); - break; -#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) || (defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3)) - case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL: - on_event_handshake_failed_no_detail(*event, address.c_str()); - break; - case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL: - on_event_handshake_failed_protocol(*event, address.c_str()); - break; - case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH: - on_event_handshake_failed_auth(*event, address.c_str()); - break; - case ZMQ_EVENT_HANDSHAKE_SUCCEEDED: - on_event_handshake_succeeded(*event, address.c_str()); - break; -#elif defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1) - case ZMQ_EVENT_HANDSHAKE_FAILED: - on_event_handshake_failed(*event, address.c_str()); - break; - case ZMQ_EVENT_HANDSHAKE_SUCCEED: - on_event_handshake_succeed(*event, address.c_str()); - break; -#endif - default: - on_event_unknown(*event, address.c_str()); - break; - } - - return true; - } #ifdef ZMQ_EVENT_MONITOR_STOPPED void abort() @@ -2588,12 +2482,120 @@ class monitor_t (void) addr_; } + protected: + bool process_event(short events) + { + zmq::message_t eventMsg; + + if (events & ZMQ_POLLIN) { + int rc = zmq_msg_recv(eventMsg.handle(), _monitor_socket.handle(), 0); + if (rc == -1 && zmq_errno() == ETERM) + return false; + assert(rc != -1); + + } else { + return false; + } + +#if ZMQ_VERSION_MAJOR >= 4 + const char *data = static_cast(eventMsg.data()); + zmq_event_t msgEvent; + memcpy(&msgEvent.event, data, sizeof(uint16_t)); + data += sizeof(uint16_t); + memcpy(&msgEvent.value, data, sizeof(int32_t)); + zmq_event_t *event = &msgEvent; +#else + zmq_event_t *event = static_cast(eventMsg.data()); +#endif + +#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT + zmq::message_t addrMsg; + int rc = zmq_msg_recv(addrMsg.handle(), _monitor_socket.handle(), 0); + if (rc == -1 && zmq_errno() == ETERM) { + return false; + } + + assert(rc != -1); + std::string address = addrMsg.to_string(); +#else + // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types. + std::string address = event->data.connected.addr; +#endif + +#ifdef ZMQ_EVENT_MONITOR_STOPPED + if (event->event == ZMQ_EVENT_MONITOR_STOPPED) { + return false; + } + +#endif + + switch (event->event) { + case ZMQ_EVENT_CONNECTED: + on_event_connected(*event, address.c_str()); + break; + case ZMQ_EVENT_CONNECT_DELAYED: + on_event_connect_delayed(*event, address.c_str()); + break; + case ZMQ_EVENT_CONNECT_RETRIED: + on_event_connect_retried(*event, address.c_str()); + break; + case ZMQ_EVENT_LISTENING: + on_event_listening(*event, address.c_str()); + break; + case ZMQ_EVENT_BIND_FAILED: + on_event_bind_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_ACCEPTED: + on_event_accepted(*event, address.c_str()); + break; + case ZMQ_EVENT_ACCEPT_FAILED: + on_event_accept_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_CLOSED: + on_event_closed(*event, address.c_str()); + break; + case ZMQ_EVENT_CLOSE_FAILED: + on_event_close_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_DISCONNECTED: + on_event_disconnected(*event, address.c_str()); + break; +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) || (defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3)) + case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL: + on_event_handshake_failed_no_detail(*event, address.c_str()); + break; + case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL: + on_event_handshake_failed_protocol(*event, address.c_str()); + break; + case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH: + on_event_handshake_failed_auth(*event, address.c_str()); + break; + case ZMQ_EVENT_HANDSHAKE_SUCCEEDED: + on_event_handshake_succeeded(*event, address.c_str()); + break; +#elif defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1) + case ZMQ_EVENT_HANDSHAKE_FAILED: + on_event_handshake_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_HANDSHAKE_SUCCEED: + on_event_handshake_succeed(*event, address.c_str()); + break; +#endif + default: + on_event_unknown(*event, address.c_str()); + break; + } + + return true; + } + + socket_t _monitor_socket; + private: monitor_t(const monitor_t &) ZMQ_DELETED_FUNCTION; void operator=(const monitor_t &) ZMQ_DELETED_FUNCTION; socket_ref _socket; - socket_t _monitor_socket; void close() ZMQ_NOTHROW { From 2dcab864258c614f08eeb1a7056bf2ffd2ddcd37 Mon Sep 17 00:00:00 2001 From: Stefan Dietel Date: Fri, 25 Aug 2023 16:19:55 +0200 Subject: [PATCH 3/7] add test for using active poller on monitor --- tests/monitor.cpp | 93 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 90 insertions(+), 3 deletions(-) diff --git a/tests/monitor.cpp b/tests/monitor.cpp index 09d5381..8637cc9 100644 --- a/tests/monitor.cpp +++ b/tests/monitor.cpp @@ -8,7 +8,7 @@ class mock_monitor_t : public zmq::monitor_t { -public: + public: void on_event_connected(const zmq_event_t &, const char *) ZMQ_OVERRIDE { @@ -89,7 +89,7 @@ TEST_CASE("monitor init abort", "[monitor]") { class mock_monitor : public mock_monitor_t { - public: + public: mock_monitor(std::function handle_connected) : handle_connected{std::move(handle_connected)} { @@ -128,7 +128,7 @@ TEST_CASE("monitor init abort", "[monitor]") { std::unique_lock lock(mutex); CHECK(cond_var.wait_for(lock, std::chrono::seconds(1), - [&done] { return done; })); + [&done] { return done; })); } CHECK(monitor.connected == 1); monitor.abort(); @@ -150,3 +150,90 @@ TEST_CASE("monitor from move assigned socket", "[monitor]") // failing } #endif + +#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) \ + && !defined(ZMQ_CPP11_PARTIAL) && defined(ZMQ_HAVE_POLLER) +#include "zmq_addon.hpp" +using namespace std::literals::chrono_literals; + +TEST_CASE("poll monitor events using active poller", "[monitor]") +{ + // define necessary class for test + class test_monitor : public zmq::monitor_t + { + public: + void init(zmq::socket_t &socket, + const char *const addr_, + int events = ZMQ_EVENT_ALL) + { + zmq::monitor_t::init(socket, addr_, events); + } + + void addToPoller(zmq::active_poller_t &inActivePoller) + { + inActivePoller.add( + _monitor_socket, zmq::event_flags::pollin, + [&](zmq::event_flags ef) { process_event(static_cast(ef)); }); + } + + void on_event_accepted(const zmq_event_t &event_, + const char *addr_) override + { + clientAccepted++; + } + void on_event_disconnected(const zmq_event_t &event, + const char *const addr) override + { + clientDisconnected++; + } + + int clientAccepted = 0; + int clientDisconnected = 0; + }; + + //Arrange + int messageCounter = 0; + const char monitorAddress[] = "inproc://monitor-server"; + + auto addToPoller = [&](zmq::socket_t &socket, zmq::active_poller_t &poller) { + poller.add(socket, zmq::event_flags::pollin, [&](zmq::event_flags ef) { + zmq::message_t msg; + auto result = socket.recv(msg, zmq::recv_flags::dontwait); + messageCounter++; + }); + }; + + common_server_client_setup sockets(false); + + test_monitor monitor; + monitor.init(sockets.server, monitorAddress); + + zmq::active_poller_t poller; + monitor.addToPoller(poller); + addToPoller(sockets.server, poller); + + sockets.init(); + sockets.client.send(zmq::message_t(0), zmq::send_flags::dontwait); + CHECK(monitor.clientAccepted == 0); + CHECK(monitor.clientDisconnected == 0); + + //Act + for (int i = 0; i < 10; i++) { + poller.wait(10ms); + } + CHECK(monitor.clientAccepted == 1); + CHECK(monitor.clientDisconnected == 0); + + sockets.client.close(); + + for (int i = 0; i < 10; i++) { + poller.wait(10ms); + } + sockets.server.close(); + + // Assert + CHECK(messageCounter == 1); + CHECK(monitor.clientAccepted == 1); + CHECK(monitor.clientDisconnected == 1); +} +#endif From d8ab66a7af2fad2d2e0fb1b34704f7fbe3d8a658 Mon Sep 17 00:00:00 2001 From: Stefan Dietel Date: Fri, 25 Aug 2023 16:26:41 +0200 Subject: [PATCH 4/7] keep _monitor_socket as private member --- tests/monitor.cpp | 2 +- zmq.hpp | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/monitor.cpp b/tests/monitor.cpp index 8637cc9..aefcf91 100644 --- a/tests/monitor.cpp +++ b/tests/monitor.cpp @@ -172,7 +172,7 @@ TEST_CASE("poll monitor events using active poller", "[monitor]") void addToPoller(zmq::active_poller_t &inActivePoller) { inActivePoller.add( - _monitor_socket, zmq::event_flags::pollin, + monitor_socket(), zmq::event_flags::pollin, [&](zmq::event_flags ef) { process_event(static_cast(ef)); }); } diff --git a/zmq.hpp b/zmq.hpp index 93314de..987d7ef 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -2589,13 +2589,14 @@ class monitor_t return true; } - socket_t _monitor_socket; + socket_ref monitor_socket() {return _monitor_socket;} private: monitor_t(const monitor_t &) ZMQ_DELETED_FUNCTION; void operator=(const monitor_t &) ZMQ_DELETED_FUNCTION; socket_ref _socket; + socket_t _monitor_socket; void close() ZMQ_NOTHROW { From be5206f3c7f02a62f2712e7b5202642c3a43defc Mon Sep 17 00:00:00 2001 From: Stefan Dietel Date: Mon, 28 Aug 2023 08:38:17 +0200 Subject: [PATCH 5/7] explicitly include chrono for monitor test --- tests/monitor.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/monitor.cpp b/tests/monitor.cpp index aefcf91..3ed2143 100644 --- a/tests/monitor.cpp +++ b/tests/monitor.cpp @@ -154,6 +154,8 @@ TEST_CASE("monitor from move assigned socket", "[monitor]") #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) \ && !defined(ZMQ_CPP11_PARTIAL) && defined(ZMQ_HAVE_POLLER) #include "zmq_addon.hpp" +#include + using namespace std::literals::chrono_literals; TEST_CASE("poll monitor events using active poller", "[monitor]") From f2e9762bc9d5a3b8bf518da8477ff1258cd3da35 Mon Sep 17 00:00:00 2001 From: Stefan Dietel Date: Mon, 28 Aug 2023 09:40:41 +0200 Subject: [PATCH 6/7] replaced chrono literals because they are not available prior to cpp14 --- tests/monitor.cpp | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/monitor.cpp b/tests/monitor.cpp index 3ed2143..abd0cd6 100644 --- a/tests/monitor.cpp +++ b/tests/monitor.cpp @@ -154,9 +154,6 @@ TEST_CASE("monitor from move assigned socket", "[monitor]") #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) \ && !defined(ZMQ_CPP11_PARTIAL) && defined(ZMQ_HAVE_POLLER) #include "zmq_addon.hpp" -#include - -using namespace std::literals::chrono_literals; TEST_CASE("poll monitor events using active poller", "[monitor]") { @@ -221,7 +218,7 @@ TEST_CASE("poll monitor events using active poller", "[monitor]") //Act for (int i = 0; i < 10; i++) { - poller.wait(10ms); + poller.wait(std::chrono::milliseconds(10)); } CHECK(monitor.clientAccepted == 1); CHECK(monitor.clientDisconnected == 0); @@ -229,7 +226,7 @@ TEST_CASE("poll monitor events using active poller", "[monitor]") sockets.client.close(); for (int i = 0; i < 10; i++) { - poller.wait(10ms); + poller.wait(std::chrono::milliseconds(10)); } sockets.server.close(); From 72730168d65a496c7e1686579f537b29c9ae7dfe Mon Sep 17 00:00:00 2001 From: Stefan Dietel Date: Thu, 7 Sep 2023 08:40:13 +0200 Subject: [PATCH 7/7] made test more robust by increasing tries for polling and breaking after event occurred --- tests/monitor.cpp | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/tests/monitor.cpp b/tests/monitor.cpp index abd0cd6..5eb0dbd 100644 --- a/tests/monitor.cpp +++ b/tests/monitor.cpp @@ -175,8 +175,7 @@ TEST_CASE("poll monitor events using active poller", "[monitor]") [&](zmq::event_flags ef) { process_event(static_cast(ef)); }); } - void on_event_accepted(const zmq_event_t &event_, - const char *addr_) override + void on_event_accepted(const zmq_event_t &event_, const char *addr_) override { clientAccepted++; } @@ -205,7 +204,8 @@ TEST_CASE("poll monitor events using active poller", "[monitor]") common_server_client_setup sockets(false); test_monitor monitor; - monitor.init(sockets.server, monitorAddress); + monitor.init(sockets.server, monitorAddress, + ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_DISCONNECTED); zmq::active_poller_t poller; monitor.addToPoller(poller); @@ -217,16 +217,22 @@ TEST_CASE("poll monitor events using active poller", "[monitor]") CHECK(monitor.clientDisconnected == 0); //Act - for (int i = 0; i < 10; i++) { - poller.wait(std::chrono::milliseconds(10)); + for (int i = 0; i < 100; i++) { + poller.wait(std::chrono::milliseconds(50)); + if (monitor.clientAccepted > 0) { + break; + } } CHECK(monitor.clientAccepted == 1); CHECK(monitor.clientDisconnected == 0); sockets.client.close(); - for (int i = 0; i < 10; i++) { - poller.wait(std::chrono::milliseconds(10)); + for (int i = 0; i < 100; i++) { + poller.wait(std::chrono::milliseconds(50)); + if (monitor.clientDisconnected > 0) { + break; + } } sockets.server.close();