Skip to content

Commit

Permalink
ZMQ lib change.
Browse files Browse the repository at this point in the history
  • Loading branch information
divyagayathri-hcl committed Oct 1, 2024
1 parent 898aa5d commit 99e238d
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 0 deletions.
35 changes: 35 additions & 0 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,39 @@ void ZmqClient::sendMsg(
throw system_error(make_error_code(errc::io_error), message);
}

bool ZmqClient::wait(std::string& dbName,
std::string& tableName,
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos,
std::vector<char>& buffer)
{
SWSS_LOG_ENTER();
zmq_pollitem_t items [1] = { };
items[0].socket = m_socket;
items[0].events = ZMQ_POLLIN;
int rc;
for (int i = 0; true ; ++i)
{
rc = zmq_recv(m_socket, buffer.data(), buffer.size(), 0);
if (rc < 0)
{
if (zmq_errno() == EINTR && i <= MQ_MAX_RETRY)
{
continue;
}
SWSS_LOG_THROW("zmq_recv failed, zmqerrno: %d", zmq_errno());
}
if (rc >= (int)buffer.size())
{
SWSS_LOG_THROW(
"zmq_recv message was truncated (over %d bytes, received %d), increase buffer size, message DROPPED",
(int)buffer.size(), rc);
}
break;
}
buffer.at(rc) = 0; // make sure that we end string with zero before parse
kcos.clear();
BinarySerializer::deserializeBuffer(buffer.data(), buffer.size(), dbName, tableName, kcos);
return true;
}

}
5 changes: 5 additions & 0 deletions common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ class ZmqClient
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos,
std::vector<char>& sendbuffer);
bool wait(std::string& dbName,
std::string& tableName,
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos,
std::vector<char>& buffer);

private:
void initialize(const std::string& endpoint, const std::string& vrf);

Expand Down
7 changes: 7 additions & 0 deletions common/zmqproducerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,4 +182,11 @@ size_t ZmqProducerStateTable::dbUpdaterQueueSize()
return m_asyncDBUpdater->queueSize();
}

bool ZmqProducerStateTable::wait(std::string& dbName,
std::string& tableName,
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
{
return m_zmqClient.wait(dbName, tableName, kcos, m_sendbuffer);
}

}
6 changes: 6 additions & 0 deletions common/zmqproducerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ class ZmqProducerStateTable : public ProducerStateTable
virtual void send(const std::vector<KeyOpFieldsValuesTuple> &kcos);

size_t dbUpdaterQueueSize();

// This method should only be used if the ZmqClient enables one-to-one sync.
virtual bool wait(std::string& dbName,
std::string& tableName,
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos);

private:
void initialize(DBConnector *db, const std::string &tableName, bool dbPersistence);

Expand Down
6 changes: 6 additions & 0 deletions common/zmqserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ class ZmqServer
const std::string tableName,
ZmqMessageHandler* handler);

// This method should only be used in one-to-one sync mode with the client.
void sendMsg(const std::string& dbName, const std::string& tableName,
const std::vector<swss::KeyOpFieldsValuesTuple>& values);

private:
void connect();

void handleReceivedData(const char* buffer, const size_t size);

void mqPollThread();
Expand Down
94 changes: 94 additions & 0 deletions tests/zmq_state_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ static void testMethod(bool producerPersistence)
// start consumer first, SHM can only have 1 consumer per table.
thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence);

// Wait for the consumer to start.
sleep(1);

cout << "Starting " << NUMBER_OF_THREADS << " producers" << endl;
/* Starting the producer before the producer */
for (int i = 0; i < NUMBER_OF_THREADS; i++)
Expand Down Expand Up @@ -351,6 +354,9 @@ static void testBatchMethod(bool producerPersistence)
// start consumer first, SHM can only have 1 consumer per table.
thread *consumerThread = new thread(consumerWorker, testTableName, pullEndpoint, !producerPersistence);

// Wait for the consumer to start.
sleep(1);

