Skip to content

Commit

Permalink
add scan_false_limit in all scan
Browse files Browse the repository at this point in the history
  • Loading branch information
root authored and root committed May 4, 2024
1 parent 58d5b0c commit 65a8cd5
Show file tree
Hide file tree
Showing 17 changed files with 153 additions and 93 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ version.h
.idea
.vscode
.cache
.gitignore

compactdb
test
testdb

build
cmake-build-*
debug-build
test
test
8 changes: 5 additions & 3 deletions src/commands/cmd_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*
*/

#include <string>

#include "commander.h"
#include "commands/command_parser.h"
#include "error_constants.h"
Expand Down Expand Up @@ -367,12 +369,12 @@ class CommandHScan : public CommandSubkeyScanBase {
std::vector<std::string> fields;
std::vector<std::string> values;
auto key_name = srv->GetKeyNameFromCursor(cursor_, CursorType::kTypeHash);
auto s = hash_db.Scan(key_, key_name, limit_, prefix_, &fields, &values, pm_);
auto s = hash_db.Scan(key_, key_name, limit_, srv->GetConfig()->max_scan_num, prefix_, &fields, &values);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}

auto cursor = GetNextCursor(srv, fields, CursorType::kTypeHash);
auto &end_cursor = hash_db.end_cursor;
const auto &cursor = srv->GenerateCursorFromKeyName(end_cursor, CursorType::kTypeHash);
std::vector<std::string> entries;
entries.reserve(2 * fields.size());
for (size_t i = 0; i < fields.size(); i++) {
Expand Down
3 changes: 2 additions & 1 deletion src/commands/cmd_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,8 @@ class CommandScan : public CommandScanBase {

std::vector<std::string> keys;
std::string end_key;
auto s = redis_db.Scan(key_name, limit_, prefix_, &keys, &end_key, type_, pm_);
auto s =
redis_db.Scan(key_name, limit_, srv->GetConfig()->max_scan_num, prefix_, &keys, &end_key, type_, match_mode_);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand Down
5 changes: 2 additions & 3 deletions src/commands/cmd_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -427,12 +427,11 @@ class CommandSScan : public CommandSubkeyScanBase {
redis::Set set_db(srv->storage, conn->GetNamespace());
std::vector<std::string> members;
auto key_name = srv->GetKeyNameFromCursor(cursor_, CursorType::kTypeSet);
auto s = set_db.Scan(key_, key_name, limit_, prefix_, &members);
auto s = set_db.Scan(key_, key_name, limit_, srv->GetConfig()->max_scan_num, prefix_, &members, match_mode_);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = CommandScanBase::GenerateOutput(srv, conn, members, CursorType::kTypeSet);
*output = CommandScanBase::GenerateOutput(srv, conn, members, set_db.end_cursor);
return Status::OK();
}
};
Expand Down
6 changes: 3 additions & 3 deletions src/commands/cmd_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1350,12 +1350,12 @@ class CommandZScan : public CommandSubkeyScanBase {
std::vector<std::string> members;
std::vector<double> scores;
auto key_name = srv->GetKeyNameFromCursor(cursor_, CursorType::kTypeZSet);
auto s = zset_db.Scan(key_, key_name, limit_, prefix_, &members, &scores, pm_);
auto s =
zset_db.Scan(key_, key_name, limit_, srv->GetConfig()->max_scan_num, prefix_, &members, &scores, match_mode_);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}

auto cursor = GetNextCursor(srv, members, CursorType::kTypeZSet);
auto cursor = srv->GenerateCursorFromKeyName(zset_db.end_cursor, CursorType::kTypeZSet);
std::vector<std::string> entries;
entries.reserve(2 * members.size());
for (size_t i = 0; i < members.size(); i++) {
Expand Down
33 changes: 14 additions & 19 deletions src/commands/scan_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#pragma once

#include <vector>

#include "commander.h"
#include "commands/command_parser.h"
#include "error_constants.h"
Expand All @@ -44,17 +46,16 @@ class CommandScanBase : public Commander {
while (parser.Good()) {
if (parser.EatEqICase("match")) {
prefix_ = GET_OR_RET(parser.TakeStr());
if(prefix_.size()>=2&&prefix_.front() == '*'&&prefix_.back() =='*'){
prefix_ = prefix_.substr(1,prefix_.size()-2);
pm_ = 2;
}else if (!prefix_.empty() && prefix_.back() == '*') {
if (prefix_.size() >= 2 && prefix_.front() == '*' && prefix_.back() == '*') {
prefix_ = prefix_.substr(1, prefix_.size() - 2);
match_mode_ = MatchType::SUBSTRING;
} else if (!prefix_.empty() && prefix_.back() == '*') {
prefix_ = prefix_.substr(0, prefix_.size() - 1);
pm_ = 0;
} else if(!prefix_.empty()&& prefix_.front()=='*'){
match_mode_ = MatchType::PREFIX;
} else if (!prefix_.empty() && prefix_.front() == '*') {
prefix_ = prefix_.substr(1, prefix_.size() - 1);
pm_ = 1;
}
else {
match_mode_ = MatchType::SUBSTRING;
} else {
return {Status::RedisParseErr, "currently only key prefix matching is supported"};
}
} else if (parser.EatEqICase("count")) {
Expand Down Expand Up @@ -87,16 +88,10 @@ class CommandScanBase : public Commander {
}
}

std::string GenerateOutput(Server *srv, const Connection *conn, const std::vector<std::string> &keys,
CursorType cursor_type) const {
static std::string GenerateOutput(Server *srv, const Connection *conn, const std::vector<std::string> &keys,
const std::string &end_cursor) {
std::vector<std::string> list;
if (keys.size() == static_cast<size_t>(limit_)) {
auto end_cursor = srv->GenerateCursorFromKeyName(keys.back(), cursor_type);
list.emplace_back(redis::BulkString(end_cursor));
} else {
list.emplace_back(redis::BulkString("0"));
}

list.emplace_back(end_cursor);
list.emplace_back(ArrayOfBulkStrings(keys));

return redis::Array(list);
Expand All @@ -107,7 +102,7 @@ class CommandScanBase : public Commander {
std::string prefix_;
int limit_ = 20;
RedisType type_ = kRedisNone;
int pm_ = 0;
MatchType match_mode_;
};

class CommandSubkeyScanBase : public CommandScanBase {
Expand Down
2 changes: 2 additions & 0 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <strings.h>

#include <algorithm>
#include <climits>
#include <cstring>
#include <fstream>
#include <iostream>
Expand Down Expand Up @@ -152,6 +153,7 @@ Config::Config() {
{"pidfile", true, new StringField(&pidfile, kDefaultPidfile)},
{"max-io-mb", false, new IntField(&max_io_mb, 0, 0, INT_MAX)},
{"max-bitmap-to-string-mb", false, new IntField(&max_bitmap_to_string_mb, 16, 0, INT_MAX)},
{"max-scan-num", false, new IntField(&max_scan_num, 2, 0, INT_MAX)},
{"max-db-size", false, new IntField(&max_db_size, 0, 0, INT_MAX)},
{"max-replication-mb", false, new IntField(&max_replication_mb, 0, 0, INT_MAX)},
{"supervised", true, new EnumField<SupervisedMode>(&supervised_mode, supervised_modes, kSupervisedNone)},
Expand Down
1 change: 1 addition & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ struct Config {
int max_replication_mb = 0;
int max_io_mb = 0;
int max_bitmap_to_string_mb = 16;
int max_scan_num = 2;
bool master_use_repl_port = false;
bool purge_backup_on_fullsync = false;
bool auto_resize_block_and_sst = true;
Expand Down
111 changes: 74 additions & 37 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
#include "redis_db.h"

#include <cassert>
#include <cstdint>
#include <ctime>
#include <map>
#include <string>
#include <utility>

#include "cluster/redis_slot.h"
#include "common/scope_exit.h"
#include "config/config.h"
#include "db_util.h"
#include "parse_util.h"
#include "rocksdb/iterator.h"
Expand Down Expand Up @@ -312,10 +314,12 @@ rocksdb::Status Database::Keys(const std::string &prefix, std::vector<std::strin
return rocksdb::Status::OK();
}

rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const std::string &fix,
std::vector<std::string> *keys, std::string *end_cursor, RedisType type, const int pm) {
rocksdb::Status Database::Scan(const std::string &cursor, uint64_t scan_success_limit, uint64_t scan_false_limit,
const std::string &match_str, std::vector<std::string> *keys, std::string *end_cursor,
RedisType type, const MatchType match_mode) {
end_cursor->clear();
uint64_t cnt = 0;
uint64_t cnt_success = 0;
uint64_t cnt_false = 0;
uint16_t slot_start = 0;
std::string ns_prefix;
std::string user_key;
Expand All @@ -329,11 +333,11 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const
ns_prefix = ComposeNamespaceKey(namespace_, "", false);
if (storage_->IsSlotIdEncoded()) {
slot_start = cursor.empty() ? 0 : GetSlotIdFromKey(cursor);
if (!fix.empty()) {
if (!match_str.empty()) {
PutFixed16(&ns_prefix, slot_start);
}
}
if (pm == 0) ns_prefix.append(fix);
if (match_mode == MatchType::PREFIX) ns_prefix.append(match_str);
if (!cursor.empty()) {
iter->Seek(ns_cursor);
if (iter->Valid()) {
Expand All @@ -346,17 +350,36 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const
}

uint16_t slot_id = slot_start;

auto scan_match_check = [&]() -> bool {
if (ns_prefix.empty()) {
return true;
}
if (!iter->key().starts_with(ns_prefix)) {
return false;
}
auto key_view = iter->key().ToStringView();
auto sub_key_view = static_cast<Slice>(key_view.substr(ns_prefix.size(), key_view.size() - ns_prefix.size()));
if (match_mode == MatchType::SUFFIX && !sub_key_view.ends_with(match_str)) {
return false;
} else if (match_mode == MatchType::SUBSTRING && sub_key_view.ToStringView().find(match_str) == std::string::npos) {
return false;
}
return true;
};

while (true) {
for (; iter->Valid() && cnt < limit; iter->Next()) {
if (!ns_prefix.empty()) {
if (!iter->key().starts_with(ns_prefix)) break;
auto key_view = iter->key().ToStringView();
auto sub_key_view = static_cast<Slice>(key_view.substr(ns_prefix.size(), key_view.size() - ns_prefix.size()));
if (pm == 1 && !sub_key_view.ends_with(fix))
continue;
else if (pm == 2 && sub_key_view.ToStringView().find(fix) == std::string::npos)
for (; iter->Valid() && cnt_success < scan_success_limit; iter->Next()) {
if (!scan_match_check()) {
cnt_false++;
if (cnt_false < scan_false_limit) {
continue;
} else {
break;
}
}
// The user_key must be updated either when the scan is successful, or when the number of consecutive failures
// reaches its limit
Metadata metadata(kRedisNone, false);
auto s = metadata.Decode(iter->value());
if (!s.ok()) continue;
Expand All @@ -366,16 +389,19 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const
if (metadata.Expired()) continue;
std::tie(std::ignore, user_key) = ExtractNamespaceKey<std::string>(iter->key(), storage_->IsSlotIdEncoded());
keys->emplace_back(user_key);
cnt++;
cnt_success++;
}
if (!storage_->IsSlotIdEncoded() || fix.empty()) {
if (!keys->empty() && cnt >= limit) {
if (!storage_->IsSlotIdEncoded() || match_str.empty()) {
if (!keys->empty() && (cnt_success >= scan_success_limit)) {
end_cursor->append(user_key);
}
if (cnt_false >= scan_false_limit) {
end_cursor->append(user_key);
}
break;
}

if (cnt >= limit) {
if (cnt_success >= scan_success_limit || cnt_false >= scan_false_limit) {
end_cursor->append(user_key);
break;
}
Expand All @@ -388,8 +414,8 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const
if (keys->empty()) {
if (iter->Valid()) {
std::tie(std::ignore, user_key) = ExtractNamespaceKey<std::string>(iter->key(), storage_->IsSlotIdEncoded());
auto res = std::mismatch(fix.begin(), fix.end(), user_key.begin());
if (res.first == fix.end()) {
auto res = std::mismatch(match_str.begin(), match_str.end(), user_key.begin());
if (res.first == match_str.end()) {
keys->emplace_back(user_key);
}

Expand All @@ -402,7 +428,7 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const

ns_prefix = ComposeNamespaceKey(namespace_, "", false);
PutFixed16(&ns_prefix, slot_id);
if (pm == 0) ns_prefix.append(fix);
if (match_mode == MatchType::PREFIX) ns_prefix.append(match_str);
iter->Seek(ns_prefix);
}
return rocksdb::Status::OK();
Expand All @@ -413,13 +439,13 @@ rocksdb::Status Database::RandomKey(const std::string &cursor, std::string *key)

std::string end_cursor;
std::vector<std::string> keys;
auto s = Scan(cursor, RANDOM_KEY_SCAN_LIMIT, "", &keys, &end_cursor);
auto s = Scan(cursor, RANDOM_KEY_SCAN_LIMIT, 1, "", &keys, &end_cursor);
if (!s.ok()) {
return s;
}
if (keys.empty() && !cursor.empty()) {
// if reach the end, restart from beginning
s = Scan("", RANDOM_KEY_SCAN_LIMIT, "", &keys, &end_cursor);
s = Scan("", RANDOM_KEY_SCAN_LIMIT, 1, "", &keys, &end_cursor);
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -606,10 +632,13 @@ rocksdb::Status Database::KeyExist(const std::string &key) {
return rocksdb::Status::OK();
}

rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const std::string &cursor, uint64_t limit,
const std::string &subkey_fix, std::vector<std::string> *keys,
std::vector<std::string> *values, const int pm) {
uint64_t cnt = 0;
rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const std::string &cursor,
uint64_t scan_success_limit, uint64_t scan_false_limit,
const std::string &match_string, std::vector<std::string> *keys,
std::vector<std::string> *values, const MatchType match_mode) {
end_cursor = "0";
uint64_t success_cnt = 0;
uint64_t false_cnt = 0;
std::string ns_key = AppendNamespacePrefix(user_key);
Metadata metadata(type, false);
LatestSnapShot ss(storage_);
Expand All @@ -618,8 +647,9 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const

rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
auto iter = util::UniqueIterator(storage_, read_options);
std::string match_prefix_key =
InternalKey(ns_key, pm == 0 ? subkey_fix : "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
std::string match_prefix_key = InternalKey(ns_key, match_mode == MatchType::PREFIX ? match_string : "",
metadata.version, storage_->IsSlotIdEncoded())
.Encode();

std::string start_key;
if (!cursor.empty()) {
Expand All @@ -633,23 +663,30 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const
// because we already return that key in the last scan
continue;
}
if (pm == 0 && !iter->key().starts_with(match_prefix_key)) {
std::cout << iter->key() << " " << match_prefix_key << "\n";
continue;
}
InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
auto sub_key = ikey.GetSubKey();
if (pm == 1 && !sub_key.ends_with(subkey_fix)) {
if (scan_false_limit > 0 && false_cnt >= scan_false_limit) {
end_cursor = sub_key.ToString();
break;
}
if (match_mode == MatchType::PREFIX && !sub_key.starts_with(match_prefix_key)) {
false_cnt++;
break;
}
if (match_mode == MatchType::SUFFIX && !sub_key.ends_with(match_string)) {
false_cnt++;
continue;
} else if (pm == 2 && sub_key.ToString().find(subkey_fix) == std::string::npos) {
} else if (match_mode == MatchType::SUBSTRING && sub_key.ToString().find(match_string) == std::string::npos) {
false_cnt++;
continue;
}
keys->emplace_back(ikey.GetSubKey().ToString());
keys->emplace_back(sub_key.ToString());
if (values != nullptr) {
values->emplace_back(iter->value().ToString());
}
cnt++;
if (limit > 0 && cnt >= limit) {
success_cnt++;
if (scan_success_limit > 0 && success_cnt >= scan_success_limit) {
end_cursor = sub_key.ToString();
break;
}
}
Expand Down
Loading

0 comments on commit 65a8cd5

Please sign in to comment.