Skip to content

Commit

Permalink
Trying to set rocksdb iterator lower_bound if possible (#2275)
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Apr 27, 2024
1 parent ae5784d commit 83c0c0e
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 35 deletions.
32 changes: 20 additions & 12 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,15 +334,17 @@ Status SlotMigrator::sendSnapshotByCmd() {

LOG(INFO) << "[migrate] Start migrating snapshot of slot " << slot;

// Construct key prefix to iterate the keys belonging to the target slot
std::string prefix = ComposeSlotKeyPrefix(namespace_, slot);
LOG(INFO) << "[migrate] Iterate keys of slot, key's prefix: " << prefix;

rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = slot_snapshot_;
Slice prefix_slice(prefix);
read_options.iterate_lower_bound = &prefix_slice;
rocksdb::ColumnFamilyHandle *cf_handle = storage_->GetCFHandle(engine::kMetadataColumnFamilyName);
auto iter = util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle));

// Construct key prefix to iterate the keys belonging to the target slot
std::string prefix = ComposeSlotKeyPrefix(namespace_, slot);
LOG(INFO) << "[migrate] Iterate keys of slot, key's prefix: " << prefix;

// Seek to the beginning of keys start with 'prefix' and iterate all these keys
for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
// The migrating task has to be stopped, if server role is changed from master to slave
Expand Down Expand Up @@ -738,14 +740,16 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
cmd = type_to_cmd[metadata.Type()];

std::vector<std::string> user_cmd = {cmd, key.ToString()};
// Construct key prefix to iterate values of the complex type user key
std::string slot_key = AppendNamespacePrefix(key);
std::string prefix_subkey = InternalKey(slot_key, "", metadata.version, true).Encode();
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = slot_snapshot_;
Slice prefix_slice(prefix_subkey);
read_options.iterate_lower_bound = &prefix_slice;
// Should use the raw DB iterator to avoid reading uncommitted writes in transaction mode
auto iter = util::UniqueIterator(storage_->GetDB()->NewIterator(read_options));

// Construct key prefix to iterate values of the complex type user key
std::string slot_key = AppendNamespacePrefix(key);
std::string prefix_subkey = InternalKey(slot_key, "", metadata.version, true).Encode();
int item_count = 0;

