From 9f7d78919bf6fda3b19b3dfd1f547fe50f1cb343 Mon Sep 17 00:00:00 2001 From: Hua Liu <58683130+liuh-80@users.noreply.github.com> Date: Mon, 30 Sep 2024 08:59:37 +0800 Subject: [PATCH] Add VRF support to ZMQ server/client (#920) --- common/zmqclient.cpp | 15 ++++++- common/zmqclient.h | 6 ++- common/zmqserver.cpp | 13 +++++- common/zmqserver.h | 3 ++ tests/zmq_state_ut.cpp | 98 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 130 insertions(+), 5 deletions(-) diff --git a/common/zmqclient.cpp b/common/zmqclient.cpp index e6cb07da9..0225d4374 100644 --- a/common/zmqclient.cpp +++ b/common/zmqclient.cpp @@ -16,8 +16,13 @@ using namespace std; namespace swss { ZmqClient::ZmqClient(const std::string& endpoint) +:ZmqClient(endpoint, "") { - initialize(endpoint); +} + +ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf) +{ + initialize(endpoint, vrf); } ZmqClient::~ZmqClient() @@ -39,12 +44,13 @@ ZmqClient::~ZmqClient() } } -void ZmqClient::initialize(const std::string& endpoint) +void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf) { m_connected = false; m_endpoint = endpoint; m_context = nullptr; m_socket = nullptr; + m_vrf = vrf; connect(); } @@ -89,6 +95,11 @@ void ZmqClient::connect() int high_watermark = MQ_WATERMARK; zmq_setsockopt(m_socket, ZMQ_SNDHWM, &high_watermark, sizeof(high_watermark)); + if (!m_vrf.empty()) + { + zmq_setsockopt(m_socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length()); + } + SWSS_LOG_NOTICE("connect to zmq endpoint: %s", m_endpoint.c_str()); int rc = zmq_connect(m_socket, m_endpoint.c_str()); if (rc != 0) diff --git a/common/zmqclient.h b/common/zmqclient.h index 3f56cc299..313e65735 100644 --- a/common/zmqclient.h +++ b/common/zmqclient.h @@ -13,6 +13,7 @@ class ZmqClient { public: ZmqClient(const std::string& endpoint); + ZmqClient(const std::string& endpoint, const std::string& vrf); ~ZmqClient(); bool isConnected(); @@ -24,11 +25,12 @@ class ZmqClient const std::vector& kcos, std::vector& sendbuffer); private: - void initialize(const std::string& endpoint); - + void initialize(const std::string& endpoint, const std::string& vrf); std::string m_endpoint; + std::string m_vrf; + void* m_context; void* m_socket; diff --git a/common/zmqserver.cpp b/common/zmqserver.cpp index d553c2a26..4800b9ba2 100644 --- a/common/zmqserver.cpp +++ b/common/zmqserver.cpp @@ -12,7 +12,13 @@ using namespace std; namespace swss { ZmqServer::ZmqServer(const std::string& endpoint) - : m_endpoint(endpoint) + : ZmqServer(endpoint, "") +{ +} + +ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf) + : m_endpoint(endpoint), + m_vrf(vrf) { m_buffer.resize(MQ_RESPONSE_MAX_COUNT); m_runThread = true; @@ -92,6 +98,11 @@ void ZmqServer::mqPollThread() int high_watermark = MQ_WATERMARK; zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark)); + if (!m_vrf.empty()) + { + zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length()); + } + int rc = zmq_bind(socket, m_endpoint.c_str()); if (rc != 0) { diff --git a/common/zmqserver.h b/common/zmqserver.h index 002e78b1a..8afe18d7c 100644 --- a/common/zmqserver.h +++ b/common/zmqserver.h @@ -31,6 +31,7 @@ class ZmqServer static constexpr int DEFAULT_POP_BATCH_SIZE = 128; ZmqServer(const std::string& endpoint); + ZmqServer(const std::string& endpoint, const std::string& vrf); ~ZmqServer(); void registerMessageHandler( @@ -53,6 +54,8 @@ class ZmqServer std::string m_endpoint; + std::string m_vrf; + std::map> m_HandlerMap; }; diff --git a/tests/zmq_state_ut.cpp b/tests/zmq_state_ut.cpp index 4818b7fd8..a7eeb3efe 100644 --- a/tests/zmq_state_ut.cpp +++ b/tests/zmq_state_ut.cpp @@ -257,6 +257,9 @@ static void consumerWorker(string tableName, string endpoint, bool dbPersistence } } + // Wait for some time to write into the DB. + sleep(3); + allDataReceived = true; if (dbPersistence) @@ -288,6 +291,9 @@ static void testMethod(bool producerPersistence) // start consumer first, SHM can only have 1 consumer per table. thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence); + // Wait for the consumer to start. + sleep(1); + cout << "Starting " << NUMBER_OF_THREADS << " producers" << endl; /* Starting the producer before the producer */ for (int i = 0; i < NUMBER_OF_THREADS; i++) @@ -351,6 +357,9 @@ static void testBatchMethod(bool producerPersistence) // start consumer first, SHM can only have 1 consumer per table. thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence); + // Wait for the consumer to start. + sleep(1); + cout << "Starting " << NUMBER_OF_THREADS << " producers" << endl; /* Starting the producer before the producer */ for (int i = 0; i < NUMBER_OF_THREADS; i++) @@ -438,3 +447,92 @@ TEST(ZmqConsumerStateTableBatchBufferOverflow, test) } EXPECT_ANY_THROW(p.send(kcos)); } + +static bool zmq_done = false; +static void zmqConsumerWorker(string tableName, string endpoint) +{ + cout << "Consumer thread started: " << tableName << endl; + DBConnector db(TEST_DB, 0, true); + ZmqServer server(endpoint, true); + ZmqConsumerStateTable c(&db, tableName, server); + Select cs; + cs.addSelectable(&c); + //validate received data + Selectable *selectcs; + std::deque vkco; + int ret = 0; + while (!zmq_done) + { + ret = cs.select(&selectcs, 10, true); + if (ret == Select::OBJECT) + { + c.pops(vkco); + std::vector values; + values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{FieldValueTuple{"f", "v"}}}); + server.sendMsg(TEST_DB, tableName, values); + } + } + allDataReceived = true; + cout << "Consumer thread ended: " << tableName << endl; +} +TEST(ZmqOneToOneSync, test) +{ + std::string testTableName = "ZMQ_PROD_CONS_UT"; + std::string pushEndpoint = "tcp://localhost:1234"; + std::string pullEndpoint = "tcp://*:1234"; + // start consumer first, SHM can only have 1 consumer per table. + thread *consumerThread = new thread(zmqConsumerWorker, testTableName, pullEndpoint); + // Wait for the consumer to be ready. + sleep(1); + DBConnector db(TEST_DB, 0, true); + ZmqClient client(pushEndpoint, 3000); + ZmqProducerStateTable p(&db, testTableName, client); + std::vector kcos; + kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{FieldValueTuple{"f", "v"}}}); + std::vector> kcos_p; + std::string dbName, tableName; + for (int i =0; i < 3; ++i) + { + p.send(kcos); + ASSERT_TRUE(p.wait(dbName, tableName, kcos_p)); + EXPECT_EQ(dbName, TEST_DB); + EXPECT_EQ(tableName, testTableName); + ASSERT_EQ(kcos_p.size(), 1); + EXPECT_EQ(kfvKey(*kcos_p[0]), "k"); + EXPECT_EQ(kfvOp(*kcos_p[0]), SET_COMMAND); + std::vector cos = std::vector{FieldValueTuple{"f", "v"}}; + EXPECT_EQ(kfvFieldsValues(*kcos_p[0]), cos); + } + zmq_done = true; + consumerThread->join(); + delete consumerThread; +} +TEST(ZmqOneToOneSyncClientError, test) +{ + std::string testTableName = "ZMQ_PROD_CONS_UT"; + std::string pushEndpoint = "tcp://localhost:1234"; + DBConnector db(TEST_DB, 0, true); + ZmqClient client(pushEndpoint, 3000); + ZmqProducerStateTable p(&db, testTableName, client); + std::vector kcos; + kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{}}); + std::vector> kcos_p; + std::string dbName, tableName; + p.send(kcos); + // Wait will timeout without server reply. + EXPECT_FALSE(p.wait(dbName, tableName, kcos_p)); + // Send will return error without server reply. + EXPECT_THROW(p.send(kcos), std::system_error); +} +TEST(ZmqOneToOneSyncServerError, test) +{ + std::string testTableName = "ZMQ_PROD_CONS_UT"; + std::string pullEndpoint = "tcp://*:1234"; + DBConnector db(TEST_DB, 0, true); + ZmqServer server(pullEndpoint, true); + ZmqConsumerStateTable c(&db, testTableName, server); + std::vector values; + values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector{FieldValueTuple{"f", "v"}}}); + // Send will return error without client request. + EXPECT_THROW(server.sendMsg(TEST_DB, testTableName, values), std::system_error); +}