Skip to content

Commit

Permalink
Add support of the XACK command (#2169)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yangsx-1 authored Mar 15, 2024
1 parent ca1d5e0 commit d0f4354
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 1 deletion.
38 changes: 37 additions & 1 deletion src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,41 @@

namespace redis {

class CommandXAck : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
stream_name_ = args[1];
group_name_ = args[2];
StreamEntryID tmp_id;
for (size_t i = 3; i < args.size(); ++i) {
auto s = ParseStreamEntryID(args[i], &tmp_id);
if (!s.IsOK()) {
return {Status::RedisParseErr, s.Msg()};
}
entry_ids_.emplace_back(tmp_id);
}

return Status::OK();
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::Stream stream_db(srv->storage, conn->GetNamespace());
uint64_t acknowledged = 0;
auto s = stream_db.DeletePelEntries(stream_name_, group_name_, entry_ids_, &acknowledged);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = redis::Integer(acknowledged);

return Status::OK();
}

private:
std::string stream_name_;
std::string group_name_;
std::vector<StreamEntryID> entry_ids_;
};

class CommandXAdd : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Expand Down Expand Up @@ -1496,7 +1531,8 @@ class CommandXSetId : public Commander {
std::optional<uint64_t> entries_added_;
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAck>("xack", -4, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandXDel>("xdel", -3, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 2, 1),
MakeCmdAttr<CommandXLen>("xlen", -2, "read-only", 1, 1, 1),
Expand Down
42 changes: 42 additions & 0 deletions src/types/redis_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,48 @@ StreamSubkeyType Stream::identifySubkeyType(const rocksdb::Slice &key) const {
return StreamSubkeyType::StreamConsumerMetadata;
}

rocksdb::Status Stream::DeletePelEntries(const Slice &stream_name, const std::string &group_name,
const std::vector<StreamEntryID> &entry_ids, uint64_t *acknowledged) {
*acknowledged = 0;

std::string ns_key = AppendNamespacePrefix(stream_name);

LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata(false);
rocksdb::Status s = GetMetadata(ns_key, &metadata);
if (!s.ok()) {
return s.IsNotFound() ? rocksdb::Status::OK() : s;
}

std::string group_key = internalKeyFromGroupName(ns_key, metadata, group_name);
std::string get_group_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, &get_group_value);
if (!s.ok()) {
return s.IsNotFound() ? rocksdb::Status::OK() : s;
}

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisStream);
batch->PutLogData(log_data.Encode());

for (const auto &id : entry_ids) {
std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id);
std::string value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, &value);
if (s.ok()) {
*acknowledged += 1;
batch->Delete(stream_cf_handle_, entry_key);
}
}
if (*acknowledged > 0) {
StreamConsumerGroupMetadata group_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value);
group_metadata.pending_number -= *acknowledged;
std::string group_value = encodeStreamConsumerGroupMetadataValue(group_metadata);
batch->Put(stream_cf_handle_, group_key, group_value);
}
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const StreamXGroupCreateOptions &options,
const std::string &group_name) {
if (std::isdigit(group_name[0])) {
Expand Down
2 changes: 2 additions & 0 deletions src/types/redis_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class Stream : public SubKeyScanner {
rocksdb::Status GroupSetId(const Slice &stream_name, const std::string &group_name,
const StreamXGroupCreateOptions &options);
rocksdb::Status DeleteEntries(const Slice &stream_name, const std::vector<StreamEntryID> &ids, uint64_t *deleted_cnt);
rocksdb::Status DeletePelEntries(const Slice &stream_name, const std::string &group_name,
const std::vector<StreamEntryID> &entry_ids, uint64_t *acknowledged);
rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions &options, uint64_t *size);
rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t count, StreamInfo *info);
rocksdb::Status GetGroupInfo(const Slice &stream_name,
Expand Down
54 changes: 54 additions & 0 deletions tests/gocase/unit/type/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,60 @@ func TestStreamOffset(t *testing.T) {
require.NoError(t, erri)
require.Equal(t, int64(1), ri[0].Consumers)
})

t.Run("XACK with different kinds of commands", func(t *testing.T) {
streamName := "mystream"
groupName := "mygroup"
require.NoError(t, rdb.Del(ctx, streamName).Err())
r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result()
require.NoError(t, err)
require.Equal(t, int64(0), r)
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "1-0",
Values: []string{"field1", "data1"},
}).Err())
require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
consumerName := "myconsumer"
err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamName, ">"},
Count: 1,
NoAck: false,
}).Err()
require.NoError(t, err)
r, err = rdb.XAck(ctx, streamName, groupName, "1-0").Result()
require.NoError(t, err)
require.Equal(t, int64(1), r)

require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "2-0",
Values: []string{"field1", "data1"},
}).Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "3-0",
Values: []string{"field1", "data1"},
}).Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "4-0",
Values: []string{"field1", "data1"},
}).Err())
err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamName, ">"},
Count: 3,
NoAck: false,
}).Err()
require.NoError(t, err)
r, err = rdb.XAck(ctx, streamName, groupName, "2-0", "3-0", "4-0").Result()
require.NoError(t, err)
require.Equal(t, int64(3), r)
})
}

func parseStreamEntryID(id string) (ts int64, seqNum int64) {
Expand Down

0 comments on commit d0f4354

Please sign in to comment.