Skip to content

Commit

Permalink
Add support of the command COPY (#2238)
Browse files Browse the repository at this point in the history
  • Loading branch information
Chiro11 committed Apr 12, 2024
1 parent ab3a61d commit ed5937c
Show file tree
Hide file tree
Showing 5 changed files with 1,205 additions and 40 deletions.
84 changes: 65 additions & 19 deletions src/commands/cmd_key.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,13 @@ class CommandMoveX : public Commander {
break;
}

bool ret = true;
bool key_exist = true;
Database::CopyResult res = Database::CopyResult::DONE;
std::string ns_key = redis.AppendNamespacePrefix(key);
std::string new_ns_key = ComposeNamespaceKey(ns, key, srv->storage->IsSlotIdEncoded());
auto s = redis.Move(ns_key, new_ns_key, true, &ret, &key_exist);
auto s = redis.Copy(ns_key, new_ns_key, true, true, &res);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

if (ret && key_exist) {
if (res == Database::CopyResult::DONE) {
*output = redis::Integer(1);
} else {
*output = redis::Integer(0);
Expand Down Expand Up @@ -222,7 +221,7 @@ class CommandExpireAt : public Commander {

timestamp_ = *parse_result;

return Commander::Parse(args);
return Status::OK();
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -346,14 +345,12 @@ class CommandRename : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::Database redis(srv->storage, conn->GetNamespace());
bool ret = true;
bool key_exist = true;
Database::CopyResult res = Database::CopyResult::DONE;
std::string ns_key = redis.AppendNamespacePrefix(args_[1]);
std::string new_ns_key = redis.AppendNamespacePrefix(args_[2]);
auto s = redis.Move(ns_key, new_ns_key, false, &ret, &key_exist);
auto s = redis.Copy(ns_key, new_ns_key, false, true, &res);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
if (!key_exist) return {Status::RedisExecErr, rocksdb::Status::InvalidArgument("ERR no such key").ToString()};

if (res == Database::CopyResult::KEY_NOT_EXIST) return {Status::RedisExecErr, "no such key"};
*output = redis::SimpleString("OK");
return Status::OK();
}
Expand All @@ -363,20 +360,68 @@ class CommandRenameNX : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::Database redis(srv->storage, conn->GetNamespace());
bool ret = true;
bool key_exist = true;
Database::CopyResult res = Database::CopyResult::DONE;
std::string ns_key = redis.AppendNamespacePrefix(args_[1]);
std::string new_ns_key = redis.AppendNamespacePrefix(args_[2]);
auto s = redis.Move(ns_key, new_ns_key, true, &ret, &key_exist);
auto s = redis.Copy(ns_key, new_ns_key, true, true, &res);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
if (!key_exist) return {Status::RedisExecErr, rocksdb::Status::InvalidArgument("ERR no such key").ToString()};
if (ret) {
*output = redis::Integer(1);
} else {
*output = redis::Integer(0);
switch (res) {
case Database::CopyResult::KEY_NOT_EXIST:
return {Status::RedisExecErr, "no such key"};
case Database::CopyResult::DONE:
*output = redis::Integer(1);
break;
case Database::CopyResult::KEY_ALREADY_EXIST:
*output = redis::Integer(0);
break;
}
return Status::OK();
}
};

class CommandCopy : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 3);
while (parser.Good()) {
if (parser.EatEqICase("db")) {
auto db_num = GET_OR_RET(parser.TakeInt());
// There's only one database in Kvrocks, so the DB must be 0 here.
if (db_num != 0) {
return {Status::RedisParseErr, errInvalidSyntax};
}
} else if (parser.EatEqICase("replace")) {
replace_ = true;
} else {
return parser.InvalidSyntax();
}
}

return Status::OK();
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::Database redis(srv->storage, conn->GetNamespace());
Database::CopyResult res = Database::CopyResult::DONE;
std::string ns_key = redis.AppendNamespacePrefix(args_[1]);
std::string new_ns_key = redis.AppendNamespacePrefix(args_[2]);
auto s = redis.Copy(ns_key, new_ns_key, !replace_, false, &res);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
switch (res) {
case Database::CopyResult::KEY_NOT_EXIST:
return {Status::RedisExecErr, "no such key"};
case Database::CopyResult::DONE:
*output = redis::Integer(1);
break;
case Database::CopyResult::KEY_ALREADY_EXIST:
*output = redis::Integer(0);
break;
}
return Status::OK();
}

