Skip to content

Commit

Permalink
Add VRF support to ZMQ server/client (#920)
Browse files Browse the repository at this point in the history
  • Loading branch information
liuh-80 authored and divyagayathri-hcl committed Sep 30, 2024
1 parent 24979b0 commit 9f7d789
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 5 deletions.
15 changes: 13 additions & 2 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ using namespace std;
namespace swss {

ZmqClient::ZmqClient(const std::string& endpoint)
:ZmqClient(endpoint, "")
{
initialize(endpoint);
}

ZmqClient::ZmqClient(const std::string& endpoint, const std::string& vrf)
{
initialize(endpoint, vrf);
}

ZmqClient::~ZmqClient()
Expand All @@ -39,12 +44,13 @@ ZmqClient::~ZmqClient()
}
}

void ZmqClient::initialize(const std::string& endpoint)
void ZmqClient::initialize(const std::string& endpoint, const std::string& vrf)
{
m_connected = false;
m_endpoint = endpoint;
m_context = nullptr;
m_socket = nullptr;
m_vrf = vrf;

connect();
}
Expand Down Expand Up @@ -89,6 +95,11 @@ void ZmqClient::connect()
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(m_socket, ZMQ_SNDHWM, &high_watermark, sizeof(high_watermark));

if (!m_vrf.empty())
{
zmq_setsockopt(m_socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length());
}

SWSS_LOG_NOTICE("connect to zmq endpoint: %s", m_endpoint.c_str());
int rc = zmq_connect(m_socket, m_endpoint.c_str());
if (rc != 0)
Expand Down
6 changes: 4 additions & 2 deletions common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class ZmqClient
{
public:
ZmqClient(const std::string& endpoint);
ZmqClient(const std::string& endpoint, const std::string& vrf);
~ZmqClient();

bool isConnected();
Expand All @@ -24,11 +25,12 @@ class ZmqClient
const std::vector<KeyOpFieldsValuesTuple>& kcos,
std::vector<char>& sendbuffer);
private:
void initialize(const std::string& endpoint);

void initialize(const std::string& endpoint, const std::string& vrf);

std::string m_endpoint;

std::string m_vrf;

void* m_context;

void* m_socket;
Expand Down
13 changes: 12 additions & 1 deletion common/zmqserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ using namespace std;
namespace swss {

ZmqServer::ZmqServer(const std::string& endpoint)
: m_endpoint(endpoint)
: ZmqServer(endpoint, "")
{
}

ZmqServer::ZmqServer(const std::string& endpoint, const std::string& vrf)
: m_endpoint(endpoint),
m_vrf(vrf)
{
m_buffer.resize(MQ_RESPONSE_MAX_COUNT);
m_runThread = true;
Expand Down Expand Up @@ -92,6 +98,11 @@ void ZmqServer::mqPollThread()
int high_watermark = MQ_WATERMARK;
zmq_setsockopt(socket, ZMQ_RCVHWM, &high_watermark, sizeof(high_watermark));

if (!m_vrf.empty())
{
zmq_setsockopt(socket, ZMQ_BINDTODEVICE, m_vrf.c_str(), m_vrf.length());
}

int rc = zmq_bind(socket, m_endpoint.c_str());
if (rc != 0)
{
Expand Down
3 changes: 3 additions & 0 deletions common/zmqserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class ZmqServer
static constexpr int DEFAULT_POP_BATCH_SIZE = 128;

ZmqServer(const std::string& endpoint);
ZmqServer(const std::string& endpoint, const std::string& vrf);
~ZmqServer();

void registerMessageHandler(
Expand All @@ -53,6 +54,8 @@ class ZmqServer

std::string m_endpoint;

std::string m_vrf;

std::map<std::string, std::map<std::string, ZmqMessageHandler*>> m_HandlerMap;
};

Expand Down
98 changes: 98 additions & 0 deletions tests/zmq_state_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ static void consumerWorker(string tableName, string endpoint, bool dbPersistence
}
}

// Wait for some time to write into the DB.
sleep(3);

allDataReceived = true;

if (dbPersistence)
Expand Down Expand Up @@ -288,6 +291,9 @@ static void testMethod(bool producerPersistence)
// start consumer first, SHM can only have 1 consumer per table.
thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence);

// Wait for the consumer to start.
sleep(1);

cout << "Starting " << NUMBER_OF_THREADS << " producers" << endl;
/* Starting the producer before the producer */
for (int i = 0; i < NUMBER_OF_THREADS; i++)
Expand Down Expand Up @@ -351,6 +357,9 @@ static void testBatchMethod(bool producerPersistence)
// start consumer first, SHM can only have 1 consumer per table.
thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence);

