From c98db957a3926507ceb1d93899d0a6336ed00617 Mon Sep 17 00:00:00 2001 From: Yijiao Qin Date: Mon, 22 Jul 2024 15:34:22 -0700 Subject: [PATCH] redispipeline publish at flush --- common/producerstatetable.cpp | 17 ++++--------- common/redispipeline.h | 44 ++++++++++++++++++++++++++++++++++ tests/redis_piped_state_ut.cpp | 34 ++++++++++++++++---------- 3 files changed, 69 insertions(+), 26 deletions(-) diff --git a/common/producerstatetable.cpp b/common/producerstatetable.cpp index d0db5e2a5..093759478 100644 --- a/common/producerstatetable.cpp +++ b/common/producerstatetable.cpp @@ -27,25 +27,22 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta , m_tempViewActive(false) , m_pipe(pipeline) { + + m_pipe->addChannel(getChannelName(m_pipe->getDbId())); + // num in luaSet and luaDel means number of elements that were added to the key set, // not including all the elements already present into the set. string luaSet = "local added = redis.call('SADD', KEYS[2], ARGV[2])\n" "for i = 0, #KEYS - 3 do\n" " redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n" - "end\n" - " if added > 0 then \n" - " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" "end\n"; m_shaSet = m_pipe->loadRedisScript(luaSet); string luaDel = "local added = redis.call('SADD', KEYS[2], ARGV[2])\n" "redis.call('SADD', KEYS[4], ARGV[2])\n" - "redis.call('DEL', KEYS[3])\n" - "if added > 0 then \n" - " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" - "end\n"; + "redis.call('DEL', KEYS[3])\n"; m_shaDel = m_pipe->loadRedisScript(luaDel); string luaBatchedSet = @@ -59,9 +56,6 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta " redis.call('HSET', KEYS[3] .. KEYS[4 + i], attr, val)\n" " end\n" " idx = idx + tonumber(ARGV[idx]) * 2 + 1\n" - "end\n" - "if added > 0 then \n" - " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" "end\n"; m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet); @@ -71,9 +65,6 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta " added = added + redis.call('SADD', KEYS[2], KEYS[5 + i])\n" " redis.call('SADD', KEYS[3], KEYS[5 + i])\n" " redis.call('DEL', KEYS[4] .. KEYS[5 + i])\n" - "end\n" - "if added > 0 then \n" - " redis.call('PUBLISH', KEYS[1], ARGV[1])\n" "end\n"; m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel); diff --git a/common/redispipeline.h b/common/redispipeline.h index b8efa3840..be7561b6b 100644 --- a/common/redispipeline.h +++ b/common/redispipeline.h @@ -2,7 +2,10 @@ #include #include +#include #include +#include +#include #include "redisreply.h" #include "rediscommand.h" #include "dbconnector.h" @@ -22,9 +25,11 @@ class RedisPipeline { RedisPipeline(const DBConnector *db, size_t sz = 128) : COMMAND_MAX(sz) , m_remaining(0) + , m_shaPub("") { m_db = db->newConnector(NEWCONNECTOR_TIMEOUT); initializeOwnerTid(); + lastHeartBeat = std::chrono::steady_clock::now(); } ~RedisPipeline() { @@ -113,11 +118,19 @@ class RedisPipeline { void flush() { + lastHeartBeat = std::chrono::steady_clock::now(); + + if (m_remaining == 0) { + return; + } + while(m_remaining) { // Construct an object to use its dtor, so that resource is released RedisReply r(pop()); } + + publish(); } size_t size() @@ -145,12 +158,43 @@ class RedisPipeline { m_ownerTid = gettid(); } + void addChannel(std::string channel) + { + if (m_channels.find(channel) != m_channels.end()) + return; + + m_channels.insert(channel); + m_luaPub += "redis.call('PUBLISH', '" + channel + "', 'G');"; + m_shaPub = loadRedisScript(m_luaPub); + } + + int getIdleTime(std::chrono::time_point tcurrent=std::chrono::steady_clock::now()) + { + return static_cast(std::chrono::duration_cast(tcurrent - lastHeartBeat).count()); + } + + void publish() { + if (m_shaPub.empty()) { + return; + } + RedisCommand cmd; + cmd.format( + "EVALSHA %s 0", + m_shaPub.c_str()); + RedisReply r(m_db, cmd); + } + private: DBConnector *m_db; std::queue m_expectedTypes; size_t m_remaining; long int m_ownerTid; + std::string m_luaPub; + std::string m_shaPub; + std::chrono::time_point lastHeartBeat; // marks the timestamp of latest pipeline flush being invoked + std::unordered_set m_channels; + void mayflush() { if (m_remaining >= COMMAND_MAX) diff --git a/tests/redis_piped_state_ut.cpp b/tests/redis_piped_state_ut.cpp index ca3291907..6aa3f0b72 100644 --- a/tests/redis_piped_state_ut.cpp +++ b/tests/redis_piped_state_ut.cpp @@ -106,29 +106,37 @@ static void consumerWorker(int index) { string tableName = "UT_REDIS_THREAD_" + to_string(index); DBConnector db(TEST_DB, 0, true); - ConsumerStateTable c(&db, tableName); + ConsumerStateTable c(&db, tableName, NUMBER_OF_OPS); Select cs; Selectable *selectcs; int numberOfKeysSet = 0; int numberOfKeyDeleted = 0; int ret, i = 0; - KeyOpFieldsValuesTuple kco; + std::deque entries; cs.addSelectable(&c); - while ((ret = cs.select(&selectcs)) == Select::OBJECT) + while (true) { - c.pop(kco); - if (kfvOp(kco) == "SET") - { - numberOfKeysSet++; - validateFields(kfvKey(kco), kfvFieldsValues(kco)); - } else if (kfvOp(kco) == "DEL") + ret = cs.select(&selectcs); + c.pops(entries); + + for (auto& kco: entries) { - numberOfKeyDeleted++; - } + if (kfvOp(kco) == "SET") + { + numberOfKeysSet++; + validateFields(kfvKey(kco), kfvFieldsValues(kco)); + } else if (kfvOp(kco) == "DEL") + { + numberOfKeyDeleted++; + } + + if ((i++ % 100) == 0) + cout << "-" << flush; - if ((i++ % 100) == 0) - cout << "-" << flush; + if (numberOfKeyDeleted == NUMBER_OF_OPS) + break; + } if (numberOfKeyDeleted == NUMBER_OF_OPS) break;