for (iter->Seek(prefix_subkey); iter->Valid(); iter->Next()) {
Expand Down Expand Up @@ -840,13 +844,15 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metadata, std::string *restore_cmds) {
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = slot_snapshot_;
// Should use the raw DB iterator to avoid reading uncommitted writes in transaction mode
auto iter = util::UniqueIterator(
storage_->GetDB()->NewIterator(read_options, storage_->GetCFHandle(engine::kStreamColumnFamilyName)));

std::string ns_key = AppendNamespacePrefix(key);
// Construct key prefix to iterate values of the stream
std::string prefix_key = InternalKey(ns_key, "", metadata.version, true).Encode();
rocksdb::Slice prefix_key_slice(prefix_key);
read_options.iterate_lower_bound = &prefix_key_slice;

// Should use the raw DB iterator to avoid reading uncommitted writes in transaction mode
auto iter = util::UniqueIterator(
storage_->GetDB()->NewIterator(read_options, storage_->GetCFHandle(engine::kStreamColumnFamilyName)));

std::vector<std::string> user_cmd = {type_to_cmd[metadata.Type()], key.ToString()};

Expand Down Expand Up @@ -1197,10 +1203,12 @@ Status SlotMigrator::sendSnapshotByRawKV() {
uint64_t start_ts = util::GetTimeStampMS();
LOG(INFO) << "[migrate] Migrating snapshot of slot " << migrating_slot_ << " by raw key value";

auto prefix = ComposeSlotKeyPrefix(namespace_, migrating_slot_);
rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = slot_snapshot_;
rocksdb::Slice prefix_slice(prefix);
read_options.iterate_lower_bound = &prefix_slice;
engine::DBIterator iter(storage_, read_options);
auto prefix = ComposeSlotKeyPrefix(namespace_, migrating_slot_);

BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_, migrate_batch_bytes_per_sec_);

Expand Down
13 changes: 2 additions & 11 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -433,12 +433,7 @@ rocksdb::Status Database::FlushDB() {
if (!s.ok()) {
return rocksdb::Status::OK();
}
s = storage_->DeleteRange(begin_key, end_key);
if (!s.ok()) {
return s;
}

return rocksdb::Status::OK();
return storage_->DeleteRange(begin_key, end_key);
}

rocksdb::Status Database::FlushAll() {
Expand All @@ -456,11 +451,7 @@ rocksdb::Status Database::FlushAll() {
return rocksdb::Status::OK();
}
auto last_key = iter->key().ToString();
auto s = storage_->DeleteRange(first_key, last_key);
if (!s.ok()) {
return s;
}
return rocksdb::Status::OK();
return storage_->DeleteRange(first_key, last_key);
}

rocksdb::Status Database::Dump(const Slice &user_key, std::vector<std::string> *infos) {
Expand Down
20 changes: 10 additions & 10 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ void Storage::CloseDB() {
}

void Storage::SetWriteOptions(const Config::RocksDB::WriteOptions &config) {
write_opts_.sync = config.sync;
write_opts_.disableWAL = config.disable_wal;
write_opts_.no_slowdown = config.no_slowdown;
write_opts_.low_pri = config.low_pri;
write_opts_.memtable_insert_hint_per_batch = config.memtable_insert_hint_per_batch;
default_write_opts_.sync = config.sync;
default_write_opts_.disableWAL = config.disable_wal;
default_write_opts_.no_slowdown = config.no_slowdown;
default_write_opts_.low_pri = config.low_pri;
default_write_opts_.memtable_insert_hint_per_batch = config.memtable_insert_hint_per_batch;
}

rocksdb::ReadOptions Storage::DefaultScanOptions() const {
Expand Down Expand Up @@ -523,7 +523,7 @@ Status Storage::RestoreFromCheckpoint() {

bool Storage::IsEmptyDB() {
std::unique_ptr<rocksdb::Iterator> iter(
db_->NewIterator(rocksdb::ReadOptions(), GetCFHandle(kMetadataColumnFamilyName)));
db_->NewIterator(DefaultScanOptions(), GetCFHandle(kMetadataColumnFamilyName)));
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Metadata metadata(kRedisNone, false);
// If cannot decode the metadata we think the key is alive, so the db is not empty
Expand Down Expand Up @@ -688,7 +688,7 @@ rocksdb::Status Storage::DeleteRange(const std::string &first_key, const std::st
return s;
}

return Write(write_opts_, batch->GetWriteBatch());
return Write(default_write_opts_, batch->GetWriteBatch());
}

rocksdb::Status Storage::FlushScripts(const rocksdb::WriteOptions &options, rocksdb::ColumnFamilyHandle *cf_handle) {
Expand All @@ -707,7 +707,7 @@ rocksdb::Status Storage::FlushScripts(const rocksdb::WriteOptions &options, rock
}

Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) {
return ApplyWriteBatch(write_opts_, std::move(raw_batch));
return ApplyWriteBatch(default_write_opts_, std::move(raw_batch));
}

Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) {
Expand Down Expand Up @@ -845,7 +845,7 @@ Status Storage::CommitTxn() {
return Status{Status::NotOK, "cannot commit while not in transaction mode"};
}

auto s = writeToDB(write_opts_, txn_write_batch_->GetWriteBatch());
auto s = writeToDB(default_write_opts_, txn_write_batch_->GetWriteBatch());

is_txn_mode_ = false;
txn_write_batch_ = nullptr;
Expand All @@ -869,7 +869,7 @@ Status Storage::WriteToPropagateCF(const std::string &key, const std::string &va
auto batch = GetWriteBatchBase();
auto cf = GetCFHandle(kPropagateColumnFamilyName);
batch->Put(cf, key, value);
auto s = Write(write_opts_, batch->GetWriteBatch());
auto s = Write(default_write_opts_, batch->GetWriteBatch());
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}
Expand Down
4 changes: 2 additions & 2 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class Storage {
rocksdb::Iterator *NewIterator(const rocksdb::ReadOptions &options);

[[nodiscard]] rocksdb::Status Write(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates);
const rocksdb::WriteOptions &DefaultWriteOptions() { return write_opts_; }
const rocksdb::WriteOptions &DefaultWriteOptions() { return default_write_opts_; }
rocksdb::ReadOptions DefaultScanOptions() const;
rocksdb::ReadOptions DefaultMultiGetOptions() const;
[[nodiscard]] rocksdb::Status Delete(const rocksdb::WriteOptions &options, rocksdb::ColumnFamilyHandle *cf_handle,
Expand Down Expand Up @@ -279,7 +279,7 @@ class Storage {
// command, so it won't have multi transactions to be executed at the same time.
std::unique_ptr<rocksdb::WriteBatchWithIndex> txn_write_batch_;

rocksdb::WriteOptions write_opts_ = rocksdb::WriteOptions();
rocksdb::WriteOptions default_write_opts_ = rocksdb::WriteOptions();

rocksdb::Status writeToDB(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates);
void recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Status &s);
Expand Down
2 changes: 2 additions & 0 deletions src/types/redis_bitmap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ rocksdb::Status Bitmap::GetString(const Slice &user_key, const uint32_t max_btos

rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = ss.GetSnapShot();
Slice prefix_key_slice(prefix_key);
read_options.iterate_lower_bound = &prefix_key_slice;

auto iter = util::UniqueIterator(storage_, read_options);
for (iter->Seek(prefix_key); iter->Valid() && iter->key().starts_with(prefix_key); iter->Next()) {
Expand Down

0 comments on commit 83c0c0e

Please sign in to comment.