Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support of the Hyperloglog data structure #2142

Open
wants to merge 25 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ enum RedisType : uint8_t {
kRedisBloomFilter = 9,
kRedisJson = 10,
kRedisSearch = 11,
kRedisHyperLogLog = 12,
tutububug marked this conversation as resolved.
Show resolved Hide resolved
};

struct RedisTypes {
Expand Down Expand Up @@ -329,3 +330,26 @@ class SearchMetadata : public Metadata {
void Encode(std::string *dst) const override;
rocksdb::Status Decode(Slice *input) override;
};

// Number of hash bits used to select a register. A larger value means more registers
// and therefore a smaller estimation error.
constexpr uint32_t kHyperLogLogRegisterCountPow = 14;

// Number of remaining hash bits (out of 64) that are inspected to count leading zeros.
constexpr uint32_t kHyperLogLogHashBitCount = 64 - kHyperLogLogRegisterCountPow;

// Total number of registers: 2^Pow, i.e. 16384 registers with Pow=14.
constexpr uint32_t kHyperLogLogRegisterCount = 1U << kHyperLogLogRegisterCountPow;

class HyperloglogMetadata : public Metadata {
public:
enum class EncodeType : uint8_t {
DENSE = 0, // dense encoding implement as sub keys to store registers by segment in data column family.
SPARSE = 1, // TODO sparse encoding implement as a compressed string to store registers in metadata column family.
};

explicit HyperloglogMetadata(EncodeType encode_type = EncodeType::DENSE, bool generate_version = true)
: Metadata(kRedisHyperLogLog, generate_version) {
size = 1; // 'size' must non-zone, or 'GetMetadata' will failed as 'expired'.
}

private:
// TODO optimize for converting storage encoding automatically
// EncodeType encode_type_;
};
4 changes: 4 additions & 0 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
#include "observer_or_unique.h"
#include "status.h"

// Define USE_ALIGNED_ACCESS when the compiler predefines __sparc__ or __arm__,
// i.e. when building for a SPARC or ARM target.
// NOTE(review): presumably consumed by code that must avoid unaligned memory
// accesses on these targets — confirm at the use sites.
#if defined(__sparc__) || defined(__arm__)
#define USE_ALIGNED_ACCESS
#endif

enum class StorageEngineType : uint16_t {
RocksDB,
Speedb,
Expand Down
75 changes: 0 additions & 75 deletions src/types/redis_bitmap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@

namespace redis {

// Size of one bitmap segment in bytes.
constexpr uint32_t kBitmapSegmentBytes = 1024;
// Size of one bitmap segment in bits (8 bits per byte).
constexpr uint32_t kBitmapSegmentBits = kBitmapSegmentBytes * 8;

// Error message used when the bitmap string is larger than the
// max-bitmap-to-string-mb configuration item allows.
constexpr char kErrBitmapStringOutOfRange[] =
    "The size of the bitmap string exceeds "
    "the configuration item "
    "max-bitmap-to-string-mb";
Expand Down Expand Up @@ -654,78 +651,6 @@ rocksdb::Status Bitmap::BitOp(BitOpFlags op_flag, const std::string &op_name, co
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

// SegmentCacheStore reads bitmap segments from storage on demand and keeps them in an
// in-memory cache keyed by segment index. Segments obtained through GetMut are marked
// dirty; BatchForFlush adds every dirty segment (and the metadata, if the used size grew)
// to a write batch.
class Bitmap::SegmentCacheStore {
 public:
  // storage:            engine used to read segments and to query the slot-id encoding flag.
  // metadata_cf_handle: column family the metadata is written to by BatchForFlush.
  // namespace_key:      key of the bitmap, used as the prefix of segment keys and as the metadata key.
  // bitmap_metadata:    metadata of the bitmap; a copy is kept, the caller's object is never modified.
  SegmentCacheStore(engine::Storage *storage, rocksdb::ColumnFamilyHandle *metadata_cf_handle,
                    std::string namespace_key, const Metadata &bitmap_metadata)
      : storage_(storage),
        metadata_cf_handle_(metadata_cf_handle),
        ns_key_(std::move(namespace_key)),
        metadata_(bitmap_metadata) {}

  // Get a read-only segment by given index.
  // On success '*cache' points at the cached segment; on failure '*cache' is left untouched.
  rocksdb::Status Get(uint32_t index, const std::string **cache) {
    std::string *res = nullptr;
    auto s = get(index, /*set_dirty=*/false, &res);
    if (s.ok()) {
      *cache = res;
    }
    return s;
  }

  // Get a segment by given index, and mark it dirty.
  rocksdb::Status GetMut(uint32_t index, std::string **cache) { return get(index, /*set_dirty=*/true, cache); }

  // Add all dirty segments into write batch.
  // If the highest byte touched by a dirty segment exceeds the stored metadata size,
  // the metadata size is raised and the encoded metadata is added to the batch as well.
  void BatchForFlush(ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch) {
    uint64_t used_size = 0;
    for (auto &[index, content] : cache_) {
      if (content.first) {
        std::string sub_key =
            InternalKey(ns_key_, getSegmentSubKey(index), metadata_.version, storage_->IsSlotIdEncoded()).Encode();
        batch->Put(sub_key, content.second);
        // Widen before multiplying so the byte offset is computed in 64 bits.
        used_size = std::max(used_size, static_cast<uint64_t>(index) * kBitmapSegmentBytes + content.second.size());
      }
    }
    if (used_size > metadata_.size) {
      metadata_.size = used_size;
      std::string bytes;
      metadata_.Encode(&bytes);
      batch->Put(metadata_cf_handle_, ns_key_, bytes);
    }
  }

 private:
  // Returns the cached segment for 'index', loading it from storage on first access.
  // A NotFound result from storage is not an error: the segment is cached as an empty string.
  // Any other read error is returned to the caller and nothing is cached for 'index'.
  rocksdb::Status get(uint32_t index, bool set_dirty, std::string **cache) {
    // try_emplace inserts a value-initialized {false, ""} entry when the index is not cached yet.
    auto [seg_itor, no_cache] = cache_.try_emplace(index);

    if (no_cache) {
      std::string sub_key =
          InternalKey(ns_key_, getSegmentSubKey(index), metadata_.version, storage_->IsSlotIdEncoded()).Encode();
      rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(), sub_key, &seg_itor->second.second);
      if (!s.ok() && !s.IsNotFound()) {
        // Remove the placeholder: leaving it behind would make a later call for the same
        // index skip the storage read and hand out an empty segment as if it had been loaded.
        cache_.erase(seg_itor);
        return s;
      }
    }

    auto &[is_dirty, str] = seg_itor->second;
    is_dirty |= set_dirty;
    *cache = &str;
    return rocksdb::Status::OK();
  }

  // The sub key of a segment is the decimal string of its starting byte offset.
  static std::string getSegmentSubKey(uint32_t index) { return std::to_string(index * kBitmapSegmentBytes); }

  engine::Storage *storage_;
  rocksdb::ColumnFamilyHandle *metadata_cf_handle_;
  std::string ns_key_;
  Metadata metadata_;
  // Segment index -> [is_dirty, segment_cache_string]
  std::unordered_map<uint32_t, std::pair<bool, std::string>> cache_;
};

// Copy a range of bytes from entire bitmap and store them into ArrayBitfieldBitmap.
static rocksdb::Status CopySegmentsBytesToBitfield(Bitmap::SegmentCacheStore &store, uint32_t byte_offset,
uint32_t bytes, ArrayBitfieldBitmap *bitfield) {
Expand Down
74 changes: 74 additions & 0 deletions src/types/redis_bitmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ enum BitOpFlags {

namespace redis {

// Size of one bitmap segment in bytes.
constexpr uint32_t kBitmapSegmentBytes = 1024;
// Size of one bitmap segment in bits (8 bits per byte).
constexpr uint32_t kBitmapSegmentBits = kBitmapSegmentBytes * 8;

// We use least-significant bit (LSB) numbering (also known as bit-endianness).
// This means that within a group of 8 bits, we read right-to-left.
// This is different from applying "bit" commands to string, which uses MSB.
Expand Down Expand Up @@ -82,4 +85,75 @@ class Bitmap : public Database {
std::vector<std::optional<BitfieldValue>> *rets);
};

// SegmentCacheStore reads bitmap segments from storage on demand and keeps them in an
// in-memory cache keyed by segment index. Segments obtained through GetMut are marked
// dirty; BatchForFlush adds every dirty segment (and the metadata, if the used size grew)
// to a write batch.
class Bitmap::SegmentCacheStore {
 public:
  // storage:            engine used to read segments and to query the slot-id encoding flag.
  // metadata_cf_handle: column family the metadata is written to by BatchForFlush.
  // namespace_key:      key of the bitmap, used as the prefix of segment keys and as the metadata key.
  // bitmap_metadata:    metadata of the bitmap; a copy is kept, the caller's object is never modified.
  SegmentCacheStore(engine::Storage *storage, rocksdb::ColumnFamilyHandle *metadata_cf_handle,
                    std::string namespace_key, const Metadata &bitmap_metadata)
      : storage_(storage),
        metadata_cf_handle_(metadata_cf_handle),
        ns_key_(std::move(namespace_key)),
        metadata_(bitmap_metadata) {}

  // Get a read-only segment by given index.
  // On success '*cache' points at the cached segment; on failure '*cache' is left untouched.
  rocksdb::Status Get(uint32_t index, const std::string **cache) {
    std::string *res = nullptr;
    auto s = get(index, /*set_dirty=*/false, &res);
    if (s.ok()) {
      *cache = res;
    }
    return s;
  }

  // Get a segment by given index, and mark it dirty.
  rocksdb::Status GetMut(uint32_t index, std::string **cache) { return get(index, /*set_dirty=*/true, cache); }

  // Add all dirty segments into write batch.
  // If the highest byte touched by a dirty segment exceeds the stored metadata size,
  // the metadata size is raised and the encoded metadata is added to the batch as well.
  void BatchForFlush(ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch) {
    uint64_t used_size = 0;
    for (auto &[index, content] : cache_) {
      if (content.first) {
        std::string sub_key =
            InternalKey(ns_key_, getSegmentSubKey(index), metadata_.version, storage_->IsSlotIdEncoded()).Encode();
        batch->Put(sub_key, content.second);
        // Widen before multiplying so the byte offset is computed in 64 bits.
        used_size = std::max(used_size, static_cast<uint64_t>(index) * kBitmapSegmentBytes + content.second.size());
      }
    }
    if (used_size > metadata_.size) {
      metadata_.size = used_size;
      std::string bytes;
      metadata_.Encode(&bytes);
      batch->Put(metadata_cf_handle_, ns_key_, bytes);
    }
  }

 private:
  // Returns the cached segment for 'index', loading it from storage on first access.
  // A NotFound result from storage is not an error: the segment is cached as an empty string.
  // Any other read error is returned to the caller and nothing is cached for 'index'.
  rocksdb::Status get(uint32_t index, bool set_dirty, std::string **cache) {
    // try_emplace inserts a value-initialized {false, ""} entry when the index is not cached yet.
    auto [seg_itor, no_cache] = cache_.try_emplace(index);

    if (no_cache) {
      std::string sub_key =
          InternalKey(ns_key_, getSegmentSubKey(index), metadata_.version, storage_->IsSlotIdEncoded()).Encode();
      rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(), sub_key, &seg_itor->second.second);
      if (!s.ok() && !s.IsNotFound()) {
        // Remove the placeholder: leaving it behind would make a later call for the same
        // index skip the storage read and hand out an empty segment as if it had been loaded.
        cache_.erase(seg_itor);
        return s;
      }
    }

    auto &[is_dirty, str] = seg_itor->second;
    is_dirty |= set_dirty;
    *cache = &str;
    return rocksdb::Status::OK();
  }

  // The sub key of a segment is the decimal string of its starting byte offset.
  static std::string getSegmentSubKey(uint32_t index) { return std::to_string(index * kBitmapSegmentBytes); }

  engine::Storage *storage_;
  rocksdb::ColumnFamilyHandle *metadata_cf_handle_;
  std::string ns_key_;
  Metadata metadata_;
  // Segment index -> [is_dirty, segment_cache_string]
  std::unordered_map<uint32_t, std::pair<bool, std::string>> cache_;
};

} // namespace redis
Loading
Loading