private:
bool replace_ = false;
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandTTL>("ttl", 2, "read-only", 1, 1, 1),
Expand All @@ -396,6 +441,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandTTL>("ttl", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandDel>("del", -2, "write no-dbsize-check", 1, -1, 1),
MakeCmdAttr<CommandDel>("unlink", -2, "write no-dbsize-check", 1, -1, 1),
MakeCmdAttr<CommandRename>("rename", 3, "write", 1, 2, 1),
MakeCmdAttr<CommandRenameNX>("renamenx", 3, "write", 1, 2, 1), )
MakeCmdAttr<CommandRenameNX>("renamenx", 3, "write", 1, 2, 1),
MakeCmdAttr<CommandCopy>("copy", -3, "write", 1, 2, 1), )

} // namespace redis
16 changes: 9 additions & 7 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -708,30 +708,30 @@ rocksdb::Status Database::typeInternal(const Slice &key, RedisType *type) {
return rocksdb::Status::OK();
}

rocksdb::Status Database::Move(const std::string &key, const std::string &new_key, bool nx, bool *ret,
bool *key_exist) {
*ret = true;
*key_exist = true;
rocksdb::Status Database::Copy(const std::string &key, const std::string &new_key, bool nx, bool delete_old,
CopyResult *res) {
std::vector<std::string> lock_keys = {key, new_key};
MultiLockGuard guard(storage_->GetLockManager(), lock_keys);

RedisType type = kRedisNone;
auto s = typeInternal(key, &type);
if (!s.ok()) return s;
if (type == kRedisNone) {
*key_exist = false;
*res = CopyResult::KEY_NOT_EXIST;
return rocksdb::Status::OK();
}

if (nx) {
int exist = 0;
if (s = existsInternal({new_key}, &exist), !s.ok()) return s;
if (exist > 0) {
*ret = false;
*res = CopyResult::KEY_ALREADY_EXIST;
return rocksdb::Status::OK();
}
}

*res = CopyResult::DONE;

if (key == new_key) return rocksdb::Status::OK();

auto batch = storage_->GetWriteBatchBase();
Expand All @@ -741,8 +741,10 @@ rocksdb::Status Database::Move(const std::string &key, const std::string &new_ke
engine::DBIterator iter(storage_, rocksdb::ReadOptions());
iter.Seek(key);

if (delete_old) {
batch->Delete(metadata_cf_handle_, key);
}
// copy metadata
batch->Delete(metadata_cf_handle_, key);
batch->Put(metadata_cf_handle_, new_key, iter.Value());

auto subkey_iter = iter.GetSubKeyIterator();
Expand Down
8 changes: 5 additions & 3 deletions src/storage/redis_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,11 @@ class Database {
rocksdb::ColumnFamilyHandle *cf_handle = nullptr);
[[nodiscard]] rocksdb::Status ClearKeysOfSlot(const rocksdb::Slice &ns, int slot);
[[nodiscard]] rocksdb::Status KeyExist(const std::string &key);
// Move <key,value> to <new_key,value> (already an internal key)
[[nodiscard]] rocksdb::Status Move(const std::string &key, const std::string &new_key, bool nx, bool *ret,
bool *key_exist);

// Copy <key,value> to <new_key,value> (already an internal key)
enum class CopyResult { KEY_NOT_EXIST, KEY_ALREADY_EXIST, DONE };
[[nodiscard]] rocksdb::Status Copy(const std::string &key, const std::string &new_key, bool nx, bool delete_old,
CopyResult *res);

protected:
engine::Storage *storage_;
Expand Down
Loading

0 comments on commit ed5937c

Please sign in to comment.