Skip to content

Commit

Permalink
refactor: zset ranges (#1429)
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun committed May 7, 2023
1 parent a66ad9a commit c985137
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 178 deletions.
2 changes: 1 addition & 1 deletion src/commands/cmd_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ class CommandHRangeByLex : public Commander {
}

private:
CommonRangeLexSpec spec_;
RangeLexSpec spec_;
};

class CommandHScan : public CommandSubkeyScanBase {
Expand Down
129 changes: 60 additions & 69 deletions src/commands/cmd_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class CommandZCount : public Commander {
}

private:
CommonRangeScoreSpec spec_;
RangeScoreSpec spec_;
};

class CommandZCard : public Commander {
Expand Down Expand Up @@ -215,7 +215,7 @@ class CommandZLexCount : public Commander {
}

private:
CommonRangeLexSpec spec_;
RangeLexSpec spec_;
};

class CommandZPop : public Commander {
Expand Down Expand Up @@ -356,58 +356,41 @@ class CommandZRangeGeneric : public Commander {
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
if (range_type_ == kZRangeAuto || range_type_ == kZRangeRank) {
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
std::vector<MemberScore> member_scores;
auto s = zset_db.RangeByRank(args_[1], rank_spec_, &member_scores);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

if (!with_scores_) {
output->append(redis::MultiLen(member_scores.size()));
} else {
output->append(redis::MultiLen(member_scores.size() * 2));
}

for (const auto &ms : member_scores) {
output->append(redis::BulkString(ms.member));
if (with_scores_) output->append(redis::BulkString(util::Float2String(ms.score)));
}

return Status::OK();
} else if (range_type_ == kZRangeLex) {
int size = 0;
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
std::vector<std::string> members;
auto s = zset_db.RangeByLex(args_[1], lex_spec_, &members, &size);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::MultiBulkString(members, false);
return Status::OK();
} else { // range_type == kZRangeScore
int size = 0;
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
std::vector<MemberScore> member_scores;
auto s = zset_db.RangeByScore(args_[1], score_spec_, &member_scores, &size);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
redis::ZSet zset_db(svr->storage, conn->GetNamespace());

if (!with_scores_) {
output->append(redis::MultiLen(member_scores.size()));
} else {
output->append(redis::MultiLen(member_scores.size() * 2));
}
std::vector<MemberScore> member_scores;
std::vector<std::string> members;

for (const auto &ms : member_scores) {
output->append(redis::BulkString(ms.member));
if (with_scores_) output->append(redis::BulkString(util::Float2String(ms.score)));
}
rocksdb::Status s;
switch (range_type_) {
case kZRangeAuto:
case kZRangeRank:
s = zset_db.RangeByRank(key_, rank_spec_, &member_scores, nullptr);
break;
case kZRangeScore:
s = zset_db.RangeByScore(key_, score_spec_, &member_scores, nullptr);
break;
case kZRangeLex:
s = zset_db.RangeByLex(key_, lex_spec_, &members, nullptr);
break;
}
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

return Status::OK();
switch (range_type_) {
case kZRangeLex:
output->append(redis::MultiBulkString(members, false));
return Status::OK();
case kZRangeAuto:
case kZRangeRank:
case kZRangeScore:
output->append(redis::MultiLen(member_scores.size() * (with_scores_ ? 2 : 1)));
for (const auto &ms : member_scores) {
output->append(redis::BulkString(ms.member));
if (with_scores_) output->append(redis::BulkString(util::Float2String(ms.score)));
}
return Status::OK();
}
}

Expand All @@ -417,9 +400,9 @@ class CommandZRangeGeneric : public Commander {
ZRangeDirection direction_;
bool with_scores_ = false;

CommonRangeRankSpec rank_spec_;
CommonRangeLexSpec lex_spec_;
CommonRangeScoreSpec score_spec_;
RangeRankSpec rank_spec_;
RangeLexSpec lex_spec_;
RangeScoreSpec score_spec_;
};

class CommandZRange : public CommandZRangeGeneric {
Expand Down Expand Up @@ -509,27 +492,29 @@ class CommandZRemRangeByRank : public Commander {
return {Status::RedisParseErr, errValueNotInteger};
}

start_ = *parse_start;
stop_ = *parse_stop;
spec_.start = *parse_start;
spec_.stop = *parse_stop;

return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
int ret = 0;
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
auto s = zset_db.RemoveRangeByRank(args_[1], start_, stop_, &ret);

int cnt = 0;
spec_.with_deletion = true;

auto s = zset_db.RangeByRank(args_[1], spec_, nullptr, &cnt);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::Integer(ret);
*output = redis::Integer(cnt);
return Status::OK();
}

private:
int start_ = 0;
int stop_ = 0;
RangeRankSpec spec_;
};

class CommandZRemRangeByScore : public Commander {
Expand All @@ -543,19 +528,22 @@ class CommandZRemRangeByScore : public Commander {
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
int size = 0;
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
auto s = zset_db.RemoveRangeByScore(args_[1], spec_, &size);

int cnt = 0;
spec_.with_deletion = true;

auto s = zset_db.RangeByScore(args_[1], spec_, nullptr, &cnt);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::Integer(size);
*output = redis::Integer(cnt);
return Status::OK();
}

private:
CommonRangeScoreSpec spec_;
RangeScoreSpec spec_;
};

class CommandZRemRangeByLex : public Commander {
Expand All @@ -569,19 +557,22 @@ class CommandZRemRangeByLex : public Commander {
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
int size = 0;
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
auto s = zset_db.RemoveRangeByLex(args_[1], spec_, &size);

int cnt = 0;
spec_.with_deletion = true;

auto s = zset_db.RangeByLex(args_[1], spec_, nullptr, &cnt);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::Integer(size);
*output = redis::Integer(cnt);
return Status::OK();
}

private:
CommonRangeLexSpec spec_;
RangeLexSpec spec_;
};

class CommandZScore : public Commander {
Expand Down
6 changes: 3 additions & 3 deletions src/common/range_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include "commands/error_constants.h"
#include "parse_util.h"

Status ParseRangeLexSpec(const std::string &min, const std::string &max, CommonRangeLexSpec *spec) {
Status ParseRangeLexSpec(const std::string &min, const std::string &max, RangeLexSpec *spec) {
if (min == "+" || max == "-") {
return {Status::NotOK, "min > max"};
}
Expand Down Expand Up @@ -56,7 +56,7 @@ Status ParseRangeLexSpec(const std::string &min, const std::string &max, CommonR
return Status::OK();
}

Status ParseRangeRankSpec(const std::string &min, const std::string &max, CommonRangeRankSpec *spec) {
Status ParseRangeRankSpec(const std::string &min, const std::string &max, RangeRankSpec *spec) {
auto parse_start = ParseInt<int>(min, 10);
auto parse_stop = ParseInt<int>(max, 10);
if (!parse_start || !parse_stop) {
Expand All @@ -67,7 +67,7 @@ Status ParseRangeRankSpec(const std::string &min, const std::string &max, Common
return Status::OK();
}

Status ParseRangeScoreSpec(const std::string &min, const std::string &max, CommonRangeScoreSpec *spec) {
Status ParseRangeScoreSpec(const std::string &min, const std::string &max, RangeScoreSpec *spec) {
char *eptr = nullptr;

if (min == "+inf" || max == "-inf") {
Expand Down
26 changes: 13 additions & 13 deletions src/common/range_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,36 @@

#include "status.h"

struct CommonRangeLexSpec {
struct RangeLexSpec {
std::string min, max;
bool minex = false, maxex = false; /* are min or max exclusive */
bool max_infinite = false; /* are max infinite */
int64_t offset = -1, count = -1;
bool removed = false, reversed = false;
explicit CommonRangeLexSpec() = default;
bool with_deletion = false, reversed = false;
explicit RangeLexSpec() = default;
};

Status ParseRangeLexSpec(const std::string &min, const std::string &max, CommonRangeLexSpec *spec);
Status ParseRangeLexSpec(const std::string &min, const std::string &max, RangeLexSpec *spec);

struct CommonRangeRankSpec {
int start, stop;
bool removed = false, reversed = false;
explicit CommonRangeRankSpec() = default;
struct RangeRankSpec {
int start = 0, stop = -1;
bool with_deletion = false, reversed = false;
explicit RangeRankSpec() = default;
};

Status ParseRangeRankSpec(const std::string &min, const std::string &max, CommonRangeRankSpec *spec);
Status ParseRangeRankSpec(const std::string &min, const std::string &max, RangeRankSpec *spec);

const double kMinScore = (std::numeric_limits<float>::is_iec559 ? -std::numeric_limits<double>::infinity()
: std::numeric_limits<double>::lowest());
const double kMaxScore = (std::numeric_limits<float>::is_iec559 ? std::numeric_limits<double>::infinity()
: std::numeric_limits<double>::max());

struct CommonRangeScoreSpec {
struct RangeScoreSpec {
double min = kMinScore, max = kMaxScore;
bool minex = false, maxex = false; /* are min or max exclusive */
int64_t offset = -1, count = -1;
bool removed = false, reversed = false;
explicit CommonRangeScoreSpec() = default;
bool with_deletion = false, reversed = false;
explicit RangeScoreSpec() = default;
};

Status ParseRangeScoreSpec(const std::string &min, const std::string &max, CommonRangeScoreSpec *spec);
Status ParseRangeScoreSpec(const std::string &min, const std::string &max, RangeScoreSpec *spec);
2 changes: 1 addition & 1 deletion src/types/redis_geo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ int Geo::getPointsInRange(const Slice &user_key, double min, double max, double
std::vector<GeoPoint> *geo_points) {
/* include min in range; exclude max in range */
/* That's: min <= val < max */
CommonRangeScoreSpec spec;
RangeScoreSpec spec;
spec.min = min;
spec.max = max;
spec.maxex = true;
Expand Down
2 changes: 1 addition & 1 deletion src/types/redis_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ rocksdb::Status Hash::MSet(const Slice &user_key, const std::vector<FieldValue>
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status Hash::RangeByLex(const Slice &user_key, const CommonRangeLexSpec &spec,
rocksdb::Status Hash::RangeByLex(const Slice &user_key, const RangeLexSpec &spec,
std::vector<FieldValue> *field_values) {
field_values->clear();
if (spec.count == 0) {
Expand Down
3 changes: 1 addition & 2 deletions src/types/redis_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ class Hash : public SubKeyScanner {
rocksdb::Status IncrBy(const Slice &user_key, const Slice &field, int64_t increment, int64_t *ret);
rocksdb::Status IncrByFloat(const Slice &user_key, const Slice &field, double increment, double *ret);
rocksdb::Status MSet(const Slice &user_key, const std::vector<FieldValue> &field_values, bool nx, int *ret);
rocksdb::Status RangeByLex(const Slice &user_key, const CommonRangeLexSpec &spec,
std::vector<FieldValue> *field_values);
rocksdb::Status RangeByLex(const Slice &user_key, const RangeLexSpec &spec, std::vector<FieldValue> *field_values);
rocksdb::Status MGet(const Slice &user_key, const std::vector<Slice> &fields, std::vector<std::string> *values,
std::vector<rocksdb::Status> *statuses);
rocksdb::Status GetAll(const Slice &user_key, std::vector<FieldValue> *field_values,
Expand Down
Loading

0 comments on commit c985137

Please sign in to comment.