Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
dc907d2
Add: finish top_k reserve and info implementation
abnbb Oct 23, 2025
ef57197
fix: topk data structure
hidedim Oct 29, 2025
cdb512e
add: cpp test of top_k data structure.
hidedim Oct 30, 2025
d5d2a9d
fix: clang-format code
hidedim Oct 31, 2025
85ec644
Revert "fix: clang-format code"
hidedim Oct 31, 2025
e0882f2
feat: support topk.incrby command
hidedim Nov 13, 2025
4771553
fix: clang-format code
hidedim Nov 13, 2025
b4771e6
fix: issue for conflict
hidedim Nov 17, 2025
dd672ce
Fix typo in comment about topk removal
aleksraiden Nov 20, 2025
3a22be6
Make TopKMetadata constructor explicit
aleksraiden Nov 20, 2025
49b9576
fix clang code format
hidedim Nov 20, 2025
2c8eea9
fix clang code format
hidedim Nov 21, 2025
2591db5
fix topk test
hidedim Nov 21, 2025
f3e1480
fix topk heap_size type
hidedim Nov 21, 2025
d2445d3
fix delete[] in topk.cc
hidedim Nov 23, 2025
80127af
Merge branch 'unstable' into topk
aleksraiden Nov 23, 2025
a7c51db
fix(info): the slave lag in INFO command might be overflowed (#3271)
greatsharp Nov 27, 2025
e939165
fix(ci): the sonar c/cpp action has been deprecated (#3280)
git-hulk Dec 1, 2025
cc9e6bd
fix(ci): use the wrong string in sonar action arguments (#3284)
git-hulk Dec 1, 2025
8127eb6
chore(deps): bump jsoncons to v1.5.0 (#3287)
aleksraiden Dec 4, 2025
74f4a51
chore(deps): Bump zlib-ng to 2.3.2 (#3288)
aleksraiden Dec 4, 2025
645e1a3
chore(ci): bump Golang lint to 2.7.0 (#3289)
aleksraiden Dec 4, 2025
5f6095a
Add: finish top_k reserve and info implementation
abnbb Oct 23, 2025
9ea97f3
fix: topk data structure
hidedim Oct 29, 2025
5c5405e
add: cpp test of top_k data structure.
hidedim Oct 30, 2025
dc30972
fix: clang-format code
hidedim Oct 31, 2025
43b4bb8
Revert "fix: clang-format code"
hidedim Oct 31, 2025
2eb92ce
feat: support topk.incrby command
hidedim Nov 13, 2025
2243057
fix: clang-format code
hidedim Nov 13, 2025
1f8dd39
fix: issue for conflict
hidedim Nov 17, 2025
5c3f3d2
Fix typo in comment about topk removal
aleksraiden Nov 20, 2025
a672327
Make TopKMetadata constructor explicit
aleksraiden Nov 20, 2025
0b8cd39
fix clang code format
hidedim Nov 20, 2025
1df6e40
fix clang code format
hidedim Nov 21, 2025
09f8db3
fix topk test
hidedim Nov 21, 2025
eb79670
fix topk heap_size type
hidedim Nov 21, 2025
87d1add
fix delete[] in topk.cc
hidedim Nov 23, 2025
1d03d59
fix: memory leaks
hidedim Nov 24, 2025
c7c00bb
feat: optimize TopK heapify with dirty tracking
hidedim Dec 8, 2025
138d51d
Merge branch 'topk' of https://github.com/hidedim/kvrocks into topk
hidedim Dec 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/kvrocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,8 @@ jobs:
run: pip install gcovr==5.0 # 5.1 is not supported
if: ${{ matrix.sonarcloud }}

- name: Install sonar-scanner and build-wrapper
uses: SonarSource/sonarcloud-github-c-cpp@v3
- name: Install Build Wrapper
uses: SonarSource/sonarqube-scan-action/[email protected]
if: ${{ matrix.sonarcloud }}

- name: Build Kvrocks
Expand Down
32 changes: 18 additions & 14 deletions .github/workflows/sonar.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ jobs:
repository: ${{ github.event.workflow_run.head_repository.full_name }}
ref: ${{ github.event.workflow_run.head_sha }}
fetch-depth: 0
- name: Install sonar-scanner and build-wrapper
uses: SonarSource/sonarcloud-github-c-cpp@v3
- name: Install Build Wrapper
uses: SonarSource/sonarqube-scan-action/[email protected]
- name: 'Download code coverage'
uses: actions/github-script@v7
with:
Expand Down Expand Up @@ -66,18 +66,22 @@ jobs:
with:
python-version: 3.x

- name: Run sonar-scanner
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONARCLOUD_TOKEN }}
- name: Extract PR number
run: |
PR_NUMBER=$(jq -r '.number | select (.!=null)' sonarcloud-data/github-event.json)
echo "The PR number is $PR_NUMBER"
echo "PR_NUMBER=$PR_NUMBER" >> "$GITHUB_ENV"
echo "The PR number is ${PR_NUMBER:-<none>}"

sonar-scanner \
--define sonar.cfamily.build-wrapper-output="sonarcloud-data" \
--define sonar.coverageReportPaths=sonarcloud-data/coverage.xml \
--define sonar.projectKey=apache_kvrocks \
--define sonar.organization=apache \
--define sonar.scm.revision=${{ github.event.workflow_run.head_sha }} \
--define sonar.pullrequest.key=$PR_NUMBER
- name: SonarQube Scan
uses: SonarSource/[email protected]
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONARCLOUD_TOKEN }}
with:
args: >
-Dsonar.cfamily.build-wrapper-output=sonarcloud-data
-Dsonar.coverageReportPaths=sonarcloud-data/coverage.xml
-Dsonar.projectKey=apache_kvrocks
-Dsonar.organization=apache
-Dsonar.scm.revision=${{ github.event.workflow_run.head_sha }}
-Dsonar.pullrequest.key=${{ env.PR_NUMBER }}
4 changes: 2 additions & 2 deletions cmake/jsoncons.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ include_guard()
include(cmake/utils.cmake)

FetchContent_DeclareGitHubWithMirror(jsoncons
danielaparker/jsoncons v1.4.3
MD5=62dad69488c5618f56283ef14d6c1e16
danielaparker/jsoncons v1.5.0
MD5=34fabe18f29c4e5c514eaefb5bb50d09
)

FetchContent_MakeAvailableWithArgs(jsoncons
Expand Down
4 changes: 2 additions & 2 deletions cmake/modules/FindZLIB.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ if(zlib_SOURCE_DIR)

add_library(zlib_with_headers INTERFACE) # rocksdb use it
target_include_directories(zlib_with_headers INTERFACE $<BUILD_INTERFACE:${zlib_SOURCE_DIR}> $<BUILD_INTERFACE:${zlib_BINARY_DIR}>)
target_link_libraries(zlib_with_headers INTERFACE zlib)
target_link_libraries(zlib_with_headers INTERFACE zlib-ng)
add_library(ZLIB::ZLIB ALIAS zlib_with_headers)
install(TARGETS zlib zlib_with_headers EXPORT RocksDBTargets) # export for install(...)
install(TARGETS zlib-ng zlib_with_headers EXPORT RocksDBTargets) # export for install(...)
endif()
4 changes: 2 additions & 2 deletions cmake/zlib.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ include_guard()
include(cmake/utils.cmake)

FetchContent_DeclareGitHubWithMirror(zlib
zlib-ng/zlib-ng 2.2.4
MD5=9fbaac3919af8d5a0ad5726ef9c7c30b
zlib-ng/zlib-ng 2.3.2
MD5=7818ea3f3ad80873674faf500fd12a0d
)

FetchContent_MakeAvailableWithArgs(zlib
Expand Down
246 changes: 246 additions & 0 deletions src/commands/cmd_topk.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#include "command_parser.h"
#include "commander.h"
#include "error_constants.h"
#include "server/server.h"
#include "types/redis_topk.h"

namespace {
constexpr const char *errBadK = "Bad K";
constexpr const char *errBadWidth = "Bad width";
constexpr const char *errBadDepth = "Bad depth";
constexpr const char *errBadDecay = "Bad decay";
constexpr const char *errInvalidDecay = "Decay must be between 0 and 1";
} // namespace

namespace redis {

class CommandTopKReserve final : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
if (args.size() < 3) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
auto parse_k = ParseInt<uint32_t>(args[2], 10);
if (!parse_k) {
return {Status::RedisParseErr, errBadK};
}
k_ = *parse_k;
if (args_.size() >= 4) {
auto parse_width = ParseInt<uint32_t>(args[3], 10);
if (!parse_width) {
return {Status::RedisParseErr, errBadWidth};
}
width_ = *parse_width;
}
if (args_.size() >= 5) {
auto parse_depth = ParseInt<uint32_t>(args[4], 10);
if (!parse_depth) {
return {Status::RedisParseErr, errBadDepth};
}
depth_ = *parse_depth;
}
if (args_.size() >= 6) {
auto parse_decay = ParseFloat<double>(args[5]);
if (!parse_decay) {
return {Status::RedisParseErr, errBadDecay};
}
decay_ = *parse_decay;
if (decay_ <= 0.0 || decay_ >= 1.0) {
return {Status::RedisParseErr, errInvalidDecay};
}
}
if (args_.size() > 6) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
return Status::OK();
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::TopK topk(srv->storage, conn->GetNamespace());

auto s = topk.Reserve(ctx, args_[1], k_, width_, depth_, decay_);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::RESP_OK;
return Status::OK();
}

private:
uint32_t k_;
uint32_t width_ = 7;
uint32_t depth_ = 8;
double decay_ = 0.9;
};

class CommandTopKAdd final : public Commander {
public:
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::TopK topk(srv->storage, conn->GetNamespace());
CHECK(args_.size() == 3);

auto s = topk.Add(ctx, args_[1], args_[2]);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = redis::RESP_OK;
return Status::OK();
}
};

class CommandTopKIncrBy final : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
if (args_.size() != 4) {
return {Status::InvalidArgument, "invalid argument"};
}
auto parse_incr = ParseInt<uint32_t>(args[3], 10);
if (!parse_incr) {
return {Status::InvalidArgument, "invalid argument"};
}
incr_ = *parse_incr;
return Status::OK();
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::TopK topk(srv->storage, conn->GetNamespace());
CHECK(args_.size() == 4);

auto s = topk.IncrBy(ctx, args_[1], args_[2], incr_);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = redis::RESP_OK;
return Status::OK();
}

private:
uint32_t incr_;
};

