diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc
index 884c8ace7aa..a9c466a43bc 100644
--- a/src/cluster/slot_migrate.cc
+++ b/src/cluster/slot_migrate.cc
@@ -334,15 +334,17 @@ Status SlotMigrator::sendSnapshotByCmd() {
 
   LOG(INFO) << "[migrate] Start migrating snapshot of slot " << slot;
 
+  // Construct key prefix to iterate the keys belong to the target slot
+  std::string prefix = ComposeSlotKeyPrefix(namespace_, slot);
+  LOG(INFO) << "[migrate] Iterate keys of slot, key's prefix: " << prefix;
+
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
   read_options.snapshot = slot_snapshot_;
+  Slice prefix_slice(prefix);
+  read_options.iterate_lower_bound = &prefix_slice;
   rocksdb::ColumnFamilyHandle *cf_handle = storage_->GetCFHandle(engine::kMetadataColumnFamilyName);
   auto iter = util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle));
 
-  // Construct key prefix to iterate the keys belong to the target slot
-  std::string prefix = ComposeSlotKeyPrefix(namespace_, slot);
-  LOG(INFO) << "[migrate] Iterate keys of slot, key's prefix: " << prefix;
-
   // Seek to the beginning of keys start with 'prefix' and iterate all these keys
   for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
     // The migrating task has to be stopped, if server role is changed from master to slave
@@ -738,14 +740,16 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
   cmd = type_to_cmd[metadata.Type()];
 
   std::vector<std::string> user_cmd = {cmd, key.ToString()};
 
+  // Construct key prefix to iterate values of the complex type user key
+  std::string slot_key = AppendNamespacePrefix(key);
+  std::string prefix_subkey = InternalKey(slot_key, "", metadata.version, true).Encode();
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
   read_options.snapshot = slot_snapshot_;
+  Slice prefix_slice(prefix_subkey);
+  read_options.iterate_lower_bound = &prefix_slice;
   // Should use th raw db iterator to avoid reading uncommitted writes in transaction mode
   auto iter = util::UniqueIterator(storage_->GetDB()->NewIterator(read_options));
-  // Construct key prefix to iterate values of the complex type user key
-  std::string slot_key = AppendNamespacePrefix(key);
-  std::string prefix_subkey = InternalKey(slot_key, "", metadata.version, true).Encode();
 
   int item_count = 0;
   for (iter->Seek(prefix_subkey); iter->Valid(); iter->Next()) {
@@ -840,13 +844,15 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
 Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metadata, std::string *restore_cmds) {
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
   read_options.snapshot = slot_snapshot_;
-  // Should use th raw db iterator to avoid reading uncommitted writes in transaction mode
-  auto iter = util::UniqueIterator(
-      storage_->GetDB()->NewIterator(read_options, storage_->GetCFHandle(engine::kStreamColumnFamilyName)));
-
   std::string ns_key = AppendNamespacePrefix(key);
   // Construct key prefix to iterate values of the stream
   std::string prefix_key = InternalKey(ns_key, "", metadata.version, true).Encode();
+  rocksdb::Slice prefix_key_slice(prefix_key);
+  read_options.iterate_lower_bound = &prefix_key_slice;
+
+  // Should use th raw db iterator to avoid reading uncommitted writes in transaction mode
+  auto iter = util::UniqueIterator(
+      storage_->GetDB()->NewIterator(read_options, storage_->GetCFHandle(engine::kStreamColumnFamilyName)));
 
   std::vector<std::string> user_cmd = {type_to_cmd[metadata.Type()], key.ToString()};
 
@@ -1197,10 +1203,12 @@ Status SlotMigrator::sendSnapshotByRawKV() {
   uint64_t start_ts = util::GetTimeStampMS();
   LOG(INFO) << "[migrate] Migrating snapshot of slot " << migrating_slot_ << " by raw key value";
 
+  auto prefix = ComposeSlotKeyPrefix(namespace_, migrating_slot_);
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
   read_options.snapshot = slot_snapshot_;
+  rocksdb::Slice prefix_slice(prefix);
+  read_options.iterate_lower_bound = &prefix_slice;
   engine::DBIterator iter(storage_, read_options);
-  auto prefix = ComposeSlotKeyPrefix(namespace_, migrating_slot_);
 
   BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_, migrate_batch_bytes_per_sec_);
 
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index a0e06615e76..3ff1fa0a374 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -433,12 +433,7 @@ rocksdb::Status Database::FlushDB() {
   if (!s.ok()) {
     return rocksdb::Status::OK();
   }
-  s = storage_->DeleteRange(begin_key, end_key);
-  if (!s.ok()) {
-    return s;
-  }
-
-  return rocksdb::Status::OK();
+  return storage_->DeleteRange(begin_key, end_key);
 }
 
 rocksdb::Status Database::FlushAll() {
@@ -456,11 +451,7 @@ rocksdb::Status Database::FlushAll() {
     return rocksdb::Status::OK();
   }
   auto last_key = iter->key().ToString();
-  auto s = storage_->DeleteRange(first_key, last_key);
-  if (!s.ok()) {
-    return s;
-  }
-  return rocksdb::Status::OK();
+  return storage_->DeleteRange(first_key, last_key);
 }
 
 rocksdb::Status Database::Dump(const Slice &user_key, std::vector<std::string> *infos) {
diff --git a/src/storage/storage.cc b/src/storage/storage.cc
index 2133645c40c..1759d2e7a8c 100644
--- a/src/storage/storage.cc
+++ b/src/storage/storage.cc
@@ -101,11 +101,11 @@ void Storage::CloseDB() {
 }
 
 void Storage::SetWriteOptions(const Config::RocksDB::WriteOptions &config) {
-  write_opts_.sync = config.sync;
-  write_opts_.disableWAL = config.disable_wal;
-  write_opts_.no_slowdown = config.no_slowdown;
-  write_opts_.low_pri = config.low_pri;
-  write_opts_.memtable_insert_hint_per_batch = config.memtable_insert_hint_per_batch;
+  default_write_opts_.sync = config.sync;
+  default_write_opts_.disableWAL = config.disable_wal;
+  default_write_opts_.no_slowdown = config.no_slowdown;
+  default_write_opts_.low_pri = config.low_pri;
+  default_write_opts_.memtable_insert_hint_per_batch = config.memtable_insert_hint_per_batch;
 }
 
 rocksdb::ReadOptions Storage::DefaultScanOptions() const {
@@ -523,7 +523,7 @@ Status Storage::RestoreFromCheckpoint() {
 
 bool Storage::IsEmptyDB() {
   std::unique_ptr<rocksdb::Iterator> iter(
-      db_->NewIterator(rocksdb::ReadOptions(), GetCFHandle(kMetadataColumnFamilyName)));
+      db_->NewIterator(DefaultScanOptions(), GetCFHandle(kMetadataColumnFamilyName)));
   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
     Metadata metadata(kRedisNone, false);
     // If cannot decode the metadata we think the key is alive, so the db is not empty
@@ -688,7 +688,7 @@ rocksdb::Status Storage::DeleteRange(const std::string &first_key, const std::st
     return s;
   }
 
-  return Write(write_opts_, batch->GetWriteBatch());
+  return Write(default_write_opts_, batch->GetWriteBatch());
 }
 
 rocksdb::Status Storage::FlushScripts(const rocksdb::WriteOptions &options, rocksdb::ColumnFamilyHandle *cf_handle) {
@@ -707,7 +707,7 @@ rocksdb::Status Storage::FlushScripts(const rocksdb::WriteOptions &options, rock
 }
 
 Status Storage::ReplicaApplyWriteBatch(std::string &&raw_batch) {
-  return ApplyWriteBatch(write_opts_, std::move(raw_batch));
+  return ApplyWriteBatch(default_write_opts_, std::move(raw_batch));
 }
 
 Status Storage::ApplyWriteBatch(const rocksdb::WriteOptions &options, std::string &&raw_batch) {
@@ -845,7 +845,7 @@ Status Storage::CommitTxn() {
     return Status{Status::NotOK, "cannot commit while not in transaction mode"};
   }
 
-  auto s = writeToDB(write_opts_, txn_write_batch_->GetWriteBatch());
+  auto s = writeToDB(default_write_opts_, txn_write_batch_->GetWriteBatch());
 
   is_txn_mode_ = false;
   txn_write_batch_ = nullptr;
@@ -869,7 +869,7 @@ Status Storage::WriteToPropagateCF(const std::string &key, const std::string &va
   auto batch = GetWriteBatchBase();
   auto cf = GetCFHandle(kPropagateColumnFamilyName);
   batch->Put(cf, key, value);
-  auto s = Write(write_opts_, batch->GetWriteBatch());
+  auto s = Write(default_write_opts_, batch->GetWriteBatch());
   if (!s.ok()) {
     return {Status::NotOK, s.ToString()};
   }
diff --git a/src/storage/storage.h b/src/storage/storage.h
index 44888f1ca4a..208499b58fc 100644
--- a/src/storage/storage.h
+++ b/src/storage/storage.h
@@ -163,7 +163,7 @@ class Storage {
 
   rocksdb::Iterator *NewIterator(const rocksdb::ReadOptions &options);
   [[nodiscard]] rocksdb::Status Write(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates);
-  const rocksdb::WriteOptions &DefaultWriteOptions() { return write_opts_; }
+  const rocksdb::WriteOptions &DefaultWriteOptions() { return default_write_opts_; }
   rocksdb::ReadOptions DefaultScanOptions() const;
   rocksdb::ReadOptions DefaultMultiGetOptions() const;
   [[nodiscard]] rocksdb::Status Delete(const rocksdb::WriteOptions &options, rocksdb::ColumnFamilyHandle *cf_handle,
@@ -279,7 +279,7 @@ class Storage {
   // command, so it won't have multi transactions to be executed at the same time.
   std::unique_ptr<rocksdb::WriteBatchWithIndex> txn_write_batch_;
 
-  rocksdb::WriteOptions write_opts_ = rocksdb::WriteOptions();
+  rocksdb::WriteOptions default_write_opts_ = rocksdb::WriteOptions();
 
   rocksdb::Status writeToDB(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates);
   void recordKeyspaceStat(const rocksdb::ColumnFamilyHandle *column_family, const rocksdb::Status &s);
diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc
index 3651f910f88..9a08c1fe5fd 100644
--- a/src/types/redis_bitmap.cc
+++ b/src/types/redis_bitmap.cc
@@ -157,6 +157,8 @@ rocksdb::Status Bitmap::GetString(const Slice &user_key, const uint32_t max_btos
 
   rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
   read_options.snapshot = ss.GetSnapShot();
+  Slice prefix_key_slice(prefix_key);
+  read_options.iterate_lower_bound = &prefix_key_slice;
   auto iter = util::UniqueIterator(storage_, read_options);
 
   for (iter->Seek(prefix_key); iter->Valid() && iter->key().starts_with(prefix_key); iter->Next()) {
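
Aside: the standalone program below is not part of the change set; it is a minimal sketch of the RocksDB option the patch relies on, with an invented database path, keys, and prefix. It shows `ReadOptions::iterate_lower_bound` combined with a prefix `Seek()`, the same pattern the diff adds in `sendSnapshotByCmd`, `migrateComplexKey`, `migrateStream`, `sendSnapshotByRawKV`, and `Bitmap::GetString`, and why the bound `Slice` has to be constructed before the iterator.

// Standalone illustration of iterate_lower_bound with a prefix scan.
// Path and keys are made up for the example.
#include <cassert>
#include <iostream>
#include <memory>
#include <string>

#include <rocksdb/db.h>

int main() {
  rocksdb::DB *db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/iterate_lower_bound_demo", &db);
  assert(s.ok());

  // Keys from two "slots"; only the "slot2|" group should be visited.
  db->Put(rocksdb::WriteOptions(), "slot1|a", "v");
  db->Put(rocksdb::WriteOptions(), "slot2|a", "v");
  db->Put(rocksdb::WriteOptions(), "slot2|b", "v");

  std::string prefix = "slot2|";

  rocksdb::ReadOptions read_options;
  // iterate_lower_bound stores only a pointer: prefix_slice (and the string it
  // references) must stay alive for as long as the iterator is used, which is
  // why the patch builds the prefix before creating the iterator.
  rocksdb::Slice prefix_slice(prefix);
  read_options.iterate_lower_bound = &prefix_slice;

  std::unique_ptr<rocksdb::Iterator> iter(db->NewIterator(read_options));
  // Seek(prefix) positions at the first key >= prefix; the lower bound lets
  // RocksDB avoid touching data ordered before the prefix.
  for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
    std::cout << iter->key().ToString() << "\n";
  }

  iter.reset();  // release the iterator before closing the DB
  delete db;
  return 0;
}

The lower bound does not replace the prefix check inside the loop; it only tells RocksDB it may skip everything ordered before the prefix, which is what saves the needless scanning the patch is addressing.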