// Wait for the consumer to start.
sleep(1);

cout << "Starting " << NUMBER_OF_THREADS << " producers" << endl;
/* Starting the producer before the producer */
for (int i = 0; i < NUMBER_OF_THREADS; i++)
Expand Down Expand Up @@ -438,3 +447,92 @@ TEST(ZmqConsumerStateTableBatchBufferOverflow, test)
}
EXPECT_ANY_THROW(p.send(kcos));
}

static bool zmq_done = false;
static void zmqConsumerWorker(string tableName, string endpoint)
{
cout << "Consumer thread started: " << tableName << endl;
DBConnector db(TEST_DB, 0, true);
ZmqServer server(endpoint, true);
ZmqConsumerStateTable c(&db, tableName, server);
Select cs;
cs.addSelectable(&c);
//validate received data
Selectable *selectcs;
std::deque<KeyOpFieldsValuesTuple> vkco;
int ret = 0;
while (!zmq_done)
{
ret = cs.select(&selectcs, 10, true);
if (ret == Select::OBJECT)
{
c.pops(vkco);
std::vector<swss::KeyOpFieldsValuesTuple> values;
values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}}});
server.sendMsg(TEST_DB, tableName, values);
}
}
allDataReceived = true;
cout << "Consumer thread ended: " << tableName << endl;
}
TEST(ZmqOneToOneSync, test)
{
std::string testTableName = "ZMQ_PROD_CONS_UT";
std::string pushEndpoint = "tcp://localhost:1234";
std::string pullEndpoint = "tcp://*:1234";
// start consumer first, SHM can only have 1 consumer per table.
thread *consumerThread = new thread(zmqConsumerWorker, testTableName, pullEndpoint);
// Wait for the consumer to be ready.
sleep(1);
DBConnector db(TEST_DB, 0, true);
ZmqClient client(pushEndpoint, 3000);
ZmqProducerStateTable p(&db, testTableName, client);
std::vector<KeyOpFieldsValuesTuple> kcos;
kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}}});
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> kcos_p;
std::string dbName, tableName;
for (int i =0; i < 3; ++i)
{
p.send(kcos);
ASSERT_TRUE(p.wait(dbName, tableName, kcos_p));
EXPECT_EQ(dbName, TEST_DB);
EXPECT_EQ(tableName, testTableName);
ASSERT_EQ(kcos_p.size(), 1);
EXPECT_EQ(kfvKey(*kcos_p[0]), "k");
EXPECT_EQ(kfvOp(*kcos_p[0]), SET_COMMAND);
std::vector<FieldValueTuple> cos = std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}};
EXPECT_EQ(kfvFieldsValues(*kcos_p[0]), cos);
}
zmq_done = true;
consumerThread->join();
delete consumerThread;
}
TEST(ZmqOneToOneSyncClientError, test)
{
std::string testTableName = "ZMQ_PROD_CONS_UT";
std::string pushEndpoint = "tcp://localhost:1234";
DBConnector db(TEST_DB, 0, true);
ZmqClient client(pushEndpoint, 3000);
ZmqProducerStateTable p(&db, testTableName, client);
std::vector<KeyOpFieldsValuesTuple> kcos;
kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{}});
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> kcos_p;
std::string dbName, tableName;
p.send(kcos);
// Wait will timeout without server reply.
EXPECT_FALSE(p.wait(dbName, tableName, kcos_p));
// Send will return error without server reply.
EXPECT_THROW(p.send(kcos), std::system_error);
}
TEST(ZmqOneToOneSyncServerError, test)
{
std::string testTableName = "ZMQ_PROD_CONS_UT";
std::string pullEndpoint = "tcp://*:1234";
DBConnector db(TEST_DB, 0, true);
ZmqServer server(pullEndpoint, true);
ZmqConsumerStateTable c(&db, testTableName, server);
std::vector<swss::KeyOpFieldsValuesTuple> values;
values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}}});
// Send will return error without client request.
EXPECT_THROW(server.sendMsg(TEST_DB, testTableName, values), std::system_error);
}

0 comments on commit 9f7d789

Please sign in to comment.