cout << "Starting " << NUMBER_OF_THREADS << " producers" << endl;
/* Starting the producer before the producer */
for (int i = 0; i < NUMBER_OF_THREADS; i++)
Expand Down Expand Up @@ -397,6 +403,94 @@ static void testBatchMethod(bool producerPersistence)
cout << endl << "Done." << endl;
}

static bool zmq_done = false;
static void zmqConsumerWorker(string tableName, string endpoint)
{
cout << "Consumer thread started: " << tableName << endl;
DBConnector db(TEST_DB, 0, true);
ZmqServer server(endpoint, true);
ZmqConsumerStateTable c(&db, tableName, server);
Select cs;
cs.addSelectable(&c);
//validate received data
Selectable *selectcs;
std::deque<KeyOpFieldsValuesTuple> vkco;
int ret = 0;
while (!zmq_done)
{
ret = cs.select(&selectcs, 10, true);
if (ret == Select::OBJECT)
{
c.pops(vkco);
std::vector<swss::KeyOpFieldsValuesTuple> values;
values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}}});
server.sendMsg(TEST_DB, tableName, values);
}
}
allDataReceived = true;
cout << "Consumer thread ended: " << tableName << endl;
}
TEST(ZmqOneToOneSync, test)
{
std::string testTableName = "ZMQ_PROD_CONS_UT";
std::string pushEndpoint = "tcp://localhost:1234";
std::string pullEndpoint = "tcp://*:1234";
// start consumer first, SHM can only have 1 consumer per table.
thread *consumerThread = new thread(zmqConsumerWorker, testTableName, pullEndpoint);
// Wait for the consumer to be ready.
sleep(1);
DBConnector db(TEST_DB, 0, true);
ZmqClient client(pushEndpoint, 3000);
ZmqProducerStateTable p(&db, testTableName, client);
std::vector<KeyOpFieldsValuesTuple> kcos;
kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}}});
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> kcos_p;
std::string dbName, tableName;
for (int i =0; i < 3; ++i)
{
p.send(kcos);
ASSERT_TRUE(p.wait(dbName, tableName, kcos_p));
EXPECT_EQ(dbName, TEST_DB);
EXPECT_EQ(tableName, testTableName);
ASSERT_EQ(kcos_p.size(), 1);
EXPECT_EQ(kfvKey(*kcos_p[0]), "k");
EXPECT_EQ(kfvOp(*kcos_p[0]), SET_COMMAND);
std::vector<FieldValueTuple> cos = std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}};
EXPECT_EQ(kfvFieldsValues(*kcos_p[0]), cos);
}
zmq_done = true;
consumerThread->join();
delete consumerThread;
}
TEST(ZmqOneToOneSyncClientError, test)
{
std::string testTableName = "ZMQ_PROD_CONS_UT";
std::string pushEndpoint = "tcp://localhost:1234";
DBConnector db(TEST_DB, 0, true);
ZmqClient client(pushEndpoint, 3000);
ZmqProducerStateTable p(&db, testTableName, client);
std::vector<KeyOpFieldsValuesTuple> kcos;
kcos.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{}});
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> kcos_p;
std::string dbName, tableName;
p.send(kcos);
// Wait will timeout without server reply.
EXPECT_FALSE(p.wait(dbName, tableName, kcos_p));
// Send will return error without server reply.
EXPECT_THROW(p.send(kcos), std::system_error);
}
TEST(ZmqOneToOneSyncServerError, test)
{
std::string testTableName = "ZMQ_PROD_CONS_UT";
std::string pullEndpoint = "tcp://*:1234";
DBConnector db(TEST_DB, 0, true);
ZmqServer server(pullEndpoint, true);
ZmqConsumerStateTable c(&db, testTableName, server);
std::vector<swss::KeyOpFieldsValuesTuple> values;
values.push_back(KeyOpFieldsValuesTuple{"k", SET_COMMAND, std::vector<FieldValueTuple>{FieldValueTuple{"f", "v"}}});
// Send will return error without client request.
EXPECT_THROW(server.sendMsg(TEST_DB, testTableName, values), std::system_error);
}
TEST(ZmqConsumerStateTable, test)
{
// test with persist by consumer
Expand Down

0 comments on commit 99e238d

Please sign in to comment.