Skip to content

Commit

Permalink
feat: Catch up ZRANGE options (#1428)
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun committed May 7, 2023
1 parent 1837f8f commit a66ad9a
Show file tree
Hide file tree
Showing 9 changed files with 406 additions and 237 deletions.
295 changes: 153 additions & 142 deletions src/commands/cmd_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*
*/

#include "command_parser.h"
#include "commander.h"
#include "commands/scan_base.h"
#include "error_constants.h"
Expand Down Expand Up @@ -125,7 +126,7 @@ Status CommandZAdd::validateFlags() const {
class CommandZCount : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Status s = redis::ZSet::ParseRangeSpec(args[2], args[3], &spec_);
Status s = ParseRangeScoreSpec(args[2], args[3], &spec_);
if (!s.IsOK()) {
return {Status::RedisParseErr, s.Msg()};
}
Expand All @@ -145,7 +146,7 @@ class CommandZCount : public Commander {
}

private:
ZRangeSpec spec_;
CommonRangeScoreSpec spec_;
};

class CommandZCard : public Commander {
Expand Down Expand Up @@ -265,170 +266,190 @@ class CommandZPopMax : public CommandZPop {
CommandZPopMax() : CommandZPop(false) {}
};

class CommandZRange : public Commander {
class CommandZRangeGeneric : public Commander {
public:
explicit CommandZRange(bool reversed = false) : reversed_(reversed) {}
explicit CommandZRangeGeneric(ZRangeType range_type = kZRangeAuto, ZRangeDirection direction = kZRangeDirectionAuto)
: range_type_(range_type), direction_(direction) {}

Status Parse(const std::vector<std::string> &args) override {
auto parse_start = ParseInt<int>(args[2], 10);
auto parse_stop = ParseInt<int>(args[3], 10);
if (!parse_start || !parse_stop) {
return {Status::RedisParseErr, errValueNotInteger};
key_ = args[1];

int64_t offset = 0;
int64_t count = -1;
// skip the <CMD> <src> <min> <max> args and parse remaining optional arguments
CommandParser parser(args, 4);
while (parser.Good()) {
if (parser.EatEqICase("withscores")) {
with_scores_ = true;
} else if (parser.EatEqICase("limit")) {
auto parse_offset = parser.TakeInt<int64_t>();
auto parse_count = parser.TakeInt<int64_t>();
if (!parse_offset || !parse_count) {
return {Status::RedisParseErr, errValueNotInteger};
}
offset = *parse_offset;
count = *parse_count;
} else if (range_type_ == kZRangeAuto && parser.EatEqICase("bylex")) {
range_type_ = kZRangeLex;
} else if (range_type_ == kZRangeAuto && parser.EatEqICase("byscore")) {
range_type_ = kZRangeScore;
} else if (direction_ == kZRangeDirectionAuto && parser.EatEqICase("rev")) {
direction_ = kZRangeDirectionReverse;
} else {
return parser.InvalidSyntax();
}
}

start_ = *parse_start;
stop_ = *parse_stop;
if (args.size() > 4 && (util::ToLower(args[4]) == "withscores")) {
with_scores_ = true;
// use defaults if not overridden by arguments
if (range_type_ == kZRangeAuto) {
range_type_ = kZRangeRank;
}
if (direction_ == kZRangeDirectionAuto) {
direction_ = kZRangeDirectionForward;
}

return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
std::vector<MemberScore> member_scores;
uint8_t flags = !reversed_ ? 0 : kZSetReversed;
auto s = zset_db.Range(args_[1], start_, stop_, flags, &member_scores);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
// check for conflicting arguments
if (with_scores_ && range_type_ == kZRangeLex) {
return {Status::RedisParseErr, "syntax error, WITHSCORES not supported in combination with BYLEX"};
}
if (count != -1 && range_type_ == kZRangeRank) {
return {Status::RedisParseErr,
"syntax error, LIMIT is only supported in combination with either BYSCORE or BYLEX"};
}

if (!with_scores_) {
output->append(redis::MultiLen(member_scores.size()));
} else {
output->append(redis::MultiLen(member_scores.size() * 2));
// resolve index of <min> <max>
int min_idx = 2;
int max_idx = 3;
if (direction_ == kZRangeDirectionReverse && (range_type_ == kZRangeLex || range_type_ == kZRangeScore)) {
min_idx = 3;
max_idx = 2;
}

for (const auto &ms : member_scores) {
output->append(redis::BulkString(ms.member));
if (with_scores_) output->append(redis::BulkString(util::Float2String(ms.score)));
// parse range spec
switch (range_type_) {
case kZRangeAuto:
case kZRangeRank:
GET_OR_RET(ParseRangeRankSpec(args[min_idx], args[max_idx], &rank_spec_));
if (direction_ == kZRangeDirectionReverse) {
rank_spec_.reversed = true;
}
break;
case kZRangeLex:
GET_OR_RET(ParseRangeLexSpec(args[min_idx], args[max_idx], &lex_spec_));
lex_spec_.offset = offset;
lex_spec_.count = count;
if (direction_ == kZRangeDirectionReverse) {
lex_spec_.reversed = true;
}
break;
case kZRangeScore:
GET_OR_RET(ParseRangeScoreSpec(args[min_idx], args[max_idx], &score_spec_));
score_spec_.offset = offset;
score_spec_.count = count;
if (direction_ == kZRangeDirectionReverse) {
score_spec_.reversed = true;
}
break;
}

return Status::OK();
}

private:
int start_ = 0;
int stop_ = 0;
bool reversed_;
bool with_scores_ = false;
};
Status Execute(Server *svr, Connection *conn, std::string *output) override {
if (range_type_ == kZRangeAuto || range_type_ == kZRangeRank) {
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
std::vector<MemberScore> member_scores;
auto s = zset_db.RangeByRank(args_[1], rank_spec_, &member_scores);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

class CommandZRevRange : public CommandZRange {
public:
CommandZRevRange() : CommandZRange(true) {}
};
if (!with_scores_) {
output->append(redis::MultiLen(member_scores.size()));
} else {
output->append(redis::MultiLen(member_scores.size() * 2));
}

class CommandZRangeByLex : public Commander {
public:
explicit CommandZRangeByLex(bool reversed = false) { spec_.reversed = reversed; }
for (const auto &ms : member_scores) {
output->append(redis::BulkString(ms.member));
if (with_scores_) output->append(redis::BulkString(util::Float2String(ms.score)));
}

Status Parse(const std::vector<std::string> &args) override {
Status s;
if (spec_.reversed) {
s = ParseRangeLexSpec(args[3], args[2], &spec_);
} else {
s = ParseRangeLexSpec(args[2], args[3], &spec_);
}
return Status::OK();
} else if (range_type_ == kZRangeLex) {
int size = 0;
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
std::vector<std::string> members;
auto s = zset_db.RangeByLex(args_[1], lex_spec_, &members, &size);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

if (!s.IsOK()) {
return {Status::RedisParseErr, s.Msg()};
}
*output = redis::MultiBulkString(members, false);
return Status::OK();
} else { // range_type == kZRangeScore
int size = 0;
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
std::vector<MemberScore> member_scores;
auto s = zset_db.RangeByScore(args_[1], score_spec_, &member_scores, &size);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

if (args.size() == 7 && util::ToLower(args[4]) == "limit") {
auto parse_offset = ParseInt<int>(args[5], 10);
auto parse_count = ParseInt<int>(args[6], 10);
if (!parse_offset || !parse_count) {
return {Status::RedisParseErr, errValueNotInteger};
if (!with_scores_) {
output->append(redis::MultiLen(member_scores.size()));
} else {
output->append(redis::MultiLen(member_scores.size() * 2));
}

spec_.offset = *parse_offset;
spec_.count = *parse_count;
}
return Commander::Parse(args);
}
for (const auto &ms : member_scores) {
output->append(redis::BulkString(ms.member));
if (with_scores_) output->append(redis::BulkString(util::Float2String(ms.score)));
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
int size = 0;
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
std::vector<std::string> members;
auto s = zset_db.RangeByLex(args_[1], spec_, &members, &size);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
return Status::OK();
}

*output = redis::MultiBulkString(members, false);
return Status::OK();
}

private:
CommonRangeLexSpec spec_;
std::string key_;
ZRangeType range_type_;
ZRangeDirection direction_;
bool with_scores_ = false;

CommonRangeRankSpec rank_spec_;
CommonRangeLexSpec lex_spec_;
CommonRangeScoreSpec score_spec_;
};

class CommandZRangeByScore : public Commander {
class CommandZRange : public CommandZRangeGeneric {
public:
explicit CommandZRangeByScore(bool reversed = false) { spec_.reversed = reversed; }
Status Parse(const std::vector<std::string> &args) override {
Status s;
if (spec_.reversed) {
s = redis::ZSet::ParseRangeSpec(args[3], args[2], &spec_);
} else {
s = redis::ZSet::ParseRangeSpec(args[2], args[3], &spec_);
}

if (!s.IsOK()) {
return {Status::RedisParseErr, s.Msg()};
}

size_t i = 4;
while (i < args.size()) {
if (util::ToLower(args[i]) == "withscores") {
with_scores_ = true;
i++;
} else if (util::ToLower(args[i]) == "limit" && i + 2 < args.size()) {
auto parse_offset = ParseInt<int>(args[i + 1], 10);
auto parse_count = ParseInt<int>(args[i + 2], 10);
if (!parse_offset || !parse_count) {
return {Status::RedisParseErr, errValueNotInteger};
}

spec_.offset = *parse_offset;
spec_.count = *parse_count;
i += 3;
} else {
return {Status::RedisParseErr, errInvalidSyntax};
}
}
return Commander::Parse(args);
}
explicit CommandZRange() = default;
};

Status Execute(Server *svr, Connection *conn, std::string *output) override {
int size = 0;
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
std::vector<MemberScore> member_scores;
auto s = zset_db.RangeByScore(args_[1], spec_, &member_scores, &size);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
class CommandZRevRange : public CommandZRangeGeneric {
public:
CommandZRevRange() : CommandZRangeGeneric(kZRangeRank, kZRangeDirectionReverse) {}
};

if (!with_scores_) {
output->append(redis::MultiLen(member_scores.size()));
} else {
output->append(redis::MultiLen(member_scores.size() * 2));
}
class CommandZRangeByLex : public CommandZRangeGeneric {
public:
explicit CommandZRangeByLex() : CommandZRangeGeneric(kZRangeLex, kZRangeDirectionForward) {}
};

for (const auto &ms : member_scores) {
output->append(redis::BulkString(ms.member));
if (with_scores_) output->append(redis::BulkString(util::Float2String(ms.score)));
}
class CommandZRevRangeByLex : public CommandZRangeGeneric {
public:
CommandZRevRangeByLex() : CommandZRangeGeneric(kZRangeLex, kZRangeDirectionReverse) {}
};

return Status::OK();
}
class CommandZRangeByScore : public CommandZRangeGeneric {
public:
explicit CommandZRangeByScore() : CommandZRangeGeneric(kZRangeScore, kZRangeDirectionForward) {}
};

private:
ZRangeSpec spec_;
bool with_scores_ = false;
class CommandZRevRangeByScore : public CommandZRangeGeneric {
public:
CommandZRevRangeByScore() : CommandZRangeGeneric(kZRangeScore, kZRangeDirectionReverse) {}
};

class CommandZRank : public Commander {
Expand Down Expand Up @@ -459,16 +480,6 @@ class CommandZRevRank : public CommandZRank {
CommandZRevRank() : CommandZRank(true) {}
};

class CommandZRevRangeByScore : public CommandZRangeByScore {
public:
CommandZRevRangeByScore() : CommandZRangeByScore(true) {}
};

class CommandZRevRangeByLex : public CommandZRangeByLex {
public:
CommandZRevRangeByLex() : CommandZRangeByLex(true) {}
};

class CommandZRem : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -524,7 +535,7 @@ class CommandZRemRangeByRank : public Commander {
class CommandZRemRangeByScore : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Status s = redis::ZSet::ParseRangeSpec(args[2], args[3], &spec_);
Status s = ParseRangeScoreSpec(args[2], args[3], &spec_);
if (!s.IsOK()) {
return {Status::RedisParseErr, s.Msg()};
}
Expand All @@ -544,7 +555,7 @@ class CommandZRemRangeByScore : public Commander {
}

private:
ZRangeSpec spec_;
CommonRangeScoreSpec spec_;
};

class CommandZRemRangeByLex : public Commander {
Expand Down
Loading

0 comments on commit a66ad9a

Please sign in to comment.