diff --git a/.github/workflows/kvrocks.yaml b/.github/workflows/kvrocks.yaml index af40ea450a8..34ae3279020 100644 --- a/.github/workflows/kvrocks.yaml +++ b/.github/workflows/kvrocks.yaml @@ -329,8 +329,8 @@ jobs: run: pip install gcovr==5.0 # 5.1 is not supported if: ${{ matrix.sonarcloud }} - - name: Install sonar-scanner and build-wrapper - uses: SonarSource/sonarcloud-github-c-cpp@v3 + - name: Install Build Wrapper + uses: SonarSource/sonarqube-scan-action/install-build-wrapper@v6.0.0 if: ${{ matrix.sonarcloud }} - name: Build Kvrocks diff --git a/.github/workflows/sonar.yaml b/.github/workflows/sonar.yaml index d9ad8feffa7..d7b5c3be2c3 100644 --- a/.github/workflows/sonar.yaml +++ b/.github/workflows/sonar.yaml @@ -33,8 +33,8 @@ jobs: repository: ${{ github.event.workflow_run.head_repository.full_name }} ref: ${{ github.event.workflow_run.head_sha }} fetch-depth: 0 - - name: Install sonar-scanner and build-wrapper - uses: SonarSource/sonarcloud-github-c-cpp@v3 + - name: Install Build Wrapper + uses: SonarSource/sonarqube-scan-action/install-build-wrapper@v6.0.0 - name: 'Download code coverage' uses: actions/github-script@v7 with: @@ -66,18 +66,22 @@ jobs: with: python-version: 3.x - - name: Run sonar-scanner - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - SONAR_TOKEN: ${{ secrets.SONARCLOUD_TOKEN }} + - name: Extract PR number run: | PR_NUMBER=$(jq -r '.number | select (.!=null)' sonarcloud-data/github-event.json) - echo "The PR number is $PR_NUMBER" + echo "PR_NUMBER=$PR_NUMBER" >> "$GITHUB_ENV" + echo "The PR number is ${PR_NUMBER:-}" - sonar-scanner \ - --define sonar.cfamily.build-wrapper-output="sonarcloud-data" \ - --define sonar.coverageReportPaths=sonarcloud-data/coverage.xml \ - --define sonar.projectKey=apache_kvrocks \ - --define sonar.organization=apache \ - --define sonar.scm.revision=${{ github.event.workflow_run.head_sha }} \ - --define sonar.pullrequest.key=$PR_NUMBER + - name: SonarQube Scan + uses: 
SonarSource/sonarqube-scan-action@v6.0.0 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + SONAR_TOKEN: ${{ secrets.SONARCLOUD_TOKEN }} + with: + args: > + -Dsonar.cfamily.build-wrapper-output=sonarcloud-data + -Dsonar.coverageReportPaths=sonarcloud-data/coverage.xml + -Dsonar.projectKey=apache_kvrocks + -Dsonar.organization=apache + -Dsonar.scm.revision=${{ github.event.workflow_run.head_sha }} + -Dsonar.pullrequest.key=${{ env.PR_NUMBER }} diff --git a/cmake/jsoncons.cmake b/cmake/jsoncons.cmake index d29a3a8ce5b..707af3c4912 100644 --- a/cmake/jsoncons.cmake +++ b/cmake/jsoncons.cmake @@ -20,8 +20,8 @@ include_guard() include(cmake/utils.cmake) FetchContent_DeclareGitHubWithMirror(jsoncons - danielaparker/jsoncons v1.4.3 - MD5=62dad69488c5618f56283ef14d6c1e16 + danielaparker/jsoncons v1.5.0 + MD5=34fabe18f29c4e5c514eaefb5bb50d09 ) FetchContent_MakeAvailableWithArgs(jsoncons diff --git a/cmake/modules/FindZLIB.cmake b/cmake/modules/FindZLIB.cmake index 53c2269be3c..27579eb28ef 100644 --- a/cmake/modules/FindZLIB.cmake +++ b/cmake/modules/FindZLIB.cmake @@ -22,7 +22,7 @@ if(zlib_SOURCE_DIR) add_library(zlib_with_headers INTERFACE) # rocksdb use it target_include_directories(zlib_with_headers INTERFACE $ $) - target_link_libraries(zlib_with_headers INTERFACE zlib) + target_link_libraries(zlib_with_headers INTERFACE zlib-ng) add_library(ZLIB::ZLIB ALIAS zlib_with_headers) - install(TARGETS zlib zlib_with_headers EXPORT RocksDBTargets) # export for install(...) + install(TARGETS zlib-ng zlib_with_headers EXPORT RocksDBTargets) # export for install(...) 
endif() diff --git a/cmake/zlib.cmake b/cmake/zlib.cmake index bc7ef713ff4..341dfc58c69 100644 --- a/cmake/zlib.cmake +++ b/cmake/zlib.cmake @@ -20,8 +20,8 @@ include_guard() include(cmake/utils.cmake) FetchContent_DeclareGitHubWithMirror(zlib - zlib-ng/zlib-ng 2.2.4 - MD5=9fbaac3919af8d5a0ad5726ef9c7c30b + zlib-ng/zlib-ng 2.3.2 + MD5=7818ea3f3ad80873674faf500fd12a0d ) FetchContent_MakeAvailableWithArgs(zlib diff --git a/src/commands/cmd_topk.cc b/src/commands/cmd_topk.cc new file mode 100644 index 00000000000..4375c555889 --- /dev/null +++ b/src/commands/cmd_topk.cc @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include "command_parser.h" +#include "commander.h" +#include "error_constants.h" +#include "server/server.h" +#include "types/redis_topk.h" + +namespace { +constexpr const char *errBadK = "Bad K"; +constexpr const char *errBadWidth = "Bad width"; +constexpr const char *errBadDepth = "Bad depth"; +constexpr const char *errBadDecay = "Bad decay"; +constexpr const char *errInvalidDecay = "Decay must be between 0 and 1"; +} // namespace + +namespace redis { + +class CommandTopKReserve final : public Commander { + public: + Status Parse(const std::vector &args) override { + if (args.size() < 3) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + auto parse_k = ParseInt(args[2], 10); + if (!parse_k) { + return {Status::RedisParseErr, errBadK}; + } + k_ = *parse_k; + if (args_.size() >= 4) { + auto parse_width = ParseInt(args[3], 10); + if (!parse_width) { + return {Status::RedisParseErr, errBadWidth}; + } + width_ = *parse_width; + } + if (args_.size() >= 5) { + auto parse_depth = ParseInt(args[4], 10); + if (!parse_depth) { + return {Status::RedisParseErr, errBadDepth}; + } + depth_ = *parse_depth; + } + if (args_.size() >= 6) { + auto parse_decay = ParseFloat(args[5]); + if (!parse_decay) { + return {Status::RedisParseErr, errBadDecay}; + } + decay_ = *parse_decay; + if (decay_ <= 0.0 || decay_ >= 1.0) { + return {Status::RedisParseErr, errInvalidDecay}; + } + } + if (args_.size() > 6) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + return Status::OK(); + } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + redis::TopK topk(srv->storage, conn->GetNamespace()); + + auto s = topk.Reserve(ctx, args_[1], k_, width_, depth_, decay_); + if (!s.ok()) return {Status::RedisExecErr, s.ToString()}; + + *output = redis::RESP_OK; + return Status::OK(); + } + + private: + uint32_t k_; + uint32_t width_ = 7; + uint32_t depth_ = 8; + double decay_ = 0.9; +}; + +class CommandTopKAdd 
final : public Commander { + public: + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + redis::TopK topk(srv->storage, conn->GetNamespace()); + CHECK(args_.size() == 3); + + auto s = topk.Add(ctx, args_[1], args_[2]); + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + *output = redis::RESP_OK; + return Status::OK(); + } +}; + +class CommandTopKIncrBy final : public Commander { + public: + Status Parse(const std::vector &args) override { + if (args_.size() != 4) { + return {Status::InvalidArgument, "invalid argument"}; + } + auto parse_incr = ParseInt(args[3], 10); + if (!parse_incr) { + return {Status::InvalidArgument, "invalid argument"}; + } + incr_ = *parse_incr; + return Status::OK(); + } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + redis::TopK topk(srv->storage, conn->GetNamespace()); + CHECK(args_.size() == 4); + + auto s = topk.IncrBy(ctx, args_[1], args_[2], incr_); + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + *output = redis::RESP_OK; + return Status::OK(); + } + + private: + uint32_t incr_; +}; + +class CommandTopKList final : public Commander { + public: + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + redis::TopK topk(srv->storage, conn->GetNamespace()); + CHECK(args_.size() == 2); + + std::vector items; + auto s = topk.List(ctx, args_[1], items); + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + *output = MultiBulkString(redis::RESP::v2, items); + return Status::OK(); + } +}; + +class CommandTopKInfo final : public Commander { + public: + Status Parse(const std::vector &args) override { + if (args.size() > 3) { + return {Status::InvalidArgument, errWrongNumOfArguments}; + } + + CommandParser parser(args, 2); + if (parser.Good()) { + if (args.size() == 3) { + if (args[2] == "topk") { + type_ = TopKInfoType::kTopK; + } else 
if (args[2] == "width") { + type_ = TopKInfoType::kWidth; + } else if (args[2] == "depth") { + type_ = TopKInfoType::kDepth; + } else if (args[2] == "decay") { + type_ = TopKInfoType::kDecay; + } else { + return {Status::InvalidArgument, "Invalid info type"}; + } + } + } + + return Commander::Parse(args); + } + + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, [[maybe_unused]] std::string *output) override { + redis::TopK topk_db(srv->storage, conn->GetNamespace()); + TopKInfo info; + + auto s = topk_db.Info(ctx, args_[1], &info); + if (s.IsNotFound()) return {Status::RedisExecErr, "key does not exist"}; + if (!s.ok()) return {Status::RedisExecErr, s.ToString()}; + + switch (type_) { + case TopKInfoType::kAll: + *output = redis::MultiLen(2 * 4); + *output += redis::SimpleString("K"); + *output += redis::Integer(info.k); + *output += redis::SimpleString("Width"); + *output += redis::Integer(info.width); + *output += redis::SimpleString("Depth"); + *output += redis::Integer(info.depth); + *output += redis::SimpleString("Decay"); + *output += redis::Double(redis::RESP::v2, info.decay); + break; + case TopKInfoType::kTopK: + *output = redis::Integer(info.k); + break; + case TopKInfoType::kWidth: + *output = redis::Integer(info.width); + break; + case TopKInfoType::kDepth: + *output = redis::Integer(info.depth); + break; + case TopKInfoType::kDecay: + *output = redis::Double(redis::RESP::v2, info.decay); + break; + } + return Status::OK(); + } + + private: + TopKInfoType type_ = TopKInfoType::kAll; +}; + +class CommandTopKQuery final : public Commander { + public: + Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { + redis::TopK topk(srv->storage, conn->GetNamespace()); + CHECK(args_.size() == 3); + + bool is_exists = false; + auto s = topk.Query(ctx, args_[1], args_[2], &is_exists); + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + *output = redis::Bool(redis::RESP::v2, 
is_exists); + return Status::OK(); + } +}; + +REDIS_REGISTER_COMMANDS(TopK, MakeCmdAttr("topk.add", 3, "write", 1, 1, 1), + MakeCmdAttr("topk.list", 2, "read-only", 1, 1, 1), + MakeCmdAttr("topk.info", 2, "read-only", 1, 1, 1), + MakeCmdAttr("topk.query", 3, "read-only", 1, 1, 1), + MakeCmdAttr("topk.reserve", -3, "write", 1, 1, 1), + MakeCmdAttr("topk.incrby", 4, "write", 1, 1, 1)); + +} // namespace redis \ No newline at end of file diff --git a/src/commands/commander.h b/src/commands/commander.h index 3f38db02580..55607f987f3 100644 --- a/src/commands/commander.h +++ b/src/commands/commander.h @@ -116,6 +116,7 @@ enum class CommandCategory : uint8_t { Txn, ZSet, Timeseries, + TopK, // this is a special category for disabling commands, // basically can be used for version releasing or debugging Disabled, diff --git a/src/server/server.cc b/src/server/server.cc index d16e756fadb..a952cdadd28 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -1215,24 +1215,25 @@ Server::InfoEntries Server::GetReplicationInfo() { } int idx = 0; - rocksdb::SequenceNumber latest_seq = storage->LatestSeqNumber(); { std::shared_lock guard(slave_threads_mu_); entries.emplace_back("connected_slaves", slave_threads_.size()); + rocksdb::SequenceNumber latest_seq = storage->LatestSeqNumber(); for (const auto &slave : slave_threads_) { if (slave->IsStopped()) continue; + auto slave_ack_seq = slave->GetAckSeq(); entries.emplace_back( "slave" + std::to_string(idx), fmt::format("ip={},port={},offset={},lag={}", slave->GetConn()->GetAnnounceIP(), - slave->GetConn()->GetAnnouncePort(), slave->GetAckSeq(), latest_seq - slave->GetAckSeq())); + slave->GetConn()->GetAnnouncePort(), slave_ack_seq >= latest_seq ? latest_seq : slave_ack_seq, + slave_ack_seq >= latest_seq ? 
0 : latest_seq - slave_ack_seq)); ++idx; } + entries.emplace_back("master_repl_offset", latest_seq); } - entries.emplace_back("master_repl_offset", latest_seq); - return entries; } diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc index 692f8804db1..7040776f876 100644 --- a/src/storage/redis_metadata.cc +++ b/src/storage/redis_metadata.cc @@ -334,7 +334,7 @@ bool Metadata::IsSingleKVType() const { return Type() == kRedisString || Type() bool Metadata::IsEmptyableType() const { return IsSingleKVType() || Type() == kRedisStream || Type() == kRedisBloomFilter || Type() == kRedisHyperLogLog || - Type() == kRedisTDigest || Type() == kRedisTimeSeries; + Type() == kRedisTDigest || Type() == kRedisTimeSeries || Type() == kRedisTopK; } bool Metadata::Expired() const { return ExpireAt(util::GetTimeStampMS()); } @@ -569,3 +569,27 @@ rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) { return rocksdb::Status::OK(); } + +void TopKMetadata::Encode(std::string *dst) const { + Metadata::Encode(dst); + PutFixed32(dst, top_k); + PutFixed16(dst, width); + PutFixed32(dst, depth); + PutDouble(dst, decay); +} + +rocksdb::Status TopKMetadata::Decode(Slice *input) { + if (auto s = Metadata::Decode(input); !s.ok()) { + return s; + } + if (input->size() < sizeof(double) + sizeof(uint32_t) * 2 + sizeof(uint16_t)) { + return rocksdb::Status::InvalidArgument(kErrMetadataTooShort); + } + + GetFixed32(input, &top_k); + GetFixed16(input, &width); + GetFixed32(input, &depth); + GetDouble(input, &decay); + + return rocksdb::Status::OK(); +} \ No newline at end of file diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h index fd80e5a5ba0..2db0a422ab7 100644 --- a/src/storage/redis_metadata.h +++ b/src/storage/redis_metadata.h @@ -54,12 +54,13 @@ enum RedisType : uint8_t { kRedisHyperLogLog = 11, kRedisTDigest = 12, kRedisTimeSeries = 13, + kRedisTopK = 14, kRedisTypeMax }; inline constexpr const std::array RedisTypeNames = { - "none", "string", 
"hash", "list", "set", "zset", "bitmap", - "sortedint", "stream", "MBbloom--", "ReJSON-RL", "hyperloglog", "TDIS-TYPE", "timeseries"}; + "none", "string", "hash", "list", "set", "zset", "bitmap", "sortedint", + "stream", "MBbloom--", "ReJSON-RL", "hyperloglog", "TDIS-TYPE", "timeseries", "topk"}; struct RedisTypes { RedisTypes(std::initializer_list list) { @@ -409,3 +410,20 @@ class TimeSeriesMetadata : public Metadata { void Encode(std::string *dst) const override; rocksdb::Status Decode(Slice *input) override; }; + +class TopKMetadata : public Metadata { + public: + uint32_t top_k; + uint16_t width; + uint32_t depth; + double decay; + + explicit TopKMetadata(bool generate_version = true) : Metadata(kRedisTopK, generate_version) {} + + explicit TopKMetadata(uint64_t top_k, uint64_t width = 7, uint64_t depth = 8, double decay = 0.9, + bool generate_version = true) + : Metadata(kRedisTopK, generate_version), top_k(top_k), width(width), depth(depth), decay(decay) {} + + void Encode(std::string *dst) const override; + rocksdb::Status Decode(Slice *input) override; +}; diff --git a/src/types/redis_topk.cc b/src/types/redis_topk.cc new file mode 100644 index 00000000000..983505e3b91 --- /dev/null +++ b/src/types/redis_topk.cc @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "redis_topk.h" + +#include "commands/ttl_util.h" +#include "topk.h" + +namespace redis { + +rocksdb::Status TopK::Reserve(engine::Context &ctx, const Slice &user_key, uint32_t k, uint32_t width, uint32_t depth, + double decay) { + std::string ns_key = AppendNamespacePrefix(user_key); + + TopKMetadata topk_metadata; + rocksdb::Status s = getTopKMetadata(ctx, ns_key, &topk_metadata); + if (!s.ok() && !s.IsNotFound()) return s; + if (!s.IsNotFound()) { + return rocksdb::Status::InvalidArgument("TopK already exists"); + } + + return createTopK(ctx, ns_key, k, width, depth, decay, &topk_metadata); +} + +rocksdb::Status TopK::Add(engine::Context &ctx, const Slice &user_key, const Slice &items) { + return IncrBy(ctx, user_key, items, 1); +} + +rocksdb::Status TopK::IncrBy(engine::Context &ctx, const Slice &user_key, const Slice &items, uint32_t incr) { + std::string ns_key = AppendNamespacePrefix(user_key); + + TopKMetadata topk_metadata; + rocksdb::Status s = getTopKMetadata(ctx, ns_key, &topk_metadata); + if (!s.ok()) return s; + + auto batch = storage_->GetWriteBatchBase(); + WriteBatchLogData log_data(kRedisTopK, {"IncrBy"}); + s = batch->PutLogData(log_data.Encode()); + if (!s.ok()) return s; + + BlockSplitTopK topk(topk_metadata.top_k, topk_metadata.width, topk_metadata.depth, topk_metadata.decay); + s = getTopKData(ctx, ns_key, topk_metadata, &topk); + if (!s.ok()) return s; + + std::vector is_dirty_buckets(topk_metadata.width * topk_metadata.depth, false); + std::vector is_dirty_heaps(topk_metadata.top_k, false); + topk.Add(items.data_, incr, is_dirty_buckets, is_dirty_heaps); + + s = setTopkData(ctx, ns_key, topk_metadata, topk, is_dirty_buckets, is_dirty_heaps); + if (!s.ok()) return s; + + return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + +rocksdb::Status TopK::Query(engine::Context &ctx, const 
Slice &user_key, const Slice &items, bool *exists) { + std::string ns_key = AppendNamespacePrefix(user_key); + + TopKMetadata topk_metadata; + rocksdb::Status s = getTopKMetadata(ctx, ns_key, &topk_metadata); + if (!s.ok()) return s; + + BlockSplitTopK topk(topk_metadata.top_k, topk_metadata.width, topk_metadata.depth, topk_metadata.decay); + s = getTopKData(ctx, ns_key, topk_metadata, &topk); + if (!s.ok()) return s; + + *exists = topk.Query(items.data_); + + return rocksdb::Status::OK(); +} + +rocksdb::Status TopK::List(engine::Context &ctx, const Slice &user_key, std::vector &items) { + std::string ns_key = AppendNamespacePrefix(user_key); + + TopKMetadata topk_metadata; + rocksdb::Status s = getTopKMetadata(ctx, ns_key, &topk_metadata); + if (!s.ok()) return s; + + BlockSplitTopK topk(topk_metadata.top_k, topk_metadata.width, topk_metadata.depth, topk_metadata.decay); + s = getTopKData(ctx, ns_key, topk_metadata, &topk); + if (!s.ok()) return s; + + auto heap_buckets = topk.List(); + for (auto &bucket : heap_buckets) { + items.emplace_back(bucket.item); + } + + return rocksdb::Status::OK(); +} + +rocksdb::Status TopK::Info(engine::Context &ctx, const Slice &user_key, TopKInfo *info) { + std::string ns_key = AppendNamespacePrefix(user_key); + + TopKMetadata metadata; + auto s = getTopKMetadata(ctx, ns_key, &metadata); + if (!s.ok()) return s; + + info->k = metadata.top_k; + info->width = metadata.width; + info->depth = metadata.depth; + info->decay = metadata.decay; + + return rocksdb::Status::OK(); +} + +rocksdb::Status TopK::getTopKMetadata(engine::Context &ctx, const Slice &ns_key, TopKMetadata *metadata) { + return Database::GetMetadata(ctx, {kRedisTopK}, ns_key, metadata); +} + +rocksdb::Status TopK::createTopK(engine::Context &ctx, const Slice &ns_key, uint32_t k, uint32_t width, uint32_t depth, + double decay, TopKMetadata *metadata) { + metadata->top_k = k; + metadata->width = width; + metadata->depth = depth; + metadata->decay = decay; + + 
BlockSplitTopK block_split_top_k(k, width, depth, decay); + + auto batch = storage_->GetWriteBatchBase(); + WriteBatchLogData log_data(kRedisTopK, {"createTopK"}); + auto s = batch->PutLogData(log_data.Encode()); + if (!s.ok()) return s; + + std::string top_k_meta_bytes; + metadata->Encode(&top_k_meta_bytes); + s = batch->Put(metadata_cf_handle_, ns_key, top_k_meta_bytes); + if (!s.ok()) return s; + + // is dirty vector to optimize writes + std::vector is_dirty_buckets(width * depth, true); + std::vector is_dirty_heaps(k, true); + s = setTopkData(ctx, ns_key, *metadata, block_split_top_k, is_dirty_buckets, is_dirty_heaps); + if (!s.ok()) return s; + + return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + +rocksdb::Status TopK::getTopKData(engine::Context &ctx, const Slice &ns_key, const TopKMetadata &metadata, + BlockSplitTopK *topk) { + for (uint8_t i = 0; i < 3; i++) { + if (i == 0) { + // get buckets of topk structure + for (uint32_t j = 0; j < metadata.width * metadata.depth; j++) { + for (uint8_t k = 0; k < 2; k++) { + std::string bk_key = getSubKey(ns_key, metadata, i, j, k); + rocksdb::PinnableSlice bk_value; + rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), bk_key, &bk_value); + if (!s.ok()) return s; + + uint32_t dep = j / metadata.width; + uint32_t wid = j % metadata.width; + if (k == 0) { + topk->buckets[dep][wid].fp = static_cast(std::stoul(bk_value.data())); + } else { + topk->buckets[dep][wid].count = static_cast(std::stoul(bk_value.data())); + } + } + } + } else if (i == 1) { + // get heapbucket of topk structure + for (uint32_t j = 0; j < metadata.top_k; j++) { + for (uint8_t k = 0; k < 3; k++) { + std::string hb_key = getSubKey(ns_key, metadata, i, j, k); + rocksdb::PinnableSlice hb_value; + rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), hb_key, &hb_value); + if (!s.ok()) return s; + + if (k == 0) { + topk->heap[j].count = static_cast(std::stoul(hb_value.data())); + } else if (k == 1) 
{ + topk->heap[j].fp = static_cast(std::stoul(hb_value.data())); + } else { + topk->heap[j].item = hb_value.data(); + } + } + } + } else { + std::string tk_key = getTKKey(ns_key, metadata, i); + rocksdb::PinnableSlice pinnable_value; + rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(), tk_key, &pinnable_value); + if (!s.ok()) return s; + topk->heap_size = static_cast(std::stoul(pinnable_value.data())); + } + } + return rocksdb::Status::OK(); +} + +rocksdb::Status TopK::setTopkData(engine::Context &ctx, const Slice &ns_key, const TopKMetadata &metadata, + const BlockSplitTopK &topk, const std::vector &is_dirty_buckets, + const std::vector &is_dirty_heaps) { + auto batch = storage_->GetWriteBatchBase(); + + for (uint8_t i = 0; i < 3; i++) { + if (i == 0) { + for (uint32_t j = 0; j < metadata.width * metadata.depth; j++) { + if (!is_dirty_buckets[j]) { + continue; + } + for (uint32_t k = 0; k < 2; k++) { + std::string sub_key = getSubKey(ns_key, metadata, i, j, k); + std::string sub_value; + uint32_t dep = j / metadata.width; + uint32_t wid = j % metadata.width; + if (k == 0) { + sub_value = std::to_string(topk.buckets[dep][wid].fp); + } else { + sub_value = std::to_string(topk.buckets[dep][wid].count); + } + rocksdb::Status s = batch->Put(sub_key, sub_value); + if (!s.ok()) return s; + } + } + } else if (i == 1) { + for (uint32_t j = 0; j < metadata.top_k; j++) { + if (!is_dirty_heaps[j]) { + continue; + } + for (uint8_t k = 0; k < 3; k++) { + std::string sub_key = getSubKey(ns_key, metadata, i, j, k); + std::string sub_value; + if (k == 0) { + sub_value = std::to_string(topk.heap[j].count); + } else if (k == 1) { + sub_value = std::to_string(topk.heap[j].fp); + } else { + sub_value = topk.heap[j].item; + } + rocksdb::Status s = batch->Put(sub_key, sub_value); + if (!s.ok()) return s; + } + } + } else { + std::string tk_key = getTKKey(ns_key, metadata, i); + std::string tk_value; + tk_value = std::to_string(topk.heap_size); + rocksdb::Status s = 
batch->Put(tk_key, tk_value); + if (!s.ok()) return s; + } + } + + return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + +std::string TopK::getTKKey(const Slice &ns_key, const TopKMetadata &metadata, uint8_t index) { + std::string sub_key; + PutFixed8(&sub_key, index); + std::string bf_key = InternalKey(ns_key, sub_key, metadata.version, storage_->IsSlotIdEncoded()).Encode(); + return bf_key; +} + +std::string TopK::getSubKey(const Slice &ns_key, const TopKMetadata &metadata, uint8_t topk_index, uint32_t sub_index, + uint8_t index) { + std::string sub_key; + PutFixed8(&sub_key, topk_index); + PutFixed32(&sub_key, sub_index); + PutFixed8(&sub_key, index); + return InternalKey(ns_key, sub_key, metadata.version, storage_->IsSlotIdEncoded()).Encode(); +} + +} // namespace redis \ No newline at end of file diff --git a/src/types/redis_topk.h b/src/types/redis_topk.h new file mode 100644 index 00000000000..089a850446c --- /dev/null +++ b/src/types/redis_topk.h @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#pragma once + +#include "storage/redis_db.h" +#include "topk.h" + +namespace redis { + +enum class TopKInfoType { kAll, kTopK, kWidth, kDepth, kDecay }; + +struct TopKInfo { + uint32_t k; + uint32_t width; + uint32_t depth; + double decay; +}; + +class TopK : public SubKeyScanner { + public: + using Slice = rocksdb::Slice; + + explicit TopK(engine::Storage *storage, const std::string &ns) : SubKeyScanner(storage, ns) {} + + rocksdb::Status Reserve(engine::Context &ctx, const Slice &user_key, uint32_t k, uint32_t width, uint32_t depth, + double decay); + rocksdb::Status Query(engine::Context &ctx, const Slice &user_key, const Slice &items, bool *exists); + rocksdb::Status Add(engine::Context &ctx, const Slice &user_key, const Slice &items); + rocksdb::Status List(engine::Context &ctx, const Slice &user_key, std::vector &items); + rocksdb::Status Info(engine::Context &ctx, const Slice &user_key, TopKInfo *info); + rocksdb::Status IncrBy(engine::Context &ctx, const Slice &user_key, const Slice &items, uint32_t incr); + + private: + rocksdb::Status getTopKMetadata(engine::Context &ctx, const Slice &ns_key, TopKMetadata *metadata); + rocksdb::Status createTopK(engine::Context &ctx, const Slice &ns_key, uint32_t k, uint32_t width, uint32_t depth, + double decay, TopKMetadata *metadata); + + rocksdb::Status getTopKData(engine::Context &ctx, const Slice &ns_key, const TopKMetadata &metadata, + BlockSplitTopK *topk); + rocksdb::Status setTopkData(engine::Context &ctx, const Slice &ns_key, const TopKMetadata &metadata, + const BlockSplitTopK &topk, const std::vector &is_dirty_buckets, + const std::vector &is_dirty_heaps); + + std::string getTKKey(const Slice &ns_key, const TopKMetadata &metadata, uint8_t index); + + std::string getSubKey(const Slice &ns_key, const TopKMetadata &metadata, uint8_t topk_index, uint32_t sub_index, + uint8_t index); +}; + +} // namespace redis \ No newline at end of file diff --git a/src/types/topk.cc b/src/types/topk.cc new file 
mode 100644 index 00000000000..d7da238689b --- /dev/null +++ b/src/types/topk.cc @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "topk.h" + +#include +#include +#include +#include +#include + +//----------------------------------------------------------------------------- +// MurmurHash2 was written by Austin Appleby, and is placed in the public +// domain. The author hereby disclaims copyright to this source code. + +// Note - This code makes a few assumptions about how your machine behaves - + +// 1. We can read a 4-byte value from any address without crashing +// 2. sizeof(int) == 4 + +// And it has a few limitations - + +// 1. It will not work incrementally. +// 2. It will not produce the same results on little-endian and big-endian +// machines. + +//----------------------------------------------------------------------------- + +static uint32_t MurmurHash2(const void *key, int len, uint32_t seed) { + // 'm' and 'r' are mixing constants generated offline. + // They're not really 'magic', they just happen to work well. 
+ + const uint32_t m = 0x5bd1e995; + const int r = 24; + + // Initialize the hash to a 'random' value + + uint32_t h = seed ^ len; + + // Mix 4 bytes at a time into the hash + + auto *data = reinterpret_cast(key); + + while (len >= 4) { + uint32_t k = *(uint32_t *)data; + + k *= m; + k ^= k >> r; + k *= m; + + h *= m; + h ^= k; + + data += 4; + len -= 4; + } + + // Handle the last few bytes of the input heap + + switch (len) { + case 3: + h ^= data[2] << 16; + case 2: + h ^= data[1] << 8; + case 1: + h ^= data[0]; + h *= m; + }; + + // Do a few final mixes of the hash to ensure the last few + // bytes are well-incorporated. + + h ^= h >> 13; + h *= m; + h ^= h >> 15; + + return h; +} + +static uint32_t TopkHash(const void *item, int itemlen, uint32_t i) { return MurmurHash2(item, itemlen, i); } +constexpr uint32_t GA = 1919; + +/* ---------------------------------------------------------------------- */ +void BlockSplitTopK::HeapifyDown(int start, std::vector &is_dirty_heaps) { + int child = start; + + // check whether larger than children + if (heap_size < 2 || (heap_size - 2) / 2 < child) { + return; + } + + child = 2 * child + 1; + if ((child + 1) < heap_size && (heap[child].count > heap[child + 1].count)) { + ++child; + } + if (heap[child].count > heap[start].count) { + return; + } + + HeapBucket top = heap[start]; + do { + heap[start] = heap[child]; + is_dirty_heaps[start] = true; + start = child; + + if ((heap_size - 2) / 2 < child) { + break; + } + child = 2 * child + 1; + + if ((child + 1) < heap_size && (heap[child].count > heap[child + 1].count)) { + ++child; + } + } while (heap[child].count < top.count); + heap[start] = top; + is_dirty_heaps[start] = true; +} + +void BlockSplitTopK::HeapifyUp(int start, std::vector &is_dirty_heaps) { + int parent = start; + + // check whether smaller than parent + if (heap_size < 2 || parent == 0) { + return; + } + + parent = (parent - 1) / 2; + if (heap[parent].count > heap[start].count) { + return; + } + + HeapBucket 
bottom = heap[start];
+  do {
+    heap[start] = heap[parent];
+    is_dirty_heaps[start] = true;
+    start = parent;
+
+    if (start == 0) {
+      break;
+    }
+    parent = (parent - 1) / 2;
+  } while (heap[parent].count > bottom.count);
+  heap[start] = bottom;
+  is_dirty_heaps[start] = true;
+}
+
+int BlockSplitTopK::CheckExistInHeap(const std::string &item) const {
+  for (int i = heap_size - 1; i >= 0; --i) {
+    if (heap[i].item == item) {
+      return i;
+    }
+  }
+  return -1;
+}
+
+int BlockSplitTopK::CmpHeapBucketCount(const HeapBucket &a, const HeapBucket &b) {
+  return a.count < b.count ? 1 : a.count > b.count ? -1 : 0;
+}
+
+void BlockSplitTopK::Add(const std::string &item, uint32_t increment, std::vector<bool> &is_dirty_buckets,
+                         std::vector<bool> &is_dirty_heaps) {
+  uint32_t itemlen = item.size();
+  const char *data = item.c_str();
+  CounterT max_count = 0;
+  uint32_t fp = TopkHash(data, (int)itemlen, GA);
+
+  int location = CheckExistInHeap(item);
+
+  for (size_t i = 0; i < depth; ++i) {
+    uint32_t loc = TopkHash(data, (int)itemlen, i) % width;
+
+    if (buckets[i][loc].count == 0) {
+      buckets[i][loc].fp = fp;
+      buckets[i][loc].count = increment;
+      is_dirty_buckets[i * width + loc] = true;
+      max_count = std::max(max_count, buckets[i][loc].count);
+    } else if (buckets[i][loc].fp == fp && location != -1) {
+      buckets[i][loc].count += increment;
+      max_count = std::max(max_count, buckets[i][loc].count);
+      is_dirty_buckets[i * width + loc] = true;
+    } else {
+      // decay
+      uint32_t local_incr = increment;
+      for (; local_incr > 0; --local_incr) {
+        double decay = 0.0;
+        if (buckets[i][loc].count < TOPK_DECAY_LOOKUP_TABLE) {
+          decay = lookup_table[buckets[i][loc].count];
+        } else {
+          decay =
+              pow(lookup_table[TOPK_DECAY_LOOKUP_TABLE - 1], (buckets[i][loc].count / (TOPK_DECAY_LOOKUP_TABLE - 1))) *
+              lookup_table[buckets[i][loc].count % (TOPK_DECAY_LOOKUP_TABLE - 1)];
+        }
+        double chance = rand() / (double)RAND_MAX;
+        if (chance < decay) {
+          --buckets[i][loc].count;
+          if (buckets[i][loc].count == 0) {
+            buckets[i][loc].fp = fp;
+            buckets[i][loc].count = 1;
+            is_dirty_buckets[i * width + loc] = true;
+            max_count = std::max(max_count, buckets[i][loc].count);
+            break;
+          }
+        }
+      }
+    }
+  }
+
+  if (k == (uint32_t)heap_size) {
+    if (location == -1) {
+      if (heap[0].count == max_count || heap[0].count + 1 == max_count) {
+        heap[0].fp = fp;
+        heap[0].item = item;
+
+        heap[0].count = max_count;
+        is_dirty_heaps[0] = true;
+        HeapifyDown(0, is_dirty_heaps);
+      }
+    } else {
+      heap[location].count += increment;
+      HeapifyDown(location, is_dirty_heaps);
+    }
+  } else {
+    heap[heap_size].fp = fp;
+    heap[heap_size].item = item;
+    heap[heap_size].count = max_count;
+
+    is_dirty_heaps[heap_size] = true;
+
+    HeapifyUp((int)heap_size, is_dirty_heaps);
+    heap_size++;
+  }
+}
+
+bool BlockSplitTopK::Query(const std::string &item) const { return CheckExistInHeap(item) != -1; }
+
+std::vector<HeapBucket> BlockSplitTopK::List() {
+  std::vector<HeapBucket> result(heap_size);
+  for (int i = 0; i < heap_size; i++) {
+    result[i] = heap[i];
+  }
+  std::sort(result.begin(), result.end(),
+            [this](const HeapBucket &a, const HeapBucket &b) { return CmpHeapBucketCount(a, b) > 0; });
+  return result;
+}
\ No newline at end of file
diff --git a/src/types/topk.h b/src/types/topk.h
new file mode 100644
index 00000000000..2791a8d7dfd
--- /dev/null
+++ b/src/types/topk.h
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include <cmath>
+
+#include <algorithm>
+#include <cstdint>
+#include <string>
+#include <utility>
+#include <vector>
+
+static constexpr int TOPK_DECAY_LOOKUP_TABLE = 256;
+
+using CounterT = uint32_t;
+
+struct HeapBucket {
+  uint32_t fp;
+  CounterT count;
+  std::string item;
+
+  HeapBucket() = default;
+
+  HeapBucket(uint32_t fp, CounterT count, std::string item) : fp(fp), count(count), item(std::move(item)) {}
+
+  HeapBucket(const HeapBucket &other) {
+    if (this != &other) {
+      fp = other.fp;
+      count = other.count;
+      item = other.item;
+    }
+  }
+
+  HeapBucket(const HeapBucket &&other) noexcept {
+    if (this != &other) {
+      fp = other.fp;
+      count = other.count;
+      item = other.item;
+    }
+  }
+
+  HeapBucket &operator=(const HeapBucket &other) {
+    if (this != &other) {
+      fp = other.fp;
+      count = other.count;
+      item = other.item;
+    }
+    return *this;
+  }
+
+  HeapBucket &operator=(const HeapBucket &&other) noexcept {
+    if (this != &other) {
+      fp = other.fp;
+      count = other.count;
+      item = other.item;
+    }
+    return *this;
+  }
+
+  ~HeapBucket() = default;
+};
+
+struct Bucket {
+  uint32_t fp;
+  CounterT count;
+};
+
+class BlockSplitTopK {
+ public:
+  BlockSplitTopK() = delete;
+  BlockSplitTopK(const BlockSplitTopK &) = delete;
+  BlockSplitTopK &operator=(const BlockSplitTopK &) = delete;
+  BlockSplitTopK(BlockSplitTopK &&) = delete;
+  BlockSplitTopK &operator=(BlockSplitTopK &&) = delete;
+
+  explicit BlockSplitTopK(uint32_t k, uint32_t width, uint32_t depth, double decay)
+      : k(k),
+        width(width),
+        depth(depth),
+        decay(decay),
+        heap_size(0),
+        buckets(depth, std::vector<Bucket>(width,
Bucket{0, 0})),
+        heap(k, HeapBucket{0, 0, ""}) {
+    for (int i = 0; i < TOPK_DECAY_LOOKUP_TABLE; ++i) {
+      lookup_table[i] = pow(decay, i);
+    }
+  }
+
+  ~BlockSplitTopK() = default;
+
+  void Add(const std::string &item, uint32_t increment, std::vector<bool> &is_dirty_buckets,
+           std::vector<bool> &is_dirty_heaps);
+  bool Query(const std::string &item) const;
+  std::vector<HeapBucket> List();
+
+  void HeapifyDown(int start, std::vector<bool> &is_dirty_heaps);
+  void HeapifyUp(int start, std::vector<bool> &is_dirty_heaps);
+  int CheckExistInHeap(const std::string &item) const;
+  static int CmpHeapBucketCount(const HeapBucket &a, const HeapBucket &b);
+
+  uint32_t k;
+  uint32_t width;
+  uint32_t depth;
+  double decay;
+
+  int heap_size;
+
+  std::vector<std::vector<Bucket>> buckets;
+  std::vector<HeapBucket> heap;
+  double lookup_table[TOPK_DECAY_LOOKUP_TABLE];
+};
\ No newline at end of file
diff --git a/tests/cppunit/types/topk_test.cc b/tests/cppunit/types/topk_test.cc
new file mode 100644
index 00000000000..daeffc0aeac
--- /dev/null
+++ b/tests/cppunit/types/topk_test.cc
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <gtest/gtest.h>
+
+#include <unordered_set>
+
+#include "test_base.h"
+#include "types/redis_topk.h"
+
+static constexpr uint32_t k = 5;
+static constexpr uint32_t width = 7;
+static constexpr uint32_t depth = 8;
+static constexpr double decay = 0.9;
+
+class RedisTopKTest : public TestBase {
+ protected:
+  explicit RedisTopKTest() : TestBase() { top_k_ = std::make_unique<redis::TopK>(storage_.get(), "topk_ns"); }
+  ~RedisTopKTest() override = default;
+
+  void SetUp() override {
+    key_ = "test_topk->key";
+    top_k_->Reserve(*ctx_, key_, k, width, depth, decay);
+  }
+
+  void TearDown() override {}
+
+  std::unique_ptr<redis::TopK> top_k_;
+};
+
+TEST_F(RedisTopKTest, TestTopKInfo) {
+  // test exist key
+  redis::TopKInfo info1;
+  top_k_->Info(*ctx_, key_, &info1);
+  ASSERT_EQ(info1.k, k);
+  ASSERT_EQ(info1.width, width);
+  ASSERT_EQ(info1.depth, depth);
+  ASSERT_EQ(info1.decay, decay);
+
+  // test not exist key
+  redis::TopKInfo info2;
+  auto s = top_k_->Info(*ctx_, "not_exist_key", &info2);
+  ASSERT_FALSE(s.ok());
+}
+
+TEST_F(RedisTopKTest, TestTopKAddAndQuery) {
+  // test not exist key
+  std::string no_exist_key = "no_exist_key";
+  auto s = top_k_->Add(*ctx_, no_exist_key, "1");
+  ASSERT_FALSE(s.ok());
+
+  bool exist = false;
+  s = top_k_->Query(*ctx_, no_exist_key, "1", &exist);
+  ASSERT_FALSE(s.ok());
+
+  std::vector<std::string> list;
+  s = top_k_->List(*ctx_, no_exist_key, list);
+  ASSERT_FALSE(s.ok());
+
+  // test exist key
+  std::vector<std::string> values1 = {"1", "2", "3", "4", "5"};
+  std::vector<std::string> values2 = {"6", "7", "8", "9", "10"};
+  std::unordered_set<std::string> values_set1(values1.begin(), values1.end());
+  std::unordered_set<std::string> values_set2(values2.begin(), values2.end());
+
+  // found not exist values1
+  for (const auto &value : values1) {
+    bool found = true;
+    top_k_->Query(*ctx_, key_, value, &found);
+    ASSERT_FALSE(found);
+  }
+  // add values1, and query values1.
+  for (const auto &value : values1) {
+    top_k_->Add(*ctx_, key_, value);
+    bool found = false;
+    top_k_->Query(*ctx_, key_, value, &found);
+    ASSERT_TRUE(found);
+  }
+  for (const auto &value : values1) {
+    bool found = false;
+    top_k_->Query(*ctx_, key_, value, &found);
+    ASSERT_TRUE(found);
+  }
+
+  // found topk list.
+  std::vector<std::string> top_k_list;
+  top_k_->List(*ctx_, key_, top_k_list);
+  ASSERT_EQ(top_k_list.size(), values1.size());
+  for (size_t i = 0; i < k; ++i) {
+    ASSERT_TRUE(values_set1.find(top_k_list[i]) != values_set1.end());
+  }
+
+  // heap is full, need remove values1.
+  for (const auto &value : values2) {
+    bool found = false;
+    // due to decay, topk is possible to remove values1.
+    while (!found) {
+      top_k_->Add(*ctx_, key_, value);
+      top_k_->Query(*ctx_, key_, value, &found);
+    }
+    top_k_->Add(*ctx_, key_, value);
+  }
+
+  // values1 is removed.
+  for (const auto &value : values1) {
+    bool found = true;
+    top_k_->Query(*ctx_, key_, value, &found);
+    ASSERT_FALSE(found);
+  }
+
+  // found topk list.
+  top_k_list.clear();
+  top_k_->List(*ctx_, key_, top_k_list);
+  for (const auto &value : top_k_list) {
+    ASSERT_TRUE(values_set2.find(value) != values_set2.end());
+  }
+}
diff --git a/x.py b/x.py
index d0bf933dd02..559d7eecfaa 100755
--- a/x.py
+++ b/x.py
@@ -32,7 +32,7 @@
 CMAKE_REQUIRE_VERSION = (3, 16, 0)
 CLANG_FORMAT_REQUIRED_VERSION = (18, 0, 0)
 CLANG_TIDY_REQUIRED_VERSION = (18, 0, 0)
-GOLANGCI_LINT_REQUIRED_VERSION = (2, 6, 0)
+GOLANGCI_LINT_REQUIRED_VERSION = (2, 7, 0)
 SEMVER_REGEX = re.compile(
     r"""