Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(stream): change the encoding of stream consumer group #2384

Merged
merged 14 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 108 additions & 112 deletions src/types/redis_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

namespace redis {

std::string_view consumerGroupMetadataDelimiter = "METADATA";
const char *errSetEntryIdSmallerThanLastGenerated =
"The ID specified in XSETID is smaller than the target stream top item";
const char *errEntriesAddedSmallerThanStreamSize =
Expand Down Expand Up @@ -169,16 +168,21 @@ rocksdb::Status Stream::Add(const Slice &stream_name, const StreamAddOptions &op
std::string Stream::internalKeyFromGroupName(const std::string &ns_key, const StreamMetadata &metadata,
const std::string &group_name) const {
std::string sub_key;
PutFixed64(&sub_key, (uint64_t)(-1));
PutFixed8(&sub_key, (uint8_t)StreamSubkeyType::StreamConsumerGroupMetadata);
PutFixed64(&sub_key, group_name.size());
sub_key += group_name;
sub_key += consumerGroupMetadataDelimiter;
std::string entry_key = InternalKey(ns_key, sub_key, metadata.version, storage_->IsSlotIdEncoded()).Encode();
return entry_key;
}

std::string Stream::groupNameFromInternalKey(rocksdb::Slice key) const {
InternalKey ikey(key, storage_->IsSlotIdEncoded());
Slice group_name_metadata = ikey.GetSubKey();
uint64_t entry_delimiter = 0;
GetFixed64(&group_name_metadata, &entry_delimiter);
uint8_t type_delimiter = 0;
GetFixed8(&group_name_metadata, &type_delimiter);
uint64_t len = 0;
GetFixed64(&group_name_metadata, &len);
std::string group_name;
Expand Down Expand Up @@ -214,18 +218,23 @@ StreamConsumerGroupMetadata Stream::decodeStreamConsumerGroupMetadataValue(const
std::string Stream::internalKeyFromConsumerName(const std::string &ns_key, const StreamMetadata &metadata,
const std::string &group_name, const std::string &consumer_name) const {
std::string sub_key;
PutFixed64(&sub_key, (uint64_t)(-1));
PutFixed8(&sub_key, (uint8_t)StreamSubkeyType::StreamConsumerMetadata);
PutFixed64(&sub_key, group_name.size());
sub_key += group_name;
PutFixed64(&sub_key, consumer_name.size());
sub_key += consumer_name;
sub_key += consumerGroupMetadataDelimiter;
std::string entry_key = InternalKey(ns_key, sub_key, metadata.version, storage_->IsSlotIdEncoded()).Encode();
return entry_key;
}

std::string Stream::consumerNameFromInternalKey(rocksdb::Slice key) const {
InternalKey ikey(key, storage_->IsSlotIdEncoded());
Slice subkey = ikey.GetSubKey();
uint64_t entry_delimiter = 0;
GetFixed64(&subkey, &entry_delimiter);
uint8_t type_delimiter = 0;
GetFixed8(&subkey, &type_delimiter);
uint64_t group_name_len = 0;
GetFixed64(&subkey, &group_name_len);
subkey.remove_prefix(group_name_len);
Expand Down Expand Up @@ -254,6 +263,8 @@ StreamConsumerMetadata Stream::decodeStreamConsumerMetadataValue(const std::stri
std::string Stream::internalPelKeyFromGroupAndEntryId(const std::string &ns_key, const StreamMetadata &metadata,
const std::string &group_name, const StreamEntryID &id) {
std::string sub_key;
PutFixed64(&sub_key, (uint64_t)(-1));
PutFixed8(&sub_key, (uint8_t)StreamSubkeyType::StreamPelEntry);
PutFixed64(&sub_key, group_name.size());
sub_key += group_name;
PutFixed64(&sub_key, id.ms);
Expand All @@ -265,6 +276,10 @@ std::string Stream::internalPelKeyFromGroupAndEntryId(const std::string &ns_key,
StreamEntryID Stream::groupAndEntryIdFromPelInternalKey(rocksdb::Slice key, std::string &group_name) {
InternalKey ikey(key, storage_->IsSlotIdEncoded());
Slice subkey = ikey.GetSubKey();
uint64_t entry_delimiter = 0;
GetFixed64(&subkey, &entry_delimiter);
uint8_t type_delimiter = 0;
GetFixed8(&subkey, &type_delimiter);
uint64_t group_name_len = 0;
GetFixed64(&subkey, &group_name_len);
group_name = subkey.ToString().substr(0, group_name_len);
Expand Down Expand Up @@ -298,21 +313,14 @@ StreamPelEntry Stream::decodeStreamPelEntryValue(const std::string &value) {
StreamSubkeyType Stream::identifySubkeyType(const rocksdb::Slice &key) const {
InternalKey ikey(key, storage_->IsSlotIdEncoded());
Slice subkey = ikey.GetSubKey();
const size_t entry_id_size = sizeof(StreamEntryID);
if (subkey.size() <= entry_id_size) {
uint64_t entry_delimiter = 0;
GetFixed64(&subkey, &entry_delimiter);
if (entry_delimiter != (uint64_t)(-1)) {
return StreamSubkeyType::StreamEntry;
}
uint64_t group_name_len = 0;
GetFixed64(&subkey, &group_name_len);
std::string without_group_name = subkey.ToString().substr(group_name_len);
const size_t metadata_delimiter_size = consumerGroupMetadataDelimiter.size();
if (without_group_name.size() <= metadata_delimiter_size) {
return StreamSubkeyType::StreamConsumerGroupMetadata;
}
if (without_group_name.size() <= entry_id_size) {
return StreamSubkeyType::StreamPelEntry;
}
return StreamSubkeyType::StreamConsumerMetadata;
uint8_t type_delimiter = 0;
GetFixed8(&subkey, &type_delimiter);
return (StreamSubkeyType)type_delimiter;
}

rocksdb::Status Stream::DeletePelEntries(const Slice &stream_name, const std::string &group_name,
Expand Down Expand Up @@ -571,59 +579,54 @@ rocksdb::Status Stream::AutoClaim(const Slice &stream_name, const std::string &g
auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
uint64_t total_claimed_count = 0;
for (iter->SeekToFirst(); iter->Valid() && count > 0 && attempts > 0; iter->Next()) {
if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
std::string tmp_group_name;
StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
if (tmp_group_name != group_name) {
continue;
}
std::string tmp_group_name;
StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);

if (options.exclude_start && entry_id == options.start_id) {
continue;
}
if (options.exclude_start && entry_id == options.start_id) {
continue;
}

attempts--;
attempts--;

StreamPelEntry penl_entry = decodeStreamPelEntryValue(iter->value().ToString());
if ((now_ms - penl_entry.last_delivery_time_ms) < options.min_idle_time_ms) {
StreamPelEntry penl_entry = decodeStreamPelEntryValue(iter->value().ToString());
if ((now_ms - penl_entry.last_delivery_time_ms) < options.min_idle_time_ms) {
continue;
}

auto entry_key = internalKeyFromEntryID(ns_key, metadata, entry_id);
std::string entry_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, &entry_value);
if (!s.ok()) {
if (s.IsNotFound()) {
deleted_entries.push_back(entry_id);
batch->Delete(stream_cf_handle_, iter->key());
--count;
continue;
}
return s;
}

auto entry_key = internalKeyFromEntryID(ns_key, metadata, entry_id);
std::string entry_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, &entry_value);
if (!s.ok()) {
if (s.IsNotFound()) {
deleted_entries.push_back(entry_id);
batch->Delete(stream_cf_handle_, iter->key());
--count;
continue;
}
return s;
StreamEntry entry(entry_id.ToString(), {});
if (!options.just_id) {
auto rv_status = DecodeRawStreamEntryValue(entry_value, &entry.values);
if (!rv_status.OK()) {
return rocksdb::Status::InvalidArgument(rv_status.Msg());
}
}

StreamEntry entry(entry_id.ToString(), {});
if (!options.just_id) {
auto rv_status = DecodeRawStreamEntryValue(entry_value, &entry.values);
if (!rv_status.OK()) {
return rocksdb::Status::InvalidArgument(rv_status.Msg());
}
}
pending_entries.emplace_back(std::move(entry));
--count;

pending_entries.emplace_back(std::move(entry));
--count;

if (penl_entry.consumer_name != consumer_name) {
++total_claimed_count;
claimed_consumer_entity_count[penl_entry.consumer_name] += 1;
penl_entry.consumer_name = consumer_name;
penl_entry.last_delivery_time_ms = now_ms;
// Increment the delivery attempts counter unless JUSTID option provided
if (!options.just_id) {
penl_entry.last_delivery_count += 1;
}
batch->Put(stream_cf_handle_, iter->key(), encodeStreamPelEntryValue(penl_entry));
if (penl_entry.consumer_name != consumer_name) {
++total_claimed_count;
claimed_consumer_entity_count[penl_entry.consumer_name] += 1;
penl_entry.consumer_name = consumer_name;
penl_entry.last_delivery_time_ms = now_ms;
// Increment the delivery attempts counter unless JUSTID option provided
if (!options.just_id) {
penl_entry.last_delivery_count += 1;
}
batch->Put(stream_cf_handle_, iter->key(), encodeStreamPelEntryValue(penl_entry));
}
}

Expand All @@ -648,10 +651,8 @@ rocksdb::Status Stream::AutoClaim(const Slice &stream_name, const std::string &g

bool has_next_entry = false;
for (; iter->Valid(); iter->Next()) {
if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
has_next_entry = true;
break;
}
has_next_entry = true;
break;
}

if (has_next_entry) {
Expand Down Expand Up @@ -885,14 +886,9 @@ rocksdb::Status Stream::DestroyConsumer(const Slice &stream_name, const std::str

auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
std::string tmp_group_name;
FMT_MAYBE_UNUSED StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
if (tmp_group_name != group_name) continue;
StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString());
if (pel_entry.consumer_name == consumer_name) {
batch->Delete(stream_cf_handle_, iter->key());
}
StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString());
if (pel_entry.consumer_name == consumer_name) {
batch->Delete(stream_cf_handle_, iter->key());
}
}
batch->Delete(stream_cf_handle_, consumer_key);
Expand Down Expand Up @@ -1319,9 +1315,12 @@ rocksdb::Status Stream::GetGroupInfo(const Slice &stream_name,
rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s;

std::string subkey_type_delimiter = std::to_string((uint64_t)(-1));
PutFixed8(&subkey_type_delimiter, (uint8_t)StreamSubkeyType::StreamConsumerGroupMetadata);
std::string next_version_prefix_key =
InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
InternalKey(ns_key, subkey_type_delimiter, metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
std::string prefix_key =
InternalKey(ns_key, subkey_type_delimiter, metadata.version, storage_->IsSlotIdEncoded()).Encode();

rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
LatestSnapShot ss(storage_);
Expand All @@ -1333,13 +1332,11 @@ rocksdb::Status Stream::GetGroupInfo(const Slice &stream_name,

auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamConsumerGroupMetadata) {
std::string group_name = groupNameFromInternalKey(iter->key());
StreamConsumerGroupMetadata cg_metadata = decodeStreamConsumerGroupMetadataValue(iter->value().ToString());
CheckLagValid(metadata, cg_metadata);
std::pair<std::string, StreamConsumerGroupMetadata> tmp_item(group_name, cg_metadata);
group_metadata.push_back(tmp_item);
}
std::string group_name = groupNameFromInternalKey(iter->key());
StreamConsumerGroupMetadata cg_metadata = decodeStreamConsumerGroupMetadataValue(iter->value().ToString());
CheckLagValid(metadata, cg_metadata);
std::pair<std::string, StreamConsumerGroupMetadata> tmp_item(group_name, cg_metadata);
group_metadata.push_back(tmp_item);
}
return rocksdb::Status::OK();
}
Expand All @@ -1352,9 +1349,15 @@ rocksdb::Status Stream::GetConsumerInfo(
rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s;

std::string subkey_type_delimiter;
PutFixed64(&subkey_type_delimiter, (uint64_t)(-1));
PutFixed8(&subkey_type_delimiter, (uint8_t)StreamSubkeyType::StreamConsumerMetadata);
PutFixed64(&subkey_type_delimiter, group_name.size());
subkey_type_delimiter += group_name;
std::string next_version_prefix_key =
InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
InternalKey(ns_key, subkey_type_delimiter, metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
std::string prefix_key =
InternalKey(ns_key, subkey_type_delimiter, metadata.version, storage_->IsSlotIdEncoded()).Encode();

rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
LatestSnapShot ss(storage_);
Expand All @@ -1366,14 +1369,10 @@ rocksdb::Status Stream::GetConsumerInfo(

auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamConsumerMetadata) {
std::string cur_group_name = groupNameFromInternalKey(iter->key());
if (cur_group_name != group_name) continue;
std::string consumer_name = consumerNameFromInternalKey(iter->key());
StreamConsumerMetadata c_metadata = decodeStreamConsumerMetadataValue(iter->value().ToString());
std::pair<std::string, StreamConsumerMetadata> tmp_item(consumer_name, c_metadata);
consumer_metadata.push_back(tmp_item);
}
std::string consumer_name = consumerNameFromInternalKey(iter->key());
StreamConsumerMetadata c_metadata = decodeStreamConsumerMetadataValue(iter->value().ToString());
std::pair<std::string, StreamConsumerMetadata> tmp_item(consumer_name, c_metadata);
consumer_metadata.push_back(tmp_item);
}
return rocksdb::Status::OK();
}
Expand Down Expand Up @@ -1515,29 +1514,26 @@ rocksdb::Status Stream::RangeWithPending(const Slice &stream_name, StreamRangeOp
auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
uint64_t count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
std::string tmp_group_name;
StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
if (tmp_group_name != group_name) continue;
StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString());
if (pel_entry.consumer_name != consumer_name) continue;
std::string raw_value;
rocksdb::Status st = getEntryRawValue(ns_key, metadata, entry_id, &raw_value);
if (!st.ok() && !st.IsNotFound()) {
return st;
}
std::vector<std::string> values;
auto rv = DecodeRawStreamEntryValue(raw_value, &values);
if (!rv.IsOK()) {
return rocksdb::Status::InvalidArgument(rv.Msg());
}
entries->emplace_back(entry_id.ToString(), std::move(values));
pel_entry.last_delivery_count += 1;
pel_entry.last_delivery_time_ms = now_ms;
batch->Put(stream_cf_handle_, iter->key(), encodeStreamPelEntryValue(pel_entry));
++count;
if (count >= options.count) break;
std::string tmp_group_name;
StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString());
if (pel_entry.consumer_name != consumer_name) continue;
std::string raw_value;
rocksdb::Status st = getEntryRawValue(ns_key, metadata, entry_id, &raw_value);
if (!st.ok() && !st.IsNotFound()) {
return st;
}
std::vector<std::string> values;
auto rv = DecodeRawStreamEntryValue(raw_value, &values);
if (!rv.IsOK()) {
return rocksdb::Status::InvalidArgument(rv.Msg());
}
entries->emplace_back(entry_id.ToString(), std::move(values));
pel_entry.last_delivery_count += 1;
pel_entry.last_delivery_time_ms = now_ms;
batch->Put(stream_cf_handle_, iter->key(), encodeStreamPelEntryValue(pel_entry));
++count;
if (count >= options.count) break;
}
}
batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(consumergroup_metadata));
Expand Down
2 changes: 1 addition & 1 deletion tests/gocase/unit/type/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ var streamTests = func(t *testing.T, enabledRESP3 string) {
})

t.Run("XRANGE exclusive ranges", func(t *testing.T) {
ids := []string{"0-1", "0-18446744073709551615", "1-0", "42-0", "42-42", "18446744073709551615-18446744073709551614", "18446744073709551615-18446744073709551615"}
ids := []string{"0-1", "0-18446744073709551615", "1-0", "42-0", "42-42"}
total := len(ids)
require.NoError(t, rdb.Do(ctx, "MULTI").Err())
// DEL returns "QUEUED" here, so we use Do to avoid ParseInt.
Expand Down
Loading