From d0f435491862b3dd201c3f8ce7c194e3a33f1ffc Mon Sep 17 00:00:00 2001 From: Hauru <85615957+Yangsx-1@users.noreply.github.com> Date: Fri, 15 Mar 2024 08:45:46 +0800 Subject: [PATCH] Add support of the XACK command (#2169) --- src/commands/cmd_stream.cc | 38 +++++++++++++- src/types/redis_stream.cc | 42 +++++++++++++++ src/types/redis_stream.h | 2 + tests/gocase/unit/type/stream/stream_test.go | 54 ++++++++++++++++++++ 4 files changed, 135 insertions(+), 1 deletion(-) diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index a90caece532..f8736168690 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -31,6 +31,41 @@ namespace redis { +class CommandXAck : public Commander { + public: + Status Parse(const std::vector &args) override { + stream_name_ = args[1]; + group_name_ = args[2]; + StreamEntryID tmp_id; + for (size_t i = 3; i < args.size(); ++i) { + auto s = ParseStreamEntryID(args[i], &tmp_id); + if (!s.IsOK()) { + return {Status::RedisParseErr, s.Msg()}; + } + entry_ids_.emplace_back(tmp_id); + } + + return Status::OK(); + } + + Status Execute(Server *srv, Connection *conn, std::string *output) override { + redis::Stream stream_db(srv->storage, conn->GetNamespace()); + uint64_t acknowledged = 0; + auto s = stream_db.DeletePelEntries(stream_name_, group_name_, entry_ids_, &acknowledged); + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + *output = redis::Integer(acknowledged); + + return Status::OK(); + } + + private: + std::string stream_name_; + std::string group_name_; + std::vector entry_ids_; +}; + class CommandXAdd : public Commander { public: Status Parse(const std::vector &args) override { @@ -1496,7 +1531,8 @@ class CommandXSetId : public Commander { std::optional entries_added_; }; -REDIS_REGISTER_COMMANDS(MakeCmdAttr("xadd", -5, "write", 1, 1, 1), +REDIS_REGISTER_COMMANDS(MakeCmdAttr("xack", -4, "write no-dbsize-check", 1, 1, 1), + MakeCmdAttr("xadd", -5, "write", 1, 1, 1), MakeCmdAttr("xdel", -3, "write no-dbsize-check", 1, 1, 1), MakeCmdAttr("xgroup", -4, "write", 2, 2, 1), MakeCmdAttr("xlen", -2, "read-only", 1, 1, 1), diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index afc26a5ff6c..aedcdfe8f7f 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -314,6 +314,48 @@ StreamSubkeyType Stream::identifySubkeyType(const rocksdb::Slice &key) const { return StreamSubkeyType::StreamConsumerMetadata; } +rocksdb::Status Stream::DeletePelEntries(const Slice &stream_name, const std::string &group_name, + const std::vector &entry_ids, uint64_t *acknowledged) { + *acknowledged = 0; + + std::string ns_key = AppendNamespacePrefix(stream_name); + + LockGuard guard(storage_->GetLockManager(), ns_key); + StreamMetadata metadata(false); + rocksdb::Status s = GetMetadata(ns_key, &metadata); + if (!s.ok()) { + return s.IsNotFound() ? rocksdb::Status::OK() : s; + } + + std::string group_key = internalKeyFromGroupName(ns_key, metadata, group_name); + std::string get_group_value; + s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, &get_group_value); + if (!s.ok()) { + return s.IsNotFound() ? rocksdb::Status::OK() : s; + } + + auto batch = storage_->GetWriteBatchBase(); + WriteBatchLogData log_data(kRedisStream); + batch->PutLogData(log_data.Encode()); + + for (const auto &id : entry_ids) { + std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id); + std::string value; + s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, &value); + if (s.ok()) { + *acknowledged += 1; + batch->Delete(stream_cf_handle_, entry_key); + } + } + if (*acknowledged > 0) { + StreamConsumerGroupMetadata group_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value); + group_metadata.pending_number -= *acknowledged; + std::string group_value = encodeStreamConsumerGroupMetadataValue(group_metadata); + batch->Put(stream_cf_handle_, group_key, group_value); + } + return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const StreamXGroupCreateOptions &options, const std::string &group_name) { if (std::isdigit(group_name[0])) { diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h index 8fa7bb706cf..f569c253f1e 100644 --- a/src/types/redis_stream.h +++ b/src/types/redis_stream.h @@ -49,6 +49,8 @@ class Stream : public SubKeyScanner { rocksdb::Status GroupSetId(const Slice &stream_name, const std::string &group_name, const StreamXGroupCreateOptions &options); rocksdb::Status DeleteEntries(const Slice &stream_name, const std::vector &ids, uint64_t *deleted_cnt); + rocksdb::Status DeletePelEntries(const Slice &stream_name, const std::string &group_name, + const std::vector &entry_ids, uint64_t *acknowledged); rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions &options, uint64_t *size); rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t count, StreamInfo *info); rocksdb::Status GetGroupInfo(const Slice &stream_name, diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index a48ce651aec..ad6189db080 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -1240,6 +1240,60 @@ func TestStreamOffset(t *testing.T) { require.NoError(t, erri) require.Equal(t, int64(1), ri[0].Consumers) }) + + t.Run("XACK with different kinds of commands", func(t *testing.T) { + streamName := "mystream" + groupName := "mygroup" + require.NoError(t, rdb.Del(ctx, streamName).Err()) + r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result() + require.NoError(t, err) + require.Equal(t, int64(0), r) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"field1", "data1"}, + }).Err()) + require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err()) + consumerName := "myconsumer" + err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: consumerName, + Streams: []string{streamName, ">"}, + Count: 1, + NoAck: false, + }).Err() + require.NoError(t, err) + r, err = rdb.XAck(ctx, streamName, groupName, "1-0").Result() + require.NoError(t, err) + require.Equal(t, int64(1), r) + + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "2-0", + Values: []string{"field1", "data1"}, + }).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "3-0", + Values: []string{"field1", "data1"}, + }).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "4-0", + Values: []string{"field1", "data1"}, + }).Err()) + err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: consumerName, + Streams: []string{streamName, ">"}, + Count: 3, + NoAck: false, + }).Err() + require.NoError(t, err) + r, err = rdb.XAck(ctx, streamName, groupName, "2-0", "3-0", "4-0").Result() + require.NoError(t, err) + require.Equal(t, int64(3), r) + }) } func parseStreamEntryID(id string) (ts int64, seqNum int64) {