Skip to content

Commit

Permalink
Add batch support in zmq.
Browse files Browse the repository at this point in the history
  • Loading branch information
mint570 committed Jul 7, 2023
1 parent dffd76a commit ed5e2e0
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 108 deletions.
78 changes: 66 additions & 12 deletions common/binaryserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,40 @@

#include "common/armhelper.h"

#include <string>

namespace swss {

class BinarySerializer {
public:
static size_t serializeBuffer(
const char* buffer,
const size_t size,
const std::string& key,
const std::vector<swss::FieldValueTuple>& values,
const std::string& command,
const std::string& dbName,
const std::string& tableName)
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos)
{
auto tmpSerializer = BinarySerializer(buffer, size);

tmpSerializer.setKeyAndValue(
dbName.c_str(), dbName.length(),
tableName.c_str(), tableName.length());
tmpSerializer.setKeyAndValue(
key.c_str(), key.length(),
command.c_str(), command.length());
for (auto& kvp : values)
for (auto& kco : kcos)
{
auto& field = fvField(kvp);
auto& value = fvValue(kvp);
auto& key = kfvKey(kco);
auto& fvs = kfvFieldsValues(kco);
std::string fvs_len = std::to_string(fvs.size());
tmpSerializer.setKeyAndValue(
field.c_str(), field.length(),
value.c_str(), value.length());
key.c_str(), key.length(),
fvs_len.c_str(), fvs_len.length());
for (auto& fv : fvs)
{
auto& field = fvField(fv);
auto& value = fvValue(fv);
tmpSerializer.setKeyAndValue(
field.c_str(), field.length(),
value.c_str(), value.length());
}
}

return tmpSerializer.finalize();
Expand Down Expand Up @@ -86,6 +92,54 @@ class BinarySerializer {
}
}

static void deserializeBuffer(
const char* buffer,
const size_t size,
std::string& dbName,
std::string& tableName,
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos)
{
std::vector<FieldValueTuple> values;
deserializeBuffer(buffer, size, values);
int fvs_size = -1;
KeyOpFieldsValuesTuple kco;
auto& key = kfvKey(kco);
auto& op = kfvOp(kco);
auto& fvs = kfvFieldsValues(kco);
for (auto& fv : values)
{
auto& field = fvField(fv);
auto& value = fvValue(fv);
if (fvs_size < 0)
{
dbName = field;
tableName = value;
fvs_size = 0;
continue;
}
if (fvs_size == 0)
{
key = field;
op = SET_COMMAND;
fvs_size = std::stoi(value);
fvs.clear();
}
else
{
fvs.push_back(fv);
--fvs_size;
}
if (fvs_size == 0)
{
if (fvs.size() == 0)
{
op = DEL_COMMAND;
}
kcos.push_back(std::make_shared<KeyOpFieldsValuesTuple>(kco));
}
}
}

private:
const char* m_buffer;
const size_t m_buffer_size;
Expand Down
10 changes: 3 additions & 7 deletions common/zmqclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,21 +103,17 @@ void ZmqClient::connect()
}

void ZmqClient::sendMsg(
const std::string& key,
const std::vector<swss::FieldValueTuple>& values,
const std::string& command,
const std::string& dbName,
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos,
std::vector<char>& sendbuffer)
{
int serializedlen = (int)BinarySerializer::serializeBuffer(
sendbuffer.data(),
sendbuffer.size(),
key,
values,
command,
dbName,
tableName);
tableName,
kcos);

SWSS_LOG_DEBUG("sending: %d", serializedlen);
int zmq_err = 0;
Expand Down
6 changes: 2 additions & 4 deletions common/zmqclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ class ZmqClient

void connect();

void sendMsg(const std::string& key,
const std::vector<swss::FieldValueTuple>& values,
const std::string& command,
const std::string& dbName,
void sendMsg(const std::string& dbName,
const std::string& tableName,
const std::vector<KeyOpFieldsValuesTuple>& kcos,
std::vector<char>& sendbuffer);
private:
void initialize(const std::string& endpoint);
Expand Down
36 changes: 19 additions & 17 deletions common/zmqconsumerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,31 @@ ZmqConsumerStateTable::~ZmqConsumerStateTable()
}
}

void ZmqConsumerStateTable::handleReceivedData(std::shared_ptr<KeyOpFieldsValuesTuple> pkco)
void ZmqConsumerStateTable::handleReceivedData(const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> &kcos)
{
std::shared_ptr<KeyOpFieldsValuesTuple> clone = nullptr;
if (m_dbUpdateThread != nullptr)
{
// clone before put to received queue, because received data may change by consumer.
clone = std::make_shared<KeyOpFieldsValuesTuple>(*pkco);
}

for (auto kco : kcos)
{
std::lock_guard<std::mutex> lock(m_receivedQueueMutex);
m_receivedOperationQueue.push(pkco);
std::shared_ptr<KeyOpFieldsValuesTuple> clone = nullptr;
if (m_dbUpdateThread != nullptr)
{
// clone before put to received queue, because received data may change by consumer.
clone = std::make_shared<KeyOpFieldsValuesTuple>(*kco);
}
{
std::lock_guard<std::mutex> lock(m_receivedQueueMutex);
m_receivedOperationQueue.push(kco);
}
if (m_dbUpdateThread != nullptr)
{
{
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex);
m_dbUpdateDataQueue.push(clone);
}
}
}

m_selectableEvent.notify(); // will release epoll

