diff --git a/src/commands/cmd_key.cc b/src/commands/cmd_key.cc index 093e0cc2bfe..256a773b9c8 100644 --- a/src/commands/cmd_key.cc +++ b/src/commands/cmd_key.cc @@ -23,6 +23,7 @@ #include "commander.h" #include "commands/ttl_util.h" #include "error_constants.h" +#include "rocksdb/slice.h" #include "server/redis_reply.h" #include "server/server.h" #include "storage/redis_db.h" @@ -356,6 +357,49 @@ class CommandDel : public Commander { } }; +class CommandDelPrefix : public Commander { + public: + Status Execute(engine::Context & /*ctx*/, Server *srv, Connection * /*conn*/, std::string *output) override { + if (args_.size() < 2) return {Status::NotOK, "Missing prefix argument"}; + + std::string prefix = args_[1]; + if (prefix.empty()) return {Status::NotOK, "Prefix cannot be empty"}; + + auto db = srv->storage->GetDB(); + if (!db) return {Status::NotOK, "DB not initialized"}; + + // Creating an iterator with ReadOptions + rocksdb::ReadOptions read_options; + std::unique_ptr it(db->NewIterator(read_options)); + if (!it) return {Status::NotOK, "Failed to create iterator"}; + + rocksdb::WriteBatch batch; + int delete_count = 0; + + for (it->Seek(prefix); it->Valid(); it->Next()) { + if (!it->key().starts_with(rocksdb::Slice(prefix))) break; + batch.Delete(it->key()); + delete_count++; + } + + if (!it->status().ok()) { + return {Status::NotOK, "Iterator error: " + it->status().ToString()}; + } + + // Only write batch if there are keys to delete + if (delete_count > 0) { + rocksdb::WriteOptions write_options; + rocksdb::Status s = db->Write(write_options, &batch); + if (!s.ok()) { + return {Status::NotOK, "Write batch error: " + s.ToString()}; + } + } + + *output = std::to_string(delete_count); + return Status::OK(); + } +}; + class CommandRename : public Commander { public: Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { @@ -598,6 +642,7 @@ REDIS_REGISTER_COMMANDS(Key, MakeCmdAttr("ttl", 2, "read-only", 1, 1 MakeCmdAttr("rename", 3, "write", 1, 2, 1), MakeCmdAttr("renamenx", 3, "write", 1, 2, 1), MakeCmdAttr("copy", -3, "write", 1, 2, 1), + MakeCmdAttr("delprefix", 2, "write", 1, 1, 1), MakeCmdAttr>("sort", -2, "write slow", 1, 1, 1), MakeCmdAttr>("sort_ro", -2, "read-only slow", 1, 1, 1), MakeCmdAttr("kmetadata", 2, "read-only", 1, 1, 1)) diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc index 1ff52c8b008..7e43aaf65ff 100644 --- a/src/storage/redis_db.cc +++ b/src/storage/redis_db.cc @@ -28,8 +28,10 @@ #include "common/string_util.h" #include "db_util.h" #include "parse_util.h" +#include "rocksdb/db.h" #include "rocksdb/iterator.h" #include "rocksdb/status.h" +#include "rocksdb/write_batch.h" #include "storage/iterator.h" #include "storage/redis_metadata.h" #include "storage/storage.h" @@ -934,4 +936,36 @@ bool RedisSortObject::SortCompare(const RedisSortObject &a, const RedisSortObjec } } +rocksdb::Status Database::DeletePrefix(engine::Context & /*ctx*/, const Slice &prefix, uint64_t *deleted_cnt) { + *deleted_cnt = 0; + + auto db = storage_->GetDB(); + if (!db) return rocksdb::Status::NotFound("DB not initialized"); + + rocksdb::ReadOptions read_options; + std::unique_ptr it(db->NewIterator(read_options)); + if (!it) return rocksdb::Status::NotFound("Failed to create iterator"); + + rocksdb::WriteBatch batch; + for (it->Seek(prefix); it->Valid(); it->Next()) { + if (!it->key().starts_with(prefix)) break; + batch.Delete(it->key()); + (*deleted_cnt)++; + } + + if (!it->status().ok()) { + return rocksdb::Status::NotFound("Iterator error: " + it->status().ToString()); + } + + if (*deleted_cnt > 0) { + rocksdb::WriteOptions write_options; + rocksdb::Status s = db->Write(write_options, &batch); + if (!s.ok()) { + return rocksdb::Status::NotFound("Write batch error: " + s.ToString()); + } + } + + return rocksdb::Status::OK(); +} + } // namespace redis diff --git a/src/storage/redis_db.h b/src/storage/redis_db.h index 9563b1e1d6a..0bea0333842 100644 --- a/src/storage/redis_db.h +++ b/src/storage/redis_db.h @@ -113,6 +113,7 @@ class Database { [[nodiscard]] rocksdb::Status MDel(engine::Context &ctx, const std::vector &keys, uint64_t *deleted_cnt); [[nodiscard]] rocksdb::Status Exists(engine::Context &ctx, const std::vector &keys, int *ret); [[nodiscard]] rocksdb::Status TTL(engine::Context &ctx, const Slice &user_key, int64_t *ttl); + [[nodiscard]] rocksdb::Status DeletePrefix(engine::Context &ctx, const Slice &prefix, uint64_t *deleted_cnt); [[nodiscard]] rocksdb::Status GetExpireTime(engine::Context &ctx, const Slice &user_key, uint64_t *timestamp); [[nodiscard]] rocksdb::Status Type(engine::Context &ctx, const Slice &key, RedisType *type); [[nodiscard]] rocksdb::Status Dump(engine::Context &ctx, const Slice &user_key, std::vector *infos); diff --git a/tests/gocase/unit/delprefix/cmd_delprefix_test.go b/tests/gocase/unit/delprefix/cmd_delprefix_test.go new file mode 100644 index 00000000000..8fcce2e5639 --- /dev/null +++ b/tests/gocase/unit/delprefix/cmd_delprefix_test.go @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package deleteprefix + +import ( + "context" + "testing" + "time" + + "github.com/apache/kvrocks/tests/gocase/util" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/require" +) + +var instance *util.KvrocksServer + +func setup(t *testing.T) *redis.Client { + instance = util.StartServer(t, map[string]string{}) + + // Initialize client with authentication if needed + client := instance.NewClientWithOption(&redis.Options{ + Addr: instance.HostPort(), + }) + require.Eventually(t, func() bool { + err := client.Ping(context.Background()).Err() + return err == nil || err.Error() == "NOAUTH Authentication required." + }, time.Minute, time.Second) + + return client +} + +func teardown() { + if instance != nil { + instance.Close() + } +} + +func TestDelPrefix(t *testing.T) { + client := setup(t) + defer teardown() + + // Test cases + t.Run("DELPREFIX_ALL", func(t *testing.T) { + require.NoError(t, client.Set(context.Background(), "test:key1", "value1", 0).Err()) + require.NoError(t, client.Set(context.Background(), "test:key2", "value2", 0).Err()) + + _, err := client.Do(context.Background(), "DELPREFIX", "test").Result() + require.NoError(t, err) + }) + + t.Run("DELPREFIX_BY_PREFIX", func(t *testing.T) { + require.NoError(t, client.Set(context.Background(), "sample:key1", "value1", 0).Err()) + require.NoError(t, client.Set(context.Background(), "sample:key2", "value2", 0).Err()) + + _, err := client.Do(context.Background(), "DELPREFIX", "sample").Result() + require.NoError(t, err) + }) + + t.Run("Delprefix_reject_invalid_input", func(t *testing.T) { + _, err := client.Do(context.Background(), "DELPREFIX").Result() + require.Error(t, err) + }) +}