Skip to content

Commit

Permalink
Merge branch 'unstable' into explainsql-dot
Browse files Browse the repository at this point in the history
  • Loading branch information
PragmaTwice committed Jun 15, 2024
2 parents 8156705 + 8f1d2ad commit bf278bd
Show file tree
Hide file tree
Showing 24 changed files with 162 additions and 218 deletions.
19 changes: 9 additions & 10 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ Status Cluster::ImportSlot(redis::Connection *conn, int slot, int state) {

Status Cluster::GetClusterInfo(std::string *cluster_infos) {
if (version_ < 0) {
return {Status::ClusterDown, errClusterNoInitialized};
return {Status::RedisClusterDown, errClusterNoInitialized};
}

cluster_infos->clear();
Expand Down Expand Up @@ -421,7 +421,7 @@ Status Cluster::GetClusterInfo(std::string *cluster_infos) {
// ... continued until done
Status Cluster::GetSlotsInfo(std::vector<SlotInfo> *slots_infos) {
if (version_ < 0) {
return {Status::ClusterDown, errClusterNoInitialized};
return {Status::RedisClusterDown, errClusterNoInitialized};
}

slots_infos->clear();
Expand Down Expand Up @@ -464,7 +464,7 @@ SlotInfo Cluster::genSlotNodeInfo(int start, int end, const std::shared_ptr<Clus
// $version $connected $slot_range
Status Cluster::GetClusterNodes(std::string *nodes_str) {
if (version_ < 0) {
return {Status::ClusterDown, errClusterNoInitialized};
return {Status::RedisClusterDown, errClusterNoInitialized};
}

*nodes_str = genNodesDescription();
Expand All @@ -473,7 +473,7 @@ Status Cluster::GetClusterNodes(std::string *nodes_str) {

StatusOr<std::string> Cluster::GetReplicas(const std::string &node_id) {
if (version_ < 0) {
return {Status::ClusterDown, errClusterNoInitialized};
return {Status::RedisClusterDown, errClusterNoInitialized};
}

auto item = nodes_.find(node_id);
Expand Down Expand Up @@ -821,26 +821,26 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons
int cur_slot = GetSlotIdFromKey(cmd_tokens[i]);
if (slot == -1) slot = cur_slot;
if (slot != cur_slot) {
return {Status::RedisExecErr, "CROSSSLOT Attempted to access keys that don't hash to the same slot"};
return {Status::RedisCrossSlot, "Attempted to access keys that don't hash to the same slot"};
}
}
if (slot == -1) return Status::OK();

if (slots_nodes_[slot] == nullptr) {
return {Status::ClusterDown, "CLUSTERDOWN Hash slot not served"};
return {Status::RedisClusterDown, "Hash slot not served"};
}

if (myself_ && myself_ == slots_nodes_[slot]) {
// We use central controller to manage the topology of the cluster.
// Server can't change the topology directly, so we record the migrated slots
// to move the requests of the migrated slots to the destination node.
if (migrated_slots_.count(slot) > 0) { // I'm not serving the migrated slot
return {Status::RedisExecErr, fmt::format("MOVED {} {}", slot, migrated_slots_[slot])};
return {Status::RedisMoved, fmt::format("{} {}", slot, migrated_slots_[slot])};
}
// To keep data consistency, slot will be forbidden write while sending the last incremental data.
// During this phase, the requests of the migrating slot has to be rejected.
if ((attributes->flags & redis::kCmdWrite) && IsWriteForbiddenSlot(slot)) {
return {Status::RedisExecErr, "TRYAGAIN Can't write to slot being migrated which is in write forbidden phase"};
return {Status::RedisTryAgain, "Can't write to slot being migrated which is in write forbidden phase"};
}

return Status::OK(); // I'm serving this slot
Expand Down Expand Up @@ -868,8 +868,7 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons
return Status::OK(); // My master is serving this slot
}

return {Status::RedisExecErr,
fmt::format("MOVED {} {}:{}", slot, slots_nodes_[slot]->host, slots_nodes_[slot]->port)};
return {Status::RedisMoved, fmt::format("{} {}:{}", slot, slots_nodes_[slot]->host, slots_nodes_[slot]->port)};
}

// Only HARD mode is meaningful to the Kvrocks cluster,
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/cluster_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ inline constexpr const char *errSlotOutOfRange = "Slot is out of range";
inline constexpr const char *errInvalidClusterVersion = "Invalid cluster version";
inline constexpr const char *errSlotOverlapped = "Slot distribution is overlapped";
inline constexpr const char *errNoMasterNode = "The node isn't a master";
inline constexpr const char *errClusterNoInitialized = "CLUSTERDOWN The cluster is not initialized";
inline constexpr const char *errClusterNoInitialized = "The cluster is not initialized";
inline constexpr const char *errInvalidClusterNodeInfo = "Invalid cluster nodes info";
inline constexpr const char *errInvalidImportState = "Invalid import state";

Expand Down
9 changes: 6 additions & 3 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1000,15 +1000,18 @@ Status ReplicationThread::parseWriteBatch(const std::string &batch_string) {
}

bool ReplicationThread::isRestoringError(std::string_view err) {
return err == std::string(RESP_PREFIX_ERROR) + redis::errRestoringBackup;
// err doesn't contain the CRLF, so cannot use redis::Error here.
return err == RESP_PREFIX_ERROR + redis::StatusToRedisErrorMsg({Status::RedisLoading, redis::errRestoringBackup});
}

bool ReplicationThread::isWrongPsyncNum(std::string_view err) {
return err == std::string(RESP_PREFIX_ERROR) + redis::errWrongNumArguments;
// err doesn't contain the CRLF, so cannot use redis::Error here.
return err == RESP_PREFIX_ERROR + redis::StatusToRedisErrorMsg({Status::NotOK, redis::errWrongNumArguments});
}

bool ReplicationThread::isUnknownOption(std::string_view err) {
return err == fmt::format("{}ERR {}", RESP_PREFIX_ERROR, redis::errUnknownOption);
// err doesn't contain the CRLF, so cannot use redis::Error here.
return err == RESP_PREFIX_ERROR + redis::StatusToRedisErrorMsg({Status::NotOK, redis::errUnknownOption});
}

rocksdb::Status WriteBatchHandler::PutCF(uint32_t column_family_id, const rocksdb::Slice &key,
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/sync_migrate_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void SyncMigrateContext::OnWrite(bufferevent *bev) {
if (migrate_result_) {
conn_->Reply(redis::SimpleString("OK"));
} else {
conn_->Reply(redis::Error("ERR " + migrate_result_.Msg()));
conn_->Reply(redis::Error(migrate_result_));
}

timer_.reset();
Expand Down
8 changes: 4 additions & 4 deletions src/commands/cmd_bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ constexpr const char *errInvalidErrorRate = "error rate should be between 0 and
constexpr const char *errInvalidCapacity = "capacity should be larger than 0";
constexpr const char *errInvalidExpansion = "expansion should be greater or equal to 1";
constexpr const char *errNonscalingButExpand = "nonscaling filters cannot expand";
constexpr const char *errFilterFull = "ERR nonscaling filter is full";
constexpr const char *errFilterFull = "nonscaling filter is full";
} // namespace

namespace redis {
Expand Down Expand Up @@ -119,7 +119,7 @@ class CommandBFAdd : public Commander {
*output = redis::Integer(0);
break;
case BloomFilterAddResult::kFull:
*output = redis::Error(errFilterFull);
*output = redis::Error({Status::NotOK, errFilterFull});
break;
}
return Status::OK();
Expand Down Expand Up @@ -152,7 +152,7 @@ class CommandBFMAdd : public Commander {
*output += redis::Integer(0);
break;
case BloomFilterAddResult::kFull:
*output += redis::Error(errFilterFull);
*output += redis::Error({Status::NotOK, errFilterFull});
break;
}
}
Expand Down Expand Up @@ -248,7 +248,7 @@ class CommandBFInsert : public Commander {
*output += redis::Integer(0);
break;
case BloomFilterAddResult::kFull:
*output += redis::Error(errFilterFull);
*output += redis::Error({Status::NotOK, errFilterFull});
break;
}
}
Expand Down
20 changes: 10 additions & 10 deletions src/commands/cmd_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,45 +90,45 @@ class CommandCluster : public Commander {
}
}
} else {
return {Status::RedisExecErr, s.Msg()};
return s;
}
} else if (subcommand_ == "nodes") {
std::string nodes_desc;
Status s = srv->cluster->GetClusterNodes(&nodes_desc);
if (s.IsOK()) {
*output = conn->VerbatimString("txt", nodes_desc);
} else {
return {Status::RedisExecErr, s.Msg()};
return s;
}
} else if (subcommand_ == "info") {
std::string cluster_info;
Status s = srv->cluster->GetClusterInfo(&cluster_info);
if (s.IsOK()) {
*output = conn->VerbatimString("txt", cluster_info);
} else {
return {Status::RedisExecErr, s.Msg()};
return s;
}
} else if (subcommand_ == "import") {
Status s = srv->cluster->ImportSlot(conn, static_cast<int>(slot_), state_);
if (s.IsOK()) {
*output = redis::SimpleString("OK");
} else {
return {Status::RedisExecErr, s.Msg()};
return s;
}
} else if (subcommand_ == "reset") {
Status s = srv->cluster->Reset();
if (s.IsOK()) {
*output = redis::SimpleString("OK");
} else {
return {Status::RedisExecErr, s.Msg()};
return s;
}
} else if (subcommand_ == "replicas") {
auto node_id = args_[2];
StatusOr<std::string> s = srv->cluster->GetReplicas(node_id);
if (s.IsOK()) {
*output = conn->VerbatimString("txt", s.GetValue());
} else {
return {Status::RedisExecErr, s.Msg()};
return s;
}
} else {
return {Status::RedisExecErr, "Invalid cluster command options"};
Expand Down Expand Up @@ -252,23 +252,23 @@ class CommandClusterX : public Commander {
need_persist_nodes_info = true;
*output = redis::SimpleString("OK");
} else {
return {Status::RedisExecErr, s.Msg()};
return s;
}
} else if (subcommand_ == "setnodeid") {
Status s = srv->cluster->SetNodeId(args_[2]);
if (s.IsOK()) {
need_persist_nodes_info = true;
*output = redis::SimpleString("OK");
} else {
return {Status::RedisExecErr, s.Msg()};
return s;
}
} else if (subcommand_ == "setslot") {
Status s = srv->cluster->SetSlotRanges(slot_ranges_, args_[4], set_version_);
if (s.IsOK()) {
need_persist_nodes_info = true;
*output = redis::SimpleString("OK");
} else {
return {Status::RedisExecErr, s.Msg()};
return s;
}
} else if (subcommand_ == "version") {
int64_t v = srv->cluster->GetVersion();
Expand All @@ -287,7 +287,7 @@ class CommandClusterX : public Commander {
}
*output = redis::SimpleString("OK");
} else {
return {Status::RedisExecErr, s.Msg()};
return s;
}
} else {
return {Status::RedisExecErr, "Invalid cluster command options"};
Expand Down
9 changes: 2 additions & 7 deletions src/commands/cmd_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -326,16 +326,11 @@ class CommandHRangeByLex : public Commander {
return parser.InvalidSyntax();
}
}
Status s;
if (spec_.reversed) {
s = ParseRangeLexSpec(args[3], args[2], &spec_);
return ParseRangeLexSpec(args[3], args[2], &spec_);
} else {
s = ParseRangeLexSpec(args[2], args[3], &spec_);
return ParseRangeLexSpec(args[2], args[3], &spec_);
}
if (!s.IsOK()) {
return {Status::RedisParseErr, s.Msg()};
}
return Status::OK();
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
Expand Down
14 changes: 5 additions & 9 deletions src/commands/cmd_key.cc
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,7 @@ class CommandSort : public Commander {
}

if (type != RedisType::kRedisList && type != RedisType::kRedisSet && type != RedisType::kRedisZSet) {
*output = Error("WRONGTYPE Operation against a key holding the wrong kind of value");
return Status::OK();
return {Status::RedisWrongType, "Operation against a key holding the wrong kind of value"};
}

/* When sorting a set with no sort specified, we must sort the output
Expand All @@ -508,15 +507,12 @@ class CommandSort : public Commander {

switch (res) {
case Database::SortResult::UNKNOWN_TYPE:
*output = redis::Error("Unknown Type");
break;
return {Status::RedisErrorNoPrefix, "Unknown Type"};
case Database::SortResult::DOUBLE_CONVERT_ERROR:
*output = redis::Error("One or more scores can't be converted into double");
break;
return {Status::RedisErrorNoPrefix, "One or more scores can't be converted into double"};
case Database::SortResult::LIMIT_EXCEEDED:
*output = redis::Error("The number of elements to be sorted exceeds SORT_LENGTH_LIMIT = " +
std::to_string(SORT_LENGTH_LIMIT));
break;
return {Status::RedisErrorNoPrefix,
"The number of elements to be sorted exceeds SORT_LENGTH_LIMIT = " + std::to_string(SORT_LENGTH_LIMIT)};
case Database::SortResult::DONE:
if (sort_argument_.storekey.empty()) {
std::vector<std::string> output_vec;
Expand Down
6 changes: 3 additions & 3 deletions src/commands/cmd_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ class CommandBPop : public BlockingCommander {
conn_->Reply(conn_->MultiBulkString({*last_key_ptr, std::move(elem)}));
}
} else if (!s.IsNotFound()) {
conn_->Reply(redis::Error("ERR " + s.ToString()));
conn_->Reply(redis::Error({Status::NotOK, s.ToString()}));
}

return s;
Expand Down Expand Up @@ -414,7 +414,7 @@ class CommandBLMPop : public BlockingCommander {
conn_->Reply(redis::Array({redis::BulkString(chosen_key), std::move(elems_bulk)}));
}
} else if (!s.IsNotFound()) {
conn_->Reply(redis::Error("ERR " + s.ToString()));
conn_->Reply(redis::Error({Status::NotOK, s.ToString()}));
}

return s;
Expand Down Expand Up @@ -757,7 +757,7 @@ class CommandBLMove : public BlockingCommander {
std::string elem;
auto s = list_db.LMove(args_[1], args_[2], src_left_, dst_left_, &elem);
if (!s.ok() && !s.IsNotFound()) {
conn_->Reply(redis::Error("ERR " + s.ToString()));
conn_->Reply(redis::Error({Status::NotOK, s.ToString()}));
return true;
}

Expand Down
4 changes: 2 additions & 2 deletions src/commands/cmd_replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class CommandPSync : public Commander {
srv->stats.IncrPSyncOKCount();
s = srv->AddSlave(conn, next_repl_seq_);
if (!s.IsOK()) {
std::string err = redis::Error(s.Msg());
std::string err = redis::Error(s);
s = util::SockSend(conn->GetFD(), err, conn->GetBufferEvent());
if (!s.IsOK()) {
LOG(WARNING) << "failed to send error message to the replica: " << s.Msg();
Expand Down Expand Up @@ -230,7 +230,7 @@ class CommandFetchMeta : public Commander {
std::string files;
auto s = engine::Storage::ReplDataManager::GetFullReplDataInfo(srv->storage, &files);
if (!s.IsOK()) {
s = util::SockSend(repl_fd, redis::Error("can't create db checkpoint"), bev);
s = util::SockSend(repl_fd, redis::Error({Status::RedisErrorNoPrefix, "can't create db checkpoint"}), bev);
if (!s.IsOK()) {
LOG(WARNING) << "[replication] Failed to send error response: " << s.Msg();
}
Expand Down
3 changes: 1 addition & 2 deletions src/commands/cmd_script.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ class CommandEvalImpl : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
if (evalsha && args_[1].size() != 40) {
*output = redis::Error(errNoMatchingScript);
return Status::OK();
return {Status::RedisNoScript, errNoMatchingScript};
}

int64_t numkeys = GET_OR_RET(ParseInt<int64_t>(args_[2], 10));
Expand Down
Loading

0 comments on commit bf278bd

Please sign in to comment.