class CommandTopKList final : public Commander {
public:
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::TopK topk(srv->storage, conn->GetNamespace());
CHECK(args_.size() == 2);

std::vector<std::string> items;
auto s = topk.List(ctx, args_[1], items);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = MultiBulkString(redis::RESP::v2, items);
return Status::OK();
}
};

class CommandTopKInfo final : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
if (args.size() > 3) {
return {Status::InvalidArgument, errWrongNumOfArguments};
}

CommandParser parser(args, 2);
if (parser.Good()) {
if (args.size() == 3) {
if (args[2] == "topk") {
type_ = TopKInfoType::kTopK;
} else if (args[2] == "width") {
type_ = TopKInfoType::kWidth;
} else if (args[2] == "depth") {
type_ = TopKInfoType::kDepth;
} else if (args[2] == "decay") {
type_ = TopKInfoType::kDecay;
} else {
return {Status::InvalidArgument, "Invalid info type"};
}
}
}

return Commander::Parse(args);
}

Status Execute(engine::Context &ctx, Server *srv, Connection *conn, [[maybe_unused]] std::string *output) override {
redis::TopK topk_db(srv->storage, conn->GetNamespace());
TopKInfo info;

auto s = topk_db.Info(ctx, args_[1], &info);
if (s.IsNotFound()) return {Status::RedisExecErr, "key does not exist"};
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

switch (type_) {
case TopKInfoType::kAll:
*output = redis::MultiLen(2 * 4);
*output += redis::SimpleString("K");
*output += redis::Integer(info.k);
*output += redis::SimpleString("Width");
*output += redis::Integer(info.width);
*output += redis::SimpleString("Depth");
*output += redis::Integer(info.depth);
*output += redis::SimpleString("Decay");
*output += redis::Double(redis::RESP::v2, info.decay);
break;
case TopKInfoType::kTopK:
*output = redis::Integer(info.k);
break;
case TopKInfoType::kWidth:
*output = redis::Integer(info.width);
break;
case TopKInfoType::kDepth:
*output = redis::Integer(info.depth);
break;
case TopKInfoType::kDecay:
*output = redis::Double(redis::RESP::v2, info.decay);
break;
}
return Status::OK();
}