if (m_dbUpdateThread != nullptr)
{
{
std::lock_guard<std::mutex> lock(m_dbUpdateDataQueueMutex);
m_dbUpdateDataQueue.push(clone);
}

m_dbUpdateDataNotifyCv.notify_all();
}
}
Expand Down
4 changes: 2 additions & 2 deletions common/zmqconsumerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include "selectableevent.h"
#include "zmqserver.h"

#define MQ_RESPONSE_MAX_COUNT (4*1024*1024)
#define MQ_RESPONSE_MAX_COUNT (16*1024*1024)
#define MQ_SIZE 100
#define MQ_MAX_RETRY 10
#define MQ_POLL_TIMEOUT (1000)
Expand Down Expand Up @@ -73,7 +73,7 @@ class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMes
}

private:
void handleReceivedData(std::shared_ptr<KeyOpFieldsValuesTuple> pkco);
void handleReceivedData(const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> &kcos);

void dbUpdateThread();

Expand Down
43 changes: 29 additions & 14 deletions common/zmqproducerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ void ZmqProducerStateTable::set(
const string &op /*= SET_COMMAND*/,
const string &prefix)
{
std::vector<KeyOpFieldsValuesTuple> kcos = std::vector<KeyOpFieldsValuesTuple>{
KeyOpFieldsValuesTuple{key, op, values}
};
m_zmqClient.sendMsg(
key,
values,
op,
m_dbName,
m_tableNameStr,
kcos,
m_sendbuffer);
}

Expand All @@ -61,32 +62,46 @@ void ZmqProducerStateTable::del(
const string &op /*= DEL_COMMAND*/,
const string &prefix)
{
std::vector<KeyOpFieldsValuesTuple> kcos = std::vector<KeyOpFieldsValuesTuple>{
KeyOpFieldsValuesTuple{key, op, std::vector<FieldValueTuple>{}}
};
m_zmqClient.sendMsg(
key,
vector<FieldValueTuple>(),
op,
m_dbName,
m_tableNameStr,
kcos,
m_sendbuffer);
}

void ZmqProducerStateTable::set(const std::vector<KeyOpFieldsValuesTuple> &values)
{
for (const auto &value : values)
{
set(
kfvKey(value),
kfvFieldsValues(value),
SET_COMMAND);
}
m_zmqClient.sendMsg(
m_dbName,
m_tableNameStr,
values,
m_sendbuffer);
}

void ZmqProducerStateTable::del(const std::vector<std::string> &keys)
{
std::vector<KeyOpFieldsValuesTuple> kcos;
for (const auto &key : keys)
{
del(key, DEL_COMMAND);
kcos.push_back(KeyOpFieldsValuesTuple{key, DEL_COMMAND, std::vector<FieldValueTuple>{}});
}
m_zmqClient.sendMsg(
m_dbName,
m_tableNameStr,
kcos,
m_sendbuffer);
}

void ZmqProducerStateTable::send(const std::vector<KeyOpFieldsValuesTuple> &kcos)
{
m_zmqClient.sendMsg(
m_dbName,
m_tableNameStr,
kcos,
m_sendbuffer);
}

}
3 changes: 3 additions & 0 deletions common/zmqproducerstatetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ class ZmqProducerStateTable : public ProducerStateTable

virtual void del(const std::vector<std::string> &keys);

// Batched send that can include both SET and DEL requests.
virtual void send(const std::vector<KeyOpFieldsValuesTuple> &kcos);

private:
void initialize();

Expand Down
22 changes: 5 additions & 17 deletions common/zmqserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,10 @@ ZmqMessageHandler* ZmqServer::findMessageHandler(

void ZmqServer::handleReceivedData(const char* buffer, const size_t size)
{
auto pkco = std::make_shared<KeyOpFieldsValuesTuple>();
KeyOpFieldsValuesTuple &kco = *pkco;
auto& values = kfvFieldsValues(kco);
BinarySerializer::deserializeBuffer(buffer, size, values);

// get table name
swss::FieldValueTuple fvt = values.at(0);
string dbName = fvField(fvt);
string tableName = fvValue(fvt);
values.erase(values.begin());
std::string dbName;
std::string tableName;
std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>> kcos;
BinarySerializer::deserializeBuffer(buffer, size, dbName, tableName, kcos);

// find handler
auto handler = findMessageHandler(dbName, tableName);
Expand All @@ -81,13 +75,7 @@ void ZmqServer::handleReceivedData(const char* buffer, const size_t size)
return;
}

// get key and OP
fvt = values.at(0);
kfvKey(kco) = fvField(fvt);
kfvOp(kco) = fvValue(fvt);
values.erase(values.begin());

handler->handleReceivedData(pkco);
handler->handleReceivedData(kcos);
}

void ZmqServer::mqPollThread()
Expand Down
4 changes: 2 additions & 2 deletions common/zmqserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <condition_variable>
#include "table.h"

#define MQ_RESPONSE_MAX_COUNT (4*1024*1024)
#define MQ_RESPONSE_MAX_COUNT (16*1024*1024)
#define MQ_SIZE 100
#define MQ_MAX_RETRY 10
#define MQ_POLL_TIMEOUT (1000)
Expand All @@ -20,7 +20,7 @@ class ZmqMessageHandler
{
public:
virtual ~ZmqMessageHandler() {};
virtual void handleReceivedData(std::shared_ptr<KeyOpFieldsValuesTuple> pkco) = 0;
virtual void handleReceivedData(const std::vector<std::shared_ptr<KeyOpFieldsValuesTuple>>& kcos) = 0;
};

class ZmqServer
Expand Down
Loading

0 comments on commit ed5e2e0

Please sign in to comment.