Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): add support of the XAUTOCLAIM command #2373

Merged
merged 12 commits into from
Jun 27, 2024
Merged
94 changes: 94 additions & 0 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*
*/

#include <algorithm>
#include <limits>
#include <memory>
#include <stdexcept>

Expand Down Expand Up @@ -358,6 +360,97 @@ class CommandXClaim : public Commander {
}
};

class CommandAutoClaim : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 1);
key_name_ = GET_OR_RET(parser.TakeStr());
group_name_ = GET_OR_RET(parser.TakeStr());
consumer_name_ = GET_OR_RET(parser.TakeStr());
if (auto parse_status = parser.TakeInt<uint64_t>(); !parse_status.IsOK()) {
return {Status::RedisParseErr, "Invalid min-idle-time argument for XAUTOCLAIM"};
} else {
options_.min_idle_time_ms = parse_status.GetValue();
}

auto start_str = GET_OR_RET(parser.TakeStr());
if (!start_str.empty() && start_str.front() == '(') {
options_.exclude_start = true;
start_str = start_str.substr(1);
}
if (!options_.exclude_start && start_str == "-") {
options_.start_id = StreamEntryID::Minimum();
} else {
auto parse_status = ParseRangeStart(start_str, &options_.start_id);
if (!parse_status.IsOK()) {
return parse_status;
}
}

if (parser.EatEqICase("count")) {
uint64_t count = GET_OR_RET(parser.TakeInt<uint64_t>());
constexpr uint64_t min_count = 1;
uint64_t max_count = std::numeric_limits<int64_t>::max() /
(std::max(static_cast<uint64_t>(sizeof(StreamEntryID)), options_.attempts_factors));
if (count < min_count || count > max_count) {
return {Status::RedisParseErr, "COUNT must be > 0"};
}
options_.count = count;
}

if (parser.Good() && parser.EatEqICase("justid")) {
options_.just_id = true;
}

return Status::OK();
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::Stream stream_db(srv->storage, conn->GetNamespace());
StreamAutoClaimResult result;
auto s = stream_db.AutoClaim(key_name_, group_name_, consumer_name_, options_, &result);
if (!s.ok()) {
if (s.IsNotFound()) {
return {Status::RedisExecErr,
"NOGROUP No such key '" + key_name_ + "' or consumer group '" + group_name_ + "'"};
}
return {Status::RedisExecErr, s.ToString()};
}
return sendResults(conn, result, output);
}

private:
Status sendResults(Connection *conn, const StreamAutoClaimResult &result, std::string *output) const {
output->append(redis::MultiLen(3));
output->append(redis::BulkString(result.next_claim_id));
output->append(redis::MultiLen(result.entries.size()));
for (const auto &item : result.entries) {
if (options_.just_id) {
output->append(redis::BulkString(item.key));
} else {
output->append(redis::MultiLen(2));
output->append(redis::BulkString(item.key));
output->append(redis::MultiLen(item.values.size()));
for (const auto &value_item : item.values) {
output->append(redis::BulkString(value_item));
}
}
}

output->append(redis::MultiLen(result.deleted_ids.size()));
for (const auto &item : result.deleted_ids) {
output->append(redis::BulkString(item));
}

return Status::OK();
}

std::string key_name_;
std::string group_name_;
std::string consumer_name_;
StreamAutoClaimOptions options_;
};

class CommandXGroup : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Expand Down Expand Up @@ -1647,6 +1740,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAck>("xack", -4, "write no-dbsize-ch
MakeCmdAttr<CommandXAdd>("xadd", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandXDel>("xdel", -3, "write no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandXClaim>("xclaim", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandAutoClaim>("xautoclaim", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 2, 1),
MakeCmdAttr<CommandXLen>("xlen", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandXInfo>("xinfo", -2, "read-only", 0, 0, 0),
Expand Down
160 changes: 160 additions & 0 deletions src/types/redis_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,166 @@ rocksdb::Status Stream::ClaimPelEntries(const Slice &stream_name, const std::str
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status Stream::AutoClaim(const Slice &stream_name, const std::string &group_name,
const std::string &consumer_name, const StreamAutoClaimOptions &options,
StreamAutoClaimResult *result) {
if (options.exclude_start && options.start_id.IsMaximum()) {
return rocksdb::Status::InvalidArgument("invalid start ID for the interval");
}

std::string ns_key = AppendNamespacePrefix(stream_name);
StreamMetadata metadata(false);

LockGuard guard(storage_->GetLockManager(), ns_key);
auto s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) { // not found will be caught by outside with no such key or consumer group
return s;
}

std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
std::string get_consumer_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
if (s.IsNotFound()) {
int created_number = 0;
s = createConsumerWithoutLock(stream_name, group_name, consumer_name, &created_number);
if (!s.ok()) {
return s;
}
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
if (!s.ok()) {
return s;
}
}

StreamConsumerMetadata current_consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
std::map<std::string, uint64_t> claimed_consumer_entity_count;
std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start_id);
std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, StreamEntryID::Maximum());

LatestSnapShot ss{storage_};
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = ss.GetSnapShot();
rocksdb::Slice lower_bound(prefix_key);
rocksdb::Slice upper_bound(end_key);
read_options.iterate_lower_bound = &lower_bound;
read_options.iterate_upper_bound = &upper_bound;

auto count = options.count;
uint64_t attempts = options.attempts_factors * count;
auto now_ms = util::GetTimeStampMS();
std::vector<StreamEntryID> deleted_entries;
std::vector<StreamEntry> pending_entries;

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisStream);
batch->PutLogData(log_data.Encode());

auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
uint64_t total_claimed_count = 0;
for (iter->SeekToFirst(); iter->Valid() && count > 0 && attempts > 0; iter->Next()) {
if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
std::string tmp_group_name;
StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
if (tmp_group_name != group_name) {
continue;
}

if (options.exclude_start && entry_id == options.start_id) {
continue;
}

attempts--;

StreamPelEntry penl_entry = decodeStreamPelEntryValue(iter->value().ToString());
if ((now_ms - penl_entry.last_delivery_time_ms) < options.min_idle_time_ms) {
continue;
}

auto entry_key = internalKeyFromEntryID(ns_key, metadata, entry_id);
std::string entry_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, &entry_value);
if (!s.ok()) {
if (s.IsNotFound()) {
deleted_entries.push_back(entry_id);
batch->Delete(stream_cf_handle_, iter->key());
--count;
continue;
}
return s;
}

StreamEntry entry(entry_id.ToString(), {});
if (!options.just_id) {
auto rv_status = DecodeRawStreamEntryValue(entry_value, &entry.values);
if (!rv_status.OK()) {
return rocksdb::Status::InvalidArgument(rv_status.Msg());
}
}

pending_entries.emplace_back(std::move(entry));
--count;

if (penl_entry.consumer_name != consumer_name) {
++total_claimed_count;
claimed_consumer_entity_count[penl_entry.consumer_name] += 1;
penl_entry.consumer_name = consumer_name;
penl_entry.last_delivery_time_ms = now_ms;
// Increment the delivery attempts counter unless JUSTID option provided
if (!options.just_id) {
penl_entry.last_delivery_count += 1;
}
batch->Put(stream_cf_handle_, iter->key(), encodeStreamPelEntryValue(penl_entry));
}
}
}

if (total_claimed_count > 0 && !pending_entries.empty()) {
current_consumer_metadata.pending_number += total_claimed_count;
current_consumer_metadata.last_attempted_interaction_ms = now_ms;

batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(current_consumer_metadata));

for (const auto &[consumer, count] : claimed_consumer_entity_count) {
std::string tmp_consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer);
std::string tmp_consumer_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, tmp_consumer_key, &tmp_consumer_value);
if (!s.ok()) {
return s;
}
StreamConsumerMetadata tmp_consumer_metadata = decodeStreamConsumerMetadataValue(tmp_consumer_value);
tmp_consumer_metadata.pending_number -= count;
batch->Put(stream_cf_handle_, tmp_consumer_key, encodeStreamConsumerMetadataValue(tmp_consumer_metadata));
}
}

bool has_next_entry = false;
for (; iter->Valid(); iter->Next()) {
if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
has_next_entry = true;
break;
}
}

if (has_next_entry) {
std::string tmp_group_name;
StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
result->next_claim_id = entry_id.ToString();
} else {
result->next_claim_id = StreamEntryID::Minimum().ToString();
}

result->entries = std::move(pending_entries);
result->deleted_ids.clear();
torwig marked this conversation as resolved.
Show resolved Hide resolved
result->deleted_ids.reserve(deleted_entries.size());
std::transform(deleted_entries.cbegin(), deleted_entries.cend(), std::back_inserter(result->deleted_ids),
[](const StreamEntryID &id) { return id.ToString(); });

return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const StreamXGroupCreateOptions &options,
const std::string &group_name) {
if (std::isdigit(group_name[0])) {
Expand Down
2 changes: 2 additions & 0 deletions src/types/redis_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class Stream : public SubKeyScanner {
const std::string &consumer_name, uint64_t min_idle_time_ms,
const std::vector<StreamEntryID> &entry_ids, const StreamClaimOptions &options,
StreamClaimResult *result);
rocksdb::Status AutoClaim(const Slice &stream_name, const std::string &group_name, const std::string &consumer_name,
const StreamAutoClaimOptions &options, StreamAutoClaimResult *result);
rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions &options, uint64_t *size);
rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t count, StreamInfo *info);
rocksdb::Status GetGroupInfo(const Slice &stream_name,
Expand Down
15 changes: 15 additions & 0 deletions src/types/redis_stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ struct StreamClaimOptions {
StreamEntryID last_delivered_id;
};

struct StreamAutoClaimOptions {
uint64_t min_idle_time_ms;
uint64_t count = 100;
uint64_t attempts_factors = 10;
StreamEntryID start_id;
bool just_id = false;
bool exclude_start = false;
};

struct StreamConsumerGroupMetadata {
uint64_t consumer_number = 0;
uint64_t pending_number = 0;
Expand Down Expand Up @@ -224,6 +233,12 @@ struct StreamClaimResult {
std::vector<StreamEntry> entries;
};

struct StreamAutoClaimResult {
std::string next_claim_id;
std::vector<StreamEntry> entries;
std::vector<std::string> deleted_ids;
};

Status IncrementStreamEntryID(StreamEntryID *id);
Status ParseStreamEntryID(const std::string &input, StreamEntryID *id);
StatusOr<std::unique_ptr<NextStreamEntryIDGenerationStrategy>> ParseNextStreamEntryIDStrategy(const std::string &input);
Expand Down
Loading
Loading