diff --git a/common/binaryserializer.h b/common/binaryserializer.h index 2d97a70f0..b19d03b7a 100644 --- a/common/binaryserializer.h +++ b/common/binaryserializer.h @@ -3,6 +3,8 @@ #include "common/armhelper.h" +#include + namespace swss { class BinarySerializer { @@ -10,27 +12,31 @@ class BinarySerializer { static size_t serializeBuffer( const char* buffer, const size_t size, - const std::string& key, - const std::vector& values, - const std::string& command, const std::string& dbName, - const std::string& tableName) + const std::string& tableName, + const std::vector& kcos) { auto tmpSerializer = BinarySerializer(buffer, size); tmpSerializer.setKeyAndValue( dbName.c_str(), dbName.length(), tableName.c_str(), tableName.length()); - tmpSerializer.setKeyAndValue( - key.c_str(), key.length(), - command.c_str(), command.length()); - for (auto& kvp : values) + for (auto& kco : kcos) { - auto& field = fvField(kvp); - auto& value = fvValue(kvp); + auto& key = kfvKey(kco); + auto& fvs = kfvFieldsValues(kco); + std::string fvs_len = std::to_string(fvs.size()); tmpSerializer.setKeyAndValue( - field.c_str(), field.length(), - value.c_str(), value.length()); + key.c_str(), key.length(), + fvs_len.c_str(), fvs_len.length()); + for (auto& fv : fvs) + { + auto& field = fvField(fv); + auto& value = fvValue(fv); + tmpSerializer.setKeyAndValue( + field.c_str(), field.length(), + value.c_str(), value.length()); + } } return tmpSerializer.finalize(); @@ -86,6 +92,54 @@ class BinarySerializer { } } + static void deserializeBuffer( + const char* buffer, + const size_t size, + std::string& dbName, + std::string& tableName, + std::vector>& kcos) + { + std::vector values; + deserializeBuffer(buffer, size, values); + int fvs_size = -1; + KeyOpFieldsValuesTuple kco; + auto& key = kfvKey(kco); + auto& op = kfvOp(kco); + auto& fvs = kfvFieldsValues(kco); + for (auto& fv : values) + { + auto& field = fvField(fv); + auto& value = fvValue(fv); + if (fvs_size < 0) + { + dbName = field; + tableName = value; + fvs_size = 0; + continue; + } + if (fvs_size == 0) + { + key = field; + op = SET_COMMAND; + fvs_size = std::stoi(value); + fvs.clear(); + } + else + { + fvs.push_back(fv); + --fvs_size; + } + if (fvs_size == 0) + { + if (fvs.size() == 0) + { + op = DEL_COMMAND; + } + kcos.push_back(std::make_shared(kco)); + } + } + } + private: const char* m_buffer; const size_t m_buffer_size; diff --git a/common/zmqclient.cpp b/common/zmqclient.cpp index dc7f8d07e..a7cf2951a 100644 --- a/common/zmqclient.cpp +++ b/common/zmqclient.cpp @@ -103,21 +103,17 @@ void ZmqClient::connect() } void ZmqClient::sendMsg( - const std::string& key, - const std::vector& values, - const std::string& command, const std::string& dbName, const std::string& tableName, + const std::vector& kcos, std::vector& sendbuffer) { int serializedlen = (int)BinarySerializer::serializeBuffer( sendbuffer.data(), sendbuffer.size(), - key, - values, - command, dbName, - tableName); + tableName, + kcos); SWSS_LOG_DEBUG("sending: %d", serializedlen); int zmq_err = 0; diff --git a/common/zmqclient.h b/common/zmqclient.h index efe33bd5e..3f56cc299 100644 --- a/common/zmqclient.h +++ b/common/zmqclient.h @@ -19,11 +19,9 @@ class ZmqClient void connect(); - void sendMsg(const std::string& key, - const std::vector& values, - const std::string& command, - const std::string& dbName, + void sendMsg(const std::string& dbName, const std::string& tableName, + const std::vector& kcos, std::vector& sendbuffer); private: void initialize(const std::string& endpoint); diff --git a/common/zmqconsumerstatetable.cpp b/common/zmqconsumerstatetable.cpp index 217b7cf3e..8d7ea19ac 100644 --- a/common/zmqconsumerstatetable.cpp +++ b/common/zmqconsumerstatetable.cpp @@ -52,29 +52,31 @@ ZmqConsumerStateTable::~ZmqConsumerStateTable() } } -void ZmqConsumerStateTable::handleReceivedData(std::shared_ptr pkco) +void ZmqConsumerStateTable::handleReceivedData(const std::vector> &kcos) { - std::shared_ptr clone = nullptr; - if (m_dbUpdateThread != nullptr) - { - // clone before put to received queue, because received data may change by consumer. - clone = std::make_shared(*pkco); - } - + for (auto kco : kcos) { - std::lock_guard lock(m_receivedQueueMutex); - m_receivedOperationQueue.push(pkco); + std::shared_ptr clone = nullptr; + if (m_dbUpdateThread != nullptr) + { + // clone before put to received queue, because received data may change by consumer. + clone = std::make_shared(*kco); + } + { + std::lock_guard lock(m_receivedQueueMutex); + m_receivedOperationQueue.push(kco); + } + if (m_dbUpdateThread != nullptr) + { + { + std::lock_guard lock(m_dbUpdateDataQueueMutex); + m_dbUpdateDataQueue.push(clone); + } + } } - m_selectableEvent.notify(); // will release epoll - if (m_dbUpdateThread != nullptr) { - { - std::lock_guard lock(m_dbUpdateDataQueueMutex); - m_dbUpdateDataQueue.push(clone); - } - m_dbUpdateDataNotifyCv.notify_all(); } } diff --git a/common/zmqconsumerstatetable.h b/common/zmqconsumerstatetable.h index e61a666a8..b024e6ba5 100644 --- a/common/zmqconsumerstatetable.h +++ b/common/zmqconsumerstatetable.h @@ -9,7 +9,7 @@ #include "selectableevent.h" #include "zmqserver.h" -#define MQ_RESPONSE_MAX_COUNT (4*1024*1024) +#define MQ_RESPONSE_MAX_COUNT (16*1024*1024) #define MQ_SIZE 100 #define MQ_MAX_RETRY 10 #define MQ_POLL_TIMEOUT (1000) @@ -73,7 +73,7 @@ class ZmqConsumerStateTable : public Selectable, public TableBase, public ZmqMes } private: - void handleReceivedData(std::shared_ptr pkco); + void handleReceivedData(const std::vector> &kcos); void dbUpdateThread(); diff --git a/common/zmqproducerstatetable.cpp b/common/zmqproducerstatetable.cpp index d959b9206..314b39a40 100644 --- a/common/zmqproducerstatetable.cpp +++ b/common/zmqproducerstatetable.cpp @@ -47,12 +47,13 @@ void ZmqProducerStateTable::set( const string &op /*= SET_COMMAND*/, const string &prefix) { + std::vector kcos = std::vector{ + KeyOpFieldsValuesTuple{key, op, values} + }; m_zmqClient.sendMsg( - key, - values, - op, m_dbName, m_tableNameStr, + kcos, m_sendbuffer); } @@ -61,32 +62,46 @@ void ZmqProducerStateTable::del( const string &op /*= DEL_COMMAND*/, const string &prefix) { + std::vector kcos = std::vector{ + KeyOpFieldsValuesTuple{key, op, std::vector{}} + }; m_zmqClient.sendMsg( - key, - vector(), - op, m_dbName, m_tableNameStr, + kcos, m_sendbuffer); } void ZmqProducerStateTable::set(const std::vector &values) { - for (const auto &value : values) - { - set( - kfvKey(value), - kfvFieldsValues(value), - SET_COMMAND); - } + m_zmqClient.sendMsg( + m_dbName, + m_tableNameStr, + values, + m_sendbuffer); } void ZmqProducerStateTable::del(const std::vector &keys) { + std::vector kcos; for (const auto &key : keys) { - del(key, DEL_COMMAND); + kcos.push_back(KeyOpFieldsValuesTuple{key, DEL_COMMAND, std::vector{}}); } + m_zmqClient.sendMsg( + m_dbName, + m_tableNameStr, + kcos, + m_sendbuffer); +} + +void ZmqProducerStateTable::send(const std::vector &kcos) +{ + m_zmqClient.sendMsg( + m_dbName, + m_tableNameStr, + kcos, + m_sendbuffer); } } diff --git a/common/zmqproducerstatetable.h b/common/zmqproducerstatetable.h index 34ea3d715..fa624d633 100644 --- a/common/zmqproducerstatetable.h +++ b/common/zmqproducerstatetable.h @@ -33,6 +33,9 @@ class ZmqProducerStateTable : public ProducerStateTable virtual void del(const std::vector &keys); + // Batched send that can include both SET and DEL requests. + virtual void send(const std::vector &kcos); + private: void initialize(); diff --git a/common/zmqserver.cpp b/common/zmqserver.cpp index 2a3e3f2f4..c9996fb00 100644 --- a/common/zmqserver.cpp +++ b/common/zmqserver.cpp @@ -63,16 +63,10 @@ ZmqMessageHandler* ZmqServer::findMessageHandler( void ZmqServer::handleReceivedData(const char* buffer, const size_t size) { - auto pkco = std::make_shared(); - KeyOpFieldsValuesTuple &kco = *pkco; - auto& values = kfvFieldsValues(kco); - BinarySerializer::deserializeBuffer(buffer, size, values); - - // get table name - swss::FieldValueTuple fvt = values.at(0); - string dbName = fvField(fvt); - string tableName = fvValue(fvt); - values.erase(values.begin()); + std::string dbName; + std::string tableName; + std::vector> kcos; + BinarySerializer::deserializeBuffer(buffer, size, dbName, tableName, kcos); // find handler auto handler = findMessageHandler(dbName, tableName); @@ -81,13 +75,7 @@ void ZmqServer::handleReceivedData(const char* buffer, const size_t size) return; } - // get key and OP - fvt = values.at(0); - kfvKey(kco) = fvField(fvt); - kfvOp(kco) = fvValue(fvt); - values.erase(values.begin()); - - handler->handleReceivedData(pkco); + handler->handleReceivedData(kcos); } void ZmqServer::mqPollThread() diff --git a/common/zmqserver.h b/common/zmqserver.h index 596ab2c2a..6ffa099d8 100644 --- a/common/zmqserver.h +++ b/common/zmqserver.h @@ -5,7 +5,7 @@ #include #include "table.h" -#define MQ_RESPONSE_MAX_COUNT (4*1024*1024) +#define MQ_RESPONSE_MAX_COUNT (16*1024*1024) #define MQ_SIZE 100 #define MQ_MAX_RETRY 10 #define MQ_POLL_TIMEOUT (1000) @@ -20,7 +20,7 @@ class ZmqMessageHandler { public: virtual ~ZmqMessageHandler() {}; - virtual void handleReceivedData(std::shared_ptr pkco) = 0; + virtual void handleReceivedData(const std::vector>& kcos) = 0; }; class ZmqServer diff --git a/tests/binary_serializer_ut.cpp b/tests/binary_serializer_ut.cpp index beae65d4b..fa8b30db4 100644 --- a/tests/binary_serializer_ut.cpp +++ b/tests/binary_serializer_ut.cpp @@ -10,43 +10,43 @@ using namespace swss; TEST(BinarySerializer, serialize_deserialize) { string test_entry_key = "test_key"; - string test_command = "test_command"; + string test_command = "SET"; string test_db = "test_db"; string test_table = "test_table"; string test_key = "key"; string test_value= "value"; + string test_entry_key2 = "test_key_2"; + string test_command2 = "DEL"; char buffer[200]; std::vector values; values.push_back(std::make_pair(test_key, test_value)); + std::vector kcos = std::vector{ + KeyOpFieldsValuesTuple{test_entry_key, test_command, values}, + KeyOpFieldsValuesTuple{test_entry_key2, test_command2, std::vector{}}}; int serialized_len = (int)BinarySerializer::serializeBuffer( buffer, sizeof(buffer), - test_entry_key, - values, - test_command, test_db, - test_table); + test_table, + kcos); string serialized_str(buffer); - EXPECT_EQ(serialized_len, 107); + EXPECT_EQ(serialized_len, 125); - auto ptr = std::make_shared(); - KeyOpFieldsValuesTuple &kco = *ptr; - auto& deserialized_values = kfvFieldsValues(kco); - BinarySerializer::deserializeBuffer(buffer, serialized_len, deserialized_values); - - swss::FieldValueTuple fvt = deserialized_values.at(0); - EXPECT_TRUE(fvField(fvt) == test_db); - EXPECT_TRUE(fvValue(fvt) == test_table); + std::vector> kcos_ptrs; + std::vector deserialized_kcos; + string db_name; + string db_table; + BinarySerializer::deserializeBuffer(buffer, serialized_len, db_name, db_table, kcos_ptrs); + for (auto kco_ptr : kcos_ptrs) + { + deserialized_kcos.push_back(*kco_ptr); + } - fvt = deserialized_values.at(1); - EXPECT_TRUE(fvField(fvt) == test_entry_key); - EXPECT_TRUE(fvValue(fvt) == test_command); - - fvt = deserialized_values.at(2); - EXPECT_TRUE(fvField(fvt) == test_key); - EXPECT_TRUE(fvValue(fvt) == test_value); + EXPECT_EQ(db_name, test_db); + EXPECT_EQ(db_table, test_table); + EXPECT_EQ(deserialized_kcos, kcos); } TEST(BinarySerializer, serialize_overflow) @@ -54,14 +54,14 @@ TEST(BinarySerializer, serialize_overflow) char buffer[50]; std::vector values; values.push_back(std::make_pair("test_key", "test_value")); + std::vector kcos = std::vector{ + KeyOpFieldsValuesTuple{"test_entry_key", "SET", values}}; EXPECT_THROW(BinarySerializer::serializeBuffer( buffer, sizeof(buffer), - "test_entry_key", - values, - "test_command", "test_db", - "test_table"), runtime_error); + "test_table", + kcos), runtime_error); } TEST(BinarySerializer, deserialize_overflow) @@ -69,18 +69,18 @@ TEST(BinarySerializer, deserialize_overflow) char buffer[200]; std::vector values; values.push_back(std::make_pair("test_key", "test_value")); + std::vector kcos = std::vector{ + KeyOpFieldsValuesTuple{"test_entry_key", "SET", values}}; int serialized_len = (int)BinarySerializer::serializeBuffer( buffer, sizeof(buffer), - "test_entry_key", - values, - "test_command", "test_db", - "test_table"); + "test_table", + kcos); string serialized_str(buffer); - auto ptr = std::make_shared(); - KeyOpFieldsValuesTuple &kco = *ptr; - auto& deserialized_values = kfvFieldsValues(kco); - EXPECT_THROW(BinarySerializer::deserializeBuffer(buffer, serialized_len - 10, deserialized_values), runtime_error); + std::vector> kcos_ptrs; + string db_name; + string db_table; + EXPECT_THROW(BinarySerializer::deserializeBuffer(buffer, serialized_len - 10, db_name, db_table, kcos_ptrs), runtime_error); }