private:
TopKInfoType type_ = TopKInfoType::kAll;
};

class CommandTopKQuery final : public Commander {
public:
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
redis::TopK topk(srv->storage, conn->GetNamespace());
CHECK(args_.size() == 3);

bool is_exists = false;
auto s = topk.Query(ctx, args_[1], args_[2], &is_exists);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = redis::Bool(redis::RESP::v2, is_exists);
return Status::OK();
}
};

REDIS_REGISTER_COMMANDS(TopK, MakeCmdAttr<CommandTopKAdd>("topk.add", 3, "write", 1, 1, 1),
MakeCmdAttr<CommandTopKList>("topk.list", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTopKInfo>("topk.info", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTopKQuery>("topk.query", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTopKReserve>("topk.reserve", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandTopKIncrBy>("topk.incrby", 4, "write", 1, 1, 1));

} // namespace redis
1 change: 1 addition & 0 deletions src/commands/commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ enum class CommandCategory : uint8_t {
Txn,
ZSet,
Timeseries,
TopK,
// this is a special category for disabling commands,
// basically can be used for version releasing or debugging
Disabled,
Expand Down
9 changes: 5 additions & 4 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1215,24 +1215,25 @@ Server::InfoEntries Server::GetReplicationInfo() {
}

int idx = 0;
rocksdb::SequenceNumber latest_seq = storage->LatestSeqNumber();

{
std::shared_lock<std::shared_mutex> guard(slave_threads_mu_);
entries.emplace_back("connected_slaves", slave_threads_.size());
rocksdb::SequenceNumber latest_seq = storage->LatestSeqNumber();
for (const auto &slave : slave_threads_) {
if (slave->IsStopped()) continue;

auto slave_ack_seq = slave->GetAckSeq();
entries.emplace_back(
"slave" + std::to_string(idx),
fmt::format("ip={},port={},offset={},lag={}", slave->GetConn()->GetAnnounceIP(),
slave->GetConn()->GetAnnouncePort(), slave->GetAckSeq(), latest_seq - slave->GetAckSeq()));
slave->GetConn()->GetAnnouncePort(), slave_ack_seq >= latest_seq ? latest_seq : slave_ack_seq,
slave_ack_seq >= latest_seq ? 0 : latest_seq - slave_ack_seq));
++idx;
}
entries.emplace_back("master_repl_offset", latest_seq);
}

entries.emplace_back("master_repl_offset", latest_seq);

return entries;
}

Expand Down
Loading
Loading