From d2cb95f5c9381a24af99db8eae2b7fc83826ab3e Mon Sep 17 00:00:00 2001 From: Yijiao Qin Date: Mon, 22 Jul 2024 15:34:22 -0700 Subject: [PATCH] redispipeline publish at flush --- common/Makefile.am | 3 +- common/producerstatetable.cpp | 43 +++++----- common/producerstatetable.h | 4 +- common/redispipeline.cpp | 147 ++++++++++++++++++++++++++++++++++ common/redispipeline.h | 147 ++++++---------------------------- 5 files changed, 201 insertions(+), 143 deletions(-) create mode 100644 common/redispipeline.cpp diff --git a/common/Makefile.am b/common/Makefile.am index 18cfd8035..f2f522583 100644 --- a/common/Makefile.am +++ b/common/Makefile.am @@ -68,7 +68,8 @@ common_libswsscommon_la_SOURCES = \ common/zmqclient.cpp \ common/zmqserver.cpp \ common/asyncdbupdater.cpp \ - common/redis_table_waiter.cpp + common/redis_table_waiter.cpp \ + common/redispipeline.cpp common_libswsscommon_la_CXXFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CFLAGS) $(CODE_COVERAGE_CXXFLAGS) common_libswsscommon_la_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(LIBNL_CPPFLAGS) $(CODE_COVERAGE_CPPFLAGS) diff --git a/common/producerstatetable.cpp b/common/producerstatetable.cpp index d0db5e2a5..963eb9dfb 100644 --- a/common/producerstatetable.cpp +++ b/common/producerstatetable.cpp @@ -3,6 +3,7 @@ #include #include #include +#include "redispipeline.h" #include "redisreply.h" #include "table.h" #include "redisapi.h" @@ -13,20 +14,25 @@ using namespace std; namespace swss { -ProducerStateTable::ProducerStateTable(DBConnector *db, const string &tableName) - : ProducerStateTable(new RedisPipeline(db, 1), tableName, false) +ProducerStateTable::ProducerStateTable(DBConnector *db, const string &tableName, bool flushPub) + : ProducerStateTable(new RedisPipeline(db, 1), tableName, false, flushPub) { m_pipeowned = true; } -ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered) +ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered, bool flushPub) : TableBase(tableName, SonicDBConfig::getSeparator(pipeline->getDBConnector())) , TableName_KeySet(tableName) + , m_flushPub(flushPub) , m_buffered(buffered) , m_pipeowned(false) , m_tempViewActive(false) , m_pipe(pipeline) { + if (m_flushPub) { + m_pipe->addChannel(getChannelName(m_pipe->getDbId())); + } + // num in luaSet and luaDel means number of elements that were added to the key set, // not including all the elements already present into the set. string luaSet = @@ -34,19 +40,11 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta "for i = 0, #KEYS - 3 do\n" " redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n" "end\n" - " if added > 0 then \n" - " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" - "end\n"; - m_shaSet = m_pipe->loadRedisScript(luaSet); string luaDel = "local added = redis.call('SADD', KEYS[2], ARGV[2])\n" "redis.call('SADD', KEYS[4], ARGV[2])\n" "redis.call('DEL', KEYS[3])\n" - "if added > 0 then \n" - " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" - "end\n"; - m_shaDel = m_pipe->loadRedisScript(luaDel); string luaBatchedSet = "local added = 0\n" @@ -60,10 +58,6 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta " end\n" " idx = idx + tonumber(ARGV[idx]) * 2 + 1\n" "end\n" - "if added > 0 then \n" - " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" - "end\n"; - m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet); string luaBatchedDel = "local added = 0\n" @@ -72,10 +66,6 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta " redis.call('SADD', KEYS[3], KEYS[5 + i])\n" " redis.call('DEL', KEYS[4] .. KEYS[5 + i])\n" "end\n" - "if added > 0 then \n" - " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" - "end\n"; - m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel); string luaClear = "redis.call('DEL', KEYS[1])\n" @@ -84,6 +74,21 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta " redis.call('DEL', k)\n" "end\n" "redis.call('DEL', KEYS[3])\n"; + + if (!m_flushPub) { + string luaPub = + "if added > 0 then \n" + " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" + "end\n"; + luaSet += luaPub; + luaDel += luaPub; + luaBatchedSet += luaPub; + luaBatchedDel += luaPub; + } + m_shaSet = m_pipe->loadRedisScript(luaSet); + m_shaDel = m_pipe->loadRedisScript(luaDel); + m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet); + m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel); m_shaClear = m_pipe->loadRedisScript(luaClear); string luaApplyView = loadLuaScript("producer_state_table_apply_view.lua"); diff --git a/common/producerstatetable.h b/common/producerstatetable.h index b6fa78684..6c5fc8f51 100644 --- a/common/producerstatetable.h +++ b/common/producerstatetable.h @@ -10,8 +10,8 @@ namespace swss { class ProducerStateTable : public TableBase, public TableName_KeySet { public: - ProducerStateTable(DBConnector *db, const std::string &tableName); - ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false); + ProducerStateTable(DBConnector *db, const std::string &tableName, bool flushPub = false); + ProducerStateTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered = false, bool flushPub = false); virtual ~ProducerStateTable(); void setBuffered(bool buffered); diff --git a/common/redispipeline.cpp b/common/redispipeline.cpp new file mode 100644 index 000000000..e3c742a8e --- /dev/null +++ b/common/redispipeline.cpp @@ -0,0 +1,147 @@ +#include "redispipeline.h" + +namespace swss { + +RedisPipeline::~RedisPipeline() +{ + flush(); + delete m_db; +} + +redisReply *RedisPipeline::push(const RedisCommand& command, int expectedType) +{ + switch (expectedType) + { + case REDIS_REPLY_NIL: + case REDIS_REPLY_STATUS: + case REDIS_REPLY_INTEGER: + { + int rc = redisAppendFormattedCommand(m_db->getContext(), command.c_str(), command.length()); + if (rc != REDIS_OK) + { + // The only reason of error is REDIS_ERR_OOM (Out of memory) + // ref: https://github.com/redis/hiredis/blob/master/hiredis.c + throw std::bad_alloc(); + } + m_expectedTypes.push(expectedType); + m_remaining++; + mayflush(); + return NULL; + } + default: + { + flush(); + RedisReply r(m_db, command, expectedType); + return r.release(); + } + } +} + +redisReply *RedisPipeline::push(const RedisCommand& command) +{ + flush(); + RedisReply r(m_db, command); + return r.release(); +} + +std::string RedisPipeline::loadRedisScript(const std::string& script) +{ + RedisCommand loadcmd; + loadcmd.format("SCRIPT LOAD %s", script.c_str()); + RedisReply r = push(loadcmd, REDIS_REPLY_STRING); + + std::string sha = r.getReply(); + return sha; +} + +void RedisPipeline::flush() { + + lastHeartBeat = std::chrono::steady_clock::now(); + + if (m_remaining == 0) { + return; + } + + while(m_remaining) + { + // Construct an object to use its dtor, so that resource is released + RedisReply r(pop()); + + } + publish(); + +} + +int RedisPipeline::getIdleTime(std::chrono::time_point tcurrent) +{ + return static_cast(std::chrono::duration_cast(tcurrent - lastHeartBeat).count()); +} + +size_t RedisPipeline::size() +{ + return m_remaining; +} + +int RedisPipeline::getDbId() +{ + return m_db->getDbId(); +} + +std::string RedisPipeline::getDbName() +{ + return m_db->getDbName(); +} + +DBConnector *RedisPipeline::getDBConnector() +{ + return m_db; +} + +void RedisPipeline::addChannel(std::string channel) { + m_luaPub += + "redis.call('PUBLISH', '" + channel + "', 'G');"; + + m_shaPub = loadRedisScript(m_luaPub); +} + +redisReply *RedisPipeline::pop() +{ + if (m_remaining == 0) return NULL; + + redisReply *reply; + int rc = redisGetReply(m_db->getContext(), (void**)&reply); + if (rc != REDIS_OK) + { + throw RedisError("Failed to redisGetReply in RedisPipeline::pop", m_db->getContext()); + } + RedisReply r(reply); + m_remaining--; + + int expectedType = m_expectedTypes.front(); + m_expectedTypes.pop(); + r.checkReplyType(expectedType); + if (expectedType == REDIS_REPLY_STATUS) + { + r.checkStatusOK(); + } + return r.release(); +} + +void RedisPipeline::mayflush() +{ + if (m_remaining >= COMMAND_MAX) + flush(); +} + +void RedisPipeline::publish() { + if (m_shaPub == "") { + return; + } + RedisCommand cmd; + cmd.format( + "EVALSHA %s 0", + m_shaPub.c_str()); + RedisReply r(m_db, cmd); +} + +} // namespace swss \ No newline at end of file diff --git a/common/redispipeline.h b/common/redispipeline.h index b8efa3840..543af2388 100644 --- a/common/redispipeline.h +++ b/common/redispipeline.h @@ -1,16 +1,14 @@ #pragma once #include -#include #include +#include +#include +#include +#include "logger.h" #include "redisreply.h" #include "rediscommand.h" #include "dbconnector.h" -#include "logger.h" - -#include "unistd.h" -#include "sys/syscall.h" -#define gettid() syscall(SYS_gettid) namespace swss { @@ -22,140 +20,47 @@ class RedisPipeline { RedisPipeline(const DBConnector *db, size_t sz = 128) : COMMAND_MAX(sz) , m_remaining(0) + , m_shaPub("") { m_db = db->newConnector(NEWCONNECTOR_TIMEOUT); - initializeOwnerTid(); + lastHeartBeat = std::chrono::steady_clock::now(); } - ~RedisPipeline() { - if (m_ownerTid == gettid()) - { - // call flush from different thread will trigger race condition issue. - flush(); - } - else - { - SWSS_LOG_NOTICE("RedisPipeline dtor is called from another thread, possibly due to exit(), Database: %s", getDbName().c_str()); - } - - delete m_db; - } + ~RedisPipeline(); + + redisReply *push(const RedisCommand& command, int expectedType); - redisReply *push(const RedisCommand& command, int expectedType) - { - switch (expectedType) - { - case REDIS_REPLY_NIL: - case REDIS_REPLY_STATUS: - case REDIS_REPLY_INTEGER: - { - int rc = command.appendTo(m_db->getContext()); - if (rc != REDIS_OK) - { - // The only reason of error is REDIS_ERR_OOM (Out of memory) - // ref: https://github.com/redis/hiredis/blob/master/hiredis.c - throw std::bad_alloc(); - } - m_expectedTypes.push(expectedType); - m_remaining++; - mayflush(); - return NULL; - } - default: - { - flush(); - RedisReply r(m_db, command, expectedType); - return r.release(); - } - } - } + redisReply *push(const RedisCommand& command); - redisReply *push(const RedisCommand& command) - { - flush(); - RedisReply r(m_db, command); - return r.release(); - } + std::string loadRedisScript(const std::string& script); - std::string loadRedisScript(const std::string& script) - { - RedisCommand loadcmd; - loadcmd.format("SCRIPT LOAD %s", script.c_str()); - RedisReply r = push(loadcmd, REDIS_REPLY_STRING); + void flush(); - std::string sha = r.getReply(); - return sha; - } + size_t size(); - // The caller is responsible to release the reply object - redisReply *pop() - { - if (m_remaining == 0) return NULL; - - redisReply *reply; - int rc = redisGetReply(m_db->getContext(), (void**)&reply); - if (rc != REDIS_OK) - { - throw RedisError("Failed to redisGetReply in RedisPipeline::pop", m_db->getContext()); - } - RedisReply r(reply); - m_remaining--; - - int expectedType = m_expectedTypes.front(); - m_expectedTypes.pop(); - r.checkReplyType(expectedType); - if (expectedType == REDIS_REPLY_STATUS) - { - r.checkStatusOK(); - } - return r.release(); - } + int getDbId(); - void flush() - { - while(m_remaining) - { - // Construct an object to use its dtor, so that resource is released - RedisReply r(pop()); - } - } + std::string getDbName(); - size_t size() - { - return m_remaining; - } - - int getDbId() - { - return m_db->getDbId(); - } - - std::string getDbName() - { - return m_db->getDbName(); - } + DBConnector *getDBConnector(); - DBConnector *getDBConnector() - { - return m_db; - } + void addChannel(std::string channel); - void initializeOwnerTid() - { - m_ownerTid = gettid(); - } + int getIdleTime(std::chrono::time_point tcurrent=std::chrono::steady_clock::now()); private: DBConnector *m_db; std::queue m_expectedTypes; size_t m_remaining; - long int m_ownerTid; + std::string m_luaPub; + std::string m_shaPub; + std::chrono::time_point lastHeartBeat; - void mayflush() - { - if (m_remaining >= COMMAND_MAX) - flush(); - } + // The caller is responsible to release the reply object + redisReply *pop(); + + void mayflush(); + void publish(); }; }