Skip to content

Commit

Permalink
add scan substring,suffix
Browse files Browse the repository at this point in the history
  • Loading branch information
root authored and root committed May 3, 2024
1 parent ae5784d commit 58d5b0c
Show file tree
Hide file tree
Showing 12 changed files with 111 additions and 37 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,5 @@ testdb

build
cmake-build-*
debug-build
test
2 changes: 1 addition & 1 deletion src/commands/cmd_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ class CommandHScan : public CommandSubkeyScanBase {
std::vector<std::string> fields;
std::vector<std::string> values;
auto key_name = srv->GetKeyNameFromCursor(cursor_, CursorType::kTypeHash);
auto s = hash_db.Scan(key_, key_name, limit_, prefix_, &fields, &values);
auto s = hash_db.Scan(key_, key_name, limit_, prefix_, &fields, &values, pm_);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand Down
2 changes: 1 addition & 1 deletion src/commands/cmd_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,7 @@ class CommandScan : public CommandScanBase {

std::vector<std::string> keys;
std::string end_key;
auto s = redis_db.Scan(key_name, limit_, prefix_, &keys, &end_key, type_);
auto s = redis_db.Scan(key_name, limit_, prefix_, &keys, &end_key, type_, pm_);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand Down
2 changes: 1 addition & 1 deletion src/commands/cmd_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,7 @@ class CommandZScan : public CommandSubkeyScanBase {
std::vector<std::string> members;
std::vector<double> scores;
auto key_name = srv->GetKeyNameFromCursor(cursor_, CursorType::kTypeZSet);
auto s = zset_db.Scan(key_, key_name, limit_, prefix_, &members, &scores);
auto s = zset_db.Scan(key_, key_name, limit_, prefix_, &members, &scores, pm_);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand Down
14 changes: 11 additions & 3 deletions src/commands/scan_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,22 @@ class CommandScanBase : public Commander {

return ParseAdditionalFlags<true>(parser);
}

template <bool IsScan, typename Parser>
Status ParseAdditionalFlags(Parser &parser) {
while (parser.Good()) {
if (parser.EatEqICase("match")) {
prefix_ = GET_OR_RET(parser.TakeStr());
if (!prefix_.empty() && prefix_.back() == '*') {
if(prefix_.size()>=2&&prefix_.front() == '*'&&prefix_.back() =='*'){
prefix_ = prefix_.substr(1,prefix_.size()-2);
pm_ = 2;
}else if (!prefix_.empty() && prefix_.back() == '*') {
prefix_ = prefix_.substr(0, prefix_.size() - 1);
} else {
pm_ = 0;
} else if(!prefix_.empty()&& prefix_.front()=='*'){
prefix_ = prefix_.substr(1, prefix_.size() - 1);
pm_ = 1;
}
else {
return {Status::RedisParseErr, "currently only key prefix matching is supported"};
}
} else if (parser.EatEqICase("count")) {
Expand Down Expand Up @@ -100,6 +107,7 @@ class CommandScanBase : public Commander {
std::string prefix_;
int limit_ = 20;
RedisType type_ = kRedisNone;
int pm_ = 0;
};

class CommandSubkeyScanBase : public CommandScanBase {
Expand Down
52 changes: 32 additions & 20 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@

#include "redis_db.h"

#include <cassert>
#include <ctime>
#include <map>
#include <string>
#include <utility>

#include "cluster/redis_slot.h"
#include "common/scope_exit.h"
#include "db_util.h"
#include "parse_util.h"
#include "rocksdb/iterator.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "server/server.h"
#include "storage/iterator.h"
Expand Down Expand Up @@ -309,8 +312,8 @@ rocksdb::Status Database::Keys(const std::string &prefix, std::vector<std::strin
return rocksdb::Status::OK();
}

rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const std::string &prefix,
std::vector<std::string> *keys, std::string *end_cursor, RedisType type) {
rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const std::string &fix,
std::vector<std::string> *keys, std::string *end_cursor, RedisType type, const int pm) {
end_cursor->clear();
uint64_t cnt = 0;
uint16_t slot_start = 0;
Expand All @@ -323,17 +326,14 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const
auto iter = util::UniqueIterator(storage_, read_options, metadata_cf_handle_);

std::string ns_cursor = AppendNamespacePrefix(cursor);
ns_prefix = ComposeNamespaceKey(namespace_, "", false);
if (storage_->IsSlotIdEncoded()) {
slot_start = cursor.empty() ? 0 : GetSlotIdFromKey(cursor);
ns_prefix = ComposeNamespaceKey(namespace_, "", false);
if (!prefix.empty()) {
if (!fix.empty()) {
PutFixed16(&ns_prefix, slot_start);
ns_prefix.append(prefix);
}
} else {
ns_prefix = AppendNamespacePrefix(prefix);
}

if (pm == 0) ns_prefix.append(fix);
if (!cursor.empty()) {
iter->Seek(ns_cursor);
if (iter->Valid()) {
Expand All @@ -348,8 +348,14 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const
uint16_t slot_id = slot_start;
while (true) {
for (; iter->Valid() && cnt < limit; iter->Next()) {
if (!ns_prefix.empty() && !iter->key().starts_with(ns_prefix)) {
break;
if (!ns_prefix.empty()) {
if (!iter->key().starts_with(ns_prefix)) break;
auto key_view = iter->key().ToStringView();
auto sub_key_view = static_cast<Slice>(key_view.substr(ns_prefix.size(), key_view.size() - ns_prefix.size()));
if (pm == 1 && !sub_key_view.ends_with(fix))
continue;
else if (pm == 2 && sub_key_view.ToStringView().find(fix) == std::string::npos)
continue;
}
Metadata metadata(kRedisNone, false);
auto s = metadata.Decode(iter->value());
Expand All @@ -362,7 +368,7 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const
keys->emplace_back(user_key);
cnt++;
}
if (!storage_->IsSlotIdEncoded() || prefix.empty()) {
if (!storage_->IsSlotIdEncoded() || fix.empty()) {
if (!keys->empty() && cnt >= limit) {
end_cursor->append(user_key);
}
Expand All @@ -382,22 +388,21 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const
if (keys->empty()) {
if (iter->Valid()) {
std::tie(std::ignore, user_key) = ExtractNamespaceKey<std::string>(iter->key(), storage_->IsSlotIdEncoded());
auto res = std::mismatch(prefix.begin(), prefix.end(), user_key.begin());
if (res.first == prefix.end()) {
auto res = std::mismatch(fix.begin(), fix.end(), user_key.begin());
if (res.first == fix.end()) {
keys->emplace_back(user_key);
}

end_cursor->append(user_key);
}
} else {
end_cursor->append(user_key);
}
break;
}

ns_prefix = ComposeNamespaceKey(namespace_, "", false);
PutFixed16(&ns_prefix, slot_id);
ns_prefix.append(prefix);
if (pm == 0) ns_prefix.append(fix);
iter->Seek(ns_prefix);
}
return rocksdb::Status::OK();
Expand Down Expand Up @@ -602,8 +607,8 @@ rocksdb::Status Database::KeyExist(const std::string &key) {
}

rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const std::string &cursor, uint64_t limit,
const std::string &subkey_prefix, std::vector<std::string> *keys,
std::vector<std::string> *values) {
const std::string &subkey_fix, std::vector<std::string> *keys,
std::vector<std::string> *values, const int pm) {
uint64_t cnt = 0;
std::string ns_key = AppendNamespacePrefix(user_key);
Metadata metadata(type, false);
Expand All @@ -614,7 +619,7 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
auto iter = util::UniqueIterator(storage_, read_options);
std::string match_prefix_key =
InternalKey(ns_key, subkey_prefix, metadata.version, storage_->IsSlotIdEncoded()).Encode();
InternalKey(ns_key, pm == 0 ? subkey_fix : "", metadata.version, storage_->IsSlotIdEncoded()).Encode();

std::string start_key;
if (!cursor.empty()) {
Expand All @@ -628,10 +633,17 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const
// because we already return that key in the last scan
continue;
}
if (!iter->key().starts_with(match_prefix_key)) {
break;
if (pm == 0 && !iter->key().starts_with(match_prefix_key)) {
std::cout << iter->key() << " " << match_prefix_key << "\n";
continue;
}
InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
auto sub_key = ikey.GetSubKey();
if (pm == 1 && !sub_key.ends_with(subkey_fix)) {
continue;
} else if (pm == 2 && sub_key.ToString().find(subkey_fix) == std::string::npos) {
continue;
}
keys->emplace_back(ikey.GetSubKey().ToString());
if (values != nullptr) {
values->emplace_back(iter->value().ToString());
Expand Down
4 changes: 2 additions & 2 deletions src/storage/redis_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class Database {
KeyNumStats *stats = nullptr);
[[nodiscard]] rocksdb::Status Scan(const std::string &cursor, uint64_t limit, const std::string &prefix,
std::vector<std::string> *keys, std::string *end_cursor = nullptr,
RedisType type = kRedisNone);
RedisType type = kRedisNone, int pm = 0);
[[nodiscard]] rocksdb::Status RandomKey(const std::string &cursor, std::string *key);
std::string AppendNamespacePrefix(const Slice &user_key);
[[nodiscard]] rocksdb::Status FindKeyRangeWithPrefix(const std::string &prefix, const std::string &prefix_end,
Expand Down Expand Up @@ -139,7 +139,7 @@ class SubKeyScanner : public redis::Database {
explicit SubKeyScanner(engine::Storage *storage, const std::string &ns) : Database(storage, ns) {}
rocksdb::Status Scan(RedisType type, const Slice &user_key, const std::string &cursor, uint64_t limit,
const std::string &subkey_prefix, std::vector<std::string> *keys,
std::vector<std::string> *values = nullptr);
std::vector<std::string> *values = nullptr, int pm = 0);
};

class WriteBatchLogData {
Expand Down
4 changes: 2 additions & 2 deletions src/types/redis_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,8 @@ rocksdb::Status Hash::GetAll(const Slice &user_key, std::vector<FieldValue> *fie

rocksdb::Status Hash::Scan(const Slice &user_key, const std::string &cursor, uint64_t limit,
const std::string &field_prefix, std::vector<std::string> *fields,
std::vector<std::string> *values) {
return SubKeyScanner::Scan(kRedisHash, user_key, cursor, limit, field_prefix, fields, values);
std::vector<std::string> *values, const int pm) {
return SubKeyScanner::Scan(kRedisHash, user_key, cursor, limit, field_prefix, fields, values, pm);
}

rocksdb::Status Hash::RandField(const Slice &user_key, int64_t command_count, std::vector<FieldValue> *field_values,
Expand Down
2 changes: 1 addition & 1 deletion src/types/redis_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class Hash : public SubKeyScanner {
HashFetchType type = HashFetchType::kAll);
rocksdb::Status Scan(const Slice &user_key, const std::string &cursor, uint64_t limit,
const std::string &field_prefix, std::vector<std::string> *fields,
std::vector<std::string> *values = nullptr);
std::vector<std::string> *values = nullptr, int pm = 0);
rocksdb::Status RandField(const Slice &user_key, int64_t command_count, std::vector<FieldValue> *field_values,
HashFetchType type = HashFetchType::kOnlyKey);

Expand Down
6 changes: 3 additions & 3 deletions src/types/redis_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -810,10 +810,10 @@ rocksdb::Status ZSet::Union(const std::vector<KeyWeight> &keys_weights, Aggregat

rocksdb::Status ZSet::Scan(const Slice &user_key, const std::string &cursor, uint64_t limit,
const std::string &member_prefix, std::vector<std::string> *members,
std::vector<double> *scores) {
std::vector<double> *scores, const int pm) {
if (scores != nullptr) {
std::vector<std::string> values;
auto s = SubKeyScanner::Scan(kRedisZSet, user_key, cursor, limit, member_prefix, members, &values);
auto s = SubKeyScanner::Scan(kRedisZSet, user_key, cursor, limit, member_prefix, members, &values, pm);
if (!s.ok()) return s;

for (const auto &value : values) {
Expand All @@ -822,7 +822,7 @@ rocksdb::Status ZSet::Scan(const Slice &user_key, const std::string &cursor, uin
}
return s;
}
return SubKeyScanner::Scan(kRedisZSet, user_key, cursor, limit, member_prefix, members);
return SubKeyScanner::Scan(kRedisZSet, user_key, cursor, limit, member_prefix, members, nullptr, pm);
}

rocksdb::Status ZSet::MGet(const Slice &user_key, const std::vector<Slice> &members,
Expand Down
2 changes: 1 addition & 1 deletion src/types/redis_zset.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class ZSet : public SubKeyScanner {
rocksdb::Status Score(const Slice &user_key, const Slice &member, double *score);
rocksdb::Status Scan(const Slice &user_key, const std::string &cursor, uint64_t limit,
const std::string &member_prefix, std::vector<std::string> *members,
std::vector<double> *scores = nullptr);
std::vector<double> *scores = nullptr, int pm = 0);
rocksdb::Status Overwrite(const Slice &user_key, const MemberScores &mscores);
rocksdb::Status InterStore(const Slice &dst, const std::vector<KeyWeight> &keys_weights,
AggregateMethod aggregate_method, uint64_t *saved_cnt);
Expand Down
56 changes: 54 additions & 2 deletions tests/gocase/unit/scan/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func ScanTest(t *testing.T, rdb *redis.Client, ctx context.Context) {
require.Equal(t, []string{"foo", "foobar"}, keys)
})

t.Run("HSCAN with PATTERN", func(t *testing.T) {
t.Run("HSCAN with prefix PATTERN", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "mykey").Err())
require.NoError(t, rdb.HMSet(ctx, "mykey", "foo", 1, "fab", 2, "fiz", 3, "foobar", 10, 1, "a", 2, "b", 3, "c", 4, "d").Err())
keys, _, err := rdb.HScan(ctx, "mykey", 0, "foo*", 10000).Result()
Expand All @@ -187,7 +187,27 @@ func ScanTest(t *testing.T, rdb *redis.Client, ctx context.Context) {
require.Equal(t, []string{"1", "10", "foo", "foobar"}, keys)
})

t.Run("ZSCAN with PATTERN", func(t *testing.T) {
t.Run("HSCAN with subkey PATTERN", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "mykey").Err())
require.NoError(t, rdb.HMSet(ctx, "mykey", "foo", 1, "fab", 2, "fiz", 3, "foobar", 10, "a", 2, "b", 3, "c", 4).Err())
keys, _, err := rdb.HScan(ctx, "mykey", 0, "*oo*", 10000).Result()
require.NoError(t, err)
slices.Sort(keys)
keys = slices.Compact(keys)
require.Equal(t, []string{"1", "10", "foo", "foobar"}, keys)
})

t.Run("HSCAN with suffix PATTERN", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "mykey").Err())
require.NoError(t, rdb.HMSet(ctx, "mykey", "foo", 1, "fab", 2, "fiz", 3, "foobar", 10, "a", 2, "b", 3, "c", 4).Err())
keys, _, err := rdb.HScan(ctx, "mykey", 0, "*b", 10000).Result()
require.NoError(t, err)
slices.Sort(keys)
keys = slices.Compact(keys)
require.Equal(t, []string{"2", "3", "b", "fab"}, keys)
})

t.Run("ZSCAN with prefix PATTERN", func(t *testing.T) {
members := []redis.Z{
{Score: 1, Member: "foo"},
{Score: 2, Member: "fab"},
Expand All @@ -203,6 +223,38 @@ func ScanTest(t *testing.T, rdb *redis.Client, ctx context.Context) {
require.Equal(t, []string{"1", "10", "foo", "foobar"}, keys)
})

t.Run("ZSCAN with subkey PATTERN", func(t *testing.T) {
members := []redis.Z{
{Score: 1, Member: "foo"},
{Score: 2, Member: "fab"},
{Score: 3, Member: "fiz"},
{Score: 10, Member: "foobar"},
}
require.NoError(t, rdb.Del(ctx, "mykey").Err())
require.NoError(t, rdb.ZAdd(ctx, "mykey", members...).Err())
keys, _, err := rdb.ZScan(ctx, "mykey", 0, "*oo*", 10000).Result()
require.NoError(t, err)
slices.Sort(keys)
keys = slices.Compact(keys)
require.Equal(t, []string{"1", "10", "foo", "foobar"}, keys)
})

t.Run("ZSCAN with suffix PATTERN", func(t *testing.T) {
members := []redis.Z{
{Score: 1, Member: "foo"},
{Score: 2, Member: "fab"},
{Score: 3, Member: "fiz"},
{Score: 10, Member: "foobar"},
}
require.NoError(t, rdb.Del(ctx, "mykey").Err())
require.NoError(t, rdb.ZAdd(ctx, "mykey", members...).Err())
keys, _, err := rdb.ZScan(ctx, "mykey", 0, "*b", 10000).Result()
require.NoError(t, err)
slices.Sort(keys)
keys = slices.Compact(keys)
require.Equal(t, []string{"2", "fab"}, keys)
})

for _, test := range []struct {
name string
keyGen func(int) interface{}
Expand Down

0 comments on commit 58d5b0c

Please sign in to comment.