From 898aa5dbee22920847dafb0849907d55c8a84816 Mon Sep 17 00:00:00 2001 From: Hua Liu <58683130+liuh-80@users.noreply.github.com> Date: Mon, 30 Sep 2024 08:59:37 +0800 Subject: [PATCH] Add VRF support to ZMQ server/client (#920) --- common/zmqclient.cpp | 15 +++++++++++++-- common/zmqclient.h | 6 ++++-- common/zmqserver.cpp | 13 ++++++++++++- common/zmqserver.h | 3 +++ 4 files changed, 32 insertions(+), 5 deletions(-) diff --git a/common/zmqclient.cpp b/common/zmqclient.cpp index e6cb07da9..0225d4374 100644 --- a/common/zmqclient.cpp +++ b/common/zmqclient.cpp @@ -16,8 +16,13 @@ using namespace std; namespace swss { ZmqClient::ZmqClient(const std::string& endpoint) +:ZmqClient(endpoint, "") { - initialize(endpoint); +} + +ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf) +{ + initialize(endpoint, vrf); } ZmqClient::~ZmqClient() @@ -39,12 +44,13 @@ ZmqClient::~ZmqClient() } } -void ZmqClient::initialize(const std::string& endpoint) +void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf) { m_connected = false; m_endpoint = endpoint; m_context = nullptr; m_socket = nullptr; + m_vrf = vrf; connect(); } @@ -89,6 +95,11 @@ void ZmqClient::connect() int high_watermark = MQ_WATERMARK; zmq_setsockopt(m_socket, ZMQ_SNDHWM, &high_watermark, sizeof(high_watermark)); + if (!m_vrf.empty()) + { + zmq_setsockopt(m_socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length()); + } + SWSS_LOG_NOTICE("connect to zmq endpoint: %s", m_endpoint.c_str()); int rc = zmq_connect(m_socket, m_endpoint.c_str()); if (rc != 0) diff --git a/common/zmqclient.h b/common/zmqclient.h index 3f56cc299..313e65735 100644 --- a/common/zmqclient.h +++ b/common/zmqclient.h @@ -13,6 +13,7 @@ class ZmqClient { public: ZmqClient(const std::string& endpoint); + ZmqClient(const std::string& endpoint, const std::string& vrf); ~ZmqClient(); bool isConnected(); @@ -24,11 +25,12 @@ class ZmqClient const std::vector& kcos, std::vector& sendbuffer); private: - void initialize(const std::string& endpoint); - + void initialize(const std::string& endpoint, const std::string& vrf); std::string m_endpoint; + std::string m_vrf; + void* m_context; void* m_socket; diff --git a/common/zmqserver.cpp b/common/zmqserver.cpp index d553c2a26..4800b9ba2 100644 --- a/common/zmqserver.cpp +++ b/common/zmqserver.cpp @@ -12,7 +12,13 @@ using namespace std; namespace swss { ZmqServer::ZmqServer(const std::string& endpoint) - : m_endpoint(endpoint) + : ZmqServer(endpoint, "") +{ +} + +ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf) + : m_endpoint(endpoint), + m_vrf(vrf) { m_buffer.resize(MQ_RESPONSE_MAX_COUNT); m_runThread = true; @@ -92,6 +98,11 @@ void ZmqServer::mqPollThread() int high_watermark = MQ_WATERMARK; zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark)); + if (!m_vrf.empty()) + { + zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length()); + } + int rc = zmq_bind(socket, m_endpoint.c_str()); if (rc != 0) { diff --git a/common/zmqserver.h b/common/zmqserver.h index 002e78b1a..8afe18d7c 100644 --- a/common/zmqserver.h +++ b/common/zmqserver.h @@ -31,6 +31,7 @@ class ZmqServer static constexpr int DEFAULT_POP_BATCH_SIZE = 128; ZmqServer(const std::string& endpoint); + ZmqServer(const std::string& endpoint, const std::string& vrf); ~ZmqServer(); void registerMessageHandler( @@ -53,6 +54,8 @@ class ZmqServer std::string m_endpoint; + std::string m_vrf; + std::map> m_HandlerMap; };