Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 15 additions & 17 deletions src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ extern "C" {
#include "core/cms.h"
#include "core/detail/bitpacking.h"
#include "core/huff_coder.h"
#include "core/oah_set.h"
#include "core/page_usage/page_usage_stats.h"
#include "core/qlist.h"
#include "core/sorted_map.h"
Expand Down Expand Up @@ -63,10 +64,12 @@ size_t UpdateSize(size_t size, int64_t update) {

inline void FreeObjSet(unsigned encoding, void* ptr, MemoryResource* mr) {
switch (encoding) {
case kEncodingStrMap2: {
CompactObj::DeleteMR<StringSet>(ptr);
case kEncodingStrMap2:
VisitSet(ptr, [](auto* ss) {
using T = std::remove_pointer_t<decltype(ss)>;
CompactObj::DeleteMR<T>(ss);
});
break;
}

case kEncodingIntSet:
zfree((void*)ptr);
Expand All @@ -87,10 +90,10 @@ void FreeList(unsigned encoding, void* ptr, MemoryResource* mr) {

size_t MallocUsedSet(unsigned encoding, void* ptr) {
switch (encoding) {
case kEncodingStrMap2: {
StringSet* ss = (StringSet*)ptr;
return ss->ObjMallocUsed() + ss->SetMallocUsed() + zmalloc_usable_size(ptr);
}
case kEncodingStrMap2:
return VisitSet(ptr, [ptr](auto* ss) {
return ss->ObjMallocUsed() + ss->SetMallocUsed() + zmalloc_usable_size(ptr);
});
case kEncodingIntSet:
return intsetBlobLen((intset*)ptr);
}
Expand Down Expand Up @@ -277,12 +280,10 @@ pair<void*, bool> DefragSortedMap(detail::SortedMap* sm, PageUsage* page_usage)
return {sm, reallocated};
}

pair<void*, bool> DefragStrSet(StringSet* ss, PageUsage* page_usage) {
template <typename Set> pair<void*, bool> DefragDenseSet(Set* ss, PageUsage* page_usage) {
bool realloced = false;

for (auto it = ss->begin(); it != ss->end(); ++it)
realloced |= it.ReallocIfNeeded(page_usage);

return {ss, realloced};
}

Expand Down Expand Up @@ -313,9 +314,8 @@ pair<void*, bool> DefragSet(unsigned encoding, void* ptr, PageUsage* page_usage)
return DefragIntSet((intset*)ptr, page_usage);
}

case kEncodingStrMap2: {
return DefragStrSet((StringSet*)ptr, page_usage);
}
case kEncodingStrMap2:
return VisitSet(ptr, [page_usage](auto* ss) { return DefragDenseSet(ss, page_usage); });

default:
ABSL_UNREACHABLE();
Expand Down Expand Up @@ -460,10 +460,8 @@ size_t RobjWrapper::Size() const {
intset* is = (intset*)inner_obj_;
return intsetLen(is);
}
case kEncodingStrMap2: {
StringSet* ss = (StringSet*)inner_obj_;
return ss->UpperBoundSize();
}
case kEncodingStrMap2:
return VisitSet(inner_obj_, [](auto* ss) { return ss->UpperBoundSize(); });
default:
LOG(FATAL) << "Unexpected encoding " << encoding_;
};
Expand Down
41 changes: 37 additions & 4 deletions src/core/oah_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <vector>

#include "core/detail/stateless_allocator.h"
#include "core/string_set.h"
#include "oah_entry.h"

namespace dfly {
Expand Down Expand Up @@ -107,11 +108,19 @@ class OAHSet { // Open Addressing Hash Set
void SetEntryIt() {
if (!owner_)
return;
// time_now_ == 0 disables expiry (callers set it to 0 around serialization).
const uint32_t now = owner_->time_now_;
for (auto num_entries = owner_->entries_.size(); bucket_ < num_entries; ++bucket_) {
auto& bucket = owner_->entries_[bucket_];
for (uint32_t bucket_size = bucket.ElementsNum(); pos_ < bucket_size; ++pos_) {
if (bucket[pos_])
return;
auto& entry = bucket[pos_];
if (!entry)
continue;
if (now != 0 && entry.HasExpiry() && entry.GetExpiry() <= now) {
entry.ExpireIfNeeded(now, &owner_->size_, &owner_->obj_alloc_used_);
continue;
}
return;
}
pos_ = 0;
}
Expand Down Expand Up @@ -468,11 +477,11 @@ class OAHSet { // Open Addressing Hash Set
return time_now_;
}

size_t ObjAllocUsed() const {
size_t ObjMallocUsed() const {
return obj_alloc_used_;
}

size_t SetAllocUsed() const {
size_t SetMallocUsed() const {
return entries_.capacity() * sizeof(OAHEntry) + ptr_vectors_alloc_used_;
}

Expand Down Expand Up @@ -762,4 +771,28 @@ class OAHSet { // Open Addressing Hash Set
Buckets entries_;
};

// Snapshot of --use_oah_set captured once at startup.
inline bool g_use_oah_set = false;
Comment thread
BorysTheDev marked this conversation as resolved.

// Dispatches a generic lambda over the runtime-selected dense-set type backing
// kEncodingStrMap2 SETs. Both StringSet and OAHSet expose the same surface
// (set_time, Empty, BucketCount, Reserve, ObjMallocUsed, ...) so the lambda
// can be written once and visit either concrete type.
template <typename Fn> auto VisitSet(void* ptr, Fn&& fn) {
return g_use_oah_set ? fn(static_cast<OAHSet*>(ptr)) : fn(static_cast<StringSet*>(ptr));
}

// Extracts the current member as a string_view from either a StringSet or an
// OAHSet iterator. Free functions so generic code (e.g. inside VisitSet
// lambdas) can write `Key(it)` without a member-method asymmetry between the
// two iterator types.
inline std::string_view Key(StringSet::iterator it) {
sds s = *it;
return {s, sdslen(s)};
}

inline std::string_view Key(OAHSet::iterator it) {
return it->Key();
}

} // namespace dfly
22 changes: 11 additions & 11 deletions src/core/oah_set_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ TEST_F(OAHSetTest, ReallocIfNeededForceReallocates) {
for (size_t i = 0; i < 50; ++i) {
EXPECT_TRUE(ss_->Add(absl::StrCat("key_", i, "_xxxxxxxx"), 100 + i));
}
size_t alloc_before = ss_->ObjAllocUsed();
size_t alloc_before = ss_->ObjMallocUsed();
EXPECT_GT(alloc_before, 0u);

PageUsage page_usage{CollectPageStats::NO, 0.9};
Expand All @@ -633,8 +633,8 @@ TEST_F(OAHSetTest, ReallocIfNeededForceReallocates) {
ASSERT_NE(it, ss_->end());
EXPECT_EQ(it.ExpiryTime(), 100u + i);
}
// ObjAllocUsed remains roughly consistent (mimalloc usable size for same logical size).
EXPECT_GT(ss_->ObjAllocUsed(), 0u);
// ObjMallocUsed remains roughly consistent (mimalloc usable size for same logical size).
EXPECT_GT(ss_->ObjMallocUsed(), 0u);
}

TEST_F(OAHSetTest, ReallocIfNeededVectorEntry) {
Expand Down Expand Up @@ -743,7 +743,7 @@ TEST_F(OAHSetTest, ClearStepIncremental) {
}
EXPECT_EQ(cursor, total);
EXPECT_EQ(ss_->UpperBoundSize(), 0u);
EXPECT_EQ(ss_->ObjAllocUsed(), 0u);
EXPECT_EQ(ss_->ObjMallocUsed(), 0u);
}

TEST_F(OAHSetTest, ClearStepFullBucketCount) {
Expand All @@ -753,7 +753,7 @@ TEST_F(OAHSetTest, ClearStepFullBucketCount) {
uint32_t end = ss_->ClearStep(0, ss_->Capacity());
EXPECT_EQ(end, ss_->Capacity());
EXPECT_EQ(ss_->UpperBoundSize(), 0u);
EXPECT_EQ(ss_->ObjAllocUsed(), 0u);
EXPECT_EQ(ss_->ObjMallocUsed(), 0u);
}

TEST_F(OAHSetTest, GetRandomMemberEmpty) {
Expand Down Expand Up @@ -810,7 +810,7 @@ TEST_F(OAHSetTest, ClearStepResetsExpirationUsed) {
<< "ExpirationUsed must be false after ClearStep fully empties the set";
}

TEST_F(OAHSetTest, ReallocIfNeededObjAllocUsedConsistent) {
TEST_F(OAHSetTest, ReallocIfNeededObjMallocUsedConsistent) {
// Sanity: after force-realloc, obj_alloc_used_ remains the sum of all entries'
// current AllocSize. Guards against signed-delta arithmetic going wrong on the counter.
for (size_t i = 0; i < 100; ++i)
Expand All @@ -824,20 +824,20 @@ TEST_F(OAHSetTest, ReallocIfNeededObjAllocUsedConsistent) {
size_t expected = 0;
for (auto it = ss_->begin(); it != ss_->end(); ++it)
expected += (*it).AllocSize();
EXPECT_EQ(ss_->ObjAllocUsed(), expected);
EXPECT_EQ(ss_->ObjMallocUsed(), expected);
}

TEST_F(OAHSetTest, ClearResetsObjAllocUsed) {
TEST_F(OAHSetTest, ClearResetsObjMallocUsed) {
for (size_t i = 0; i < 100; ++i) {
ss_->Add(random_string(generator_, 10));
}

EXPECT_GT(ss_->ObjAllocUsed(), 0u);
EXPECT_GT(ss_->ObjMallocUsed(), 0u);
EXPECT_GT(ss_->UpperBoundSize(), 0u);

ss_->Clear();

EXPECT_EQ(ss_->ObjAllocUsed(), 0u);
EXPECT_EQ(ss_->ObjMallocUsed(), 0u);
EXPECT_EQ(ss_->UpperBoundSize(), 0u);
}

Expand All @@ -849,7 +849,7 @@ TEST_F(OAHSetTest, IterateEmpty) {
}

static size_t MemUsed(OAHSet& obj) {
return obj.ObjAllocUsed() + obj.SetAllocUsed();
return obj.ObjMallocUsed() + obj.SetMallocUsed();
}

void BM_Clone(benchmark::State& state) {
Expand Down
2 changes: 1 addition & 1 deletion src/server/collection_family_fallback.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ StringMap* HSetFamily::ConvertToStrMap(uint8_t* lp) {
return nullptr;
}

StringSet* SetFamily::ConvertToStrSet(const intset* is, size_t expected_len) {
void* SetFamily::ConvertToStrSet(const intset* is, size_t expected_len) {
Fail();
return nullptr;
}
Expand Down
14 changes: 9 additions & 5 deletions src/server/container_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "base/flags.h"
#include "base/logging.h"
#include "core/detail/listpack_wrap.h"
#include "core/oah_set.h"
#include "core/qlist.h"
#include "core/sorted_map.h"
#include "core/string_map.h"
Expand Down Expand Up @@ -205,12 +206,15 @@ bool IterateSet(const PrimeValue& pv, const IterateFunc& func) {
success = func(ContainerEntry{ival});
}
} else {
for (sds ptr : *static_cast<StringSet*>(pv.RObjPtr())) {
if (!func(ContainerEntry{ptr, sdslen(ptr)})) {
success = false;
break;
VisitSet(pv.RObjPtr(), [&](auto* set) {
for (auto it = set->begin(); it != set->end(); ++it) {
std::string_view key = Key(it);
if (!func(ContainerEntry{key.data(), key.size()})) {
success = false;
break;
}
}
}
});
}

return success;
Expand Down
70 changes: 42 additions & 28 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "server/db_slice.h"

#include "core/dense_set.h"
#include "core/oah_set.h"
#include "strings/human_readable.h"

extern "C" {
Expand Down Expand Up @@ -254,35 +255,48 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotBuckets& eb, PrimeTable

class AsyncDeleter {
public:
static void EnqueDeletion(uint32_t next, DenseSet* ds);
template <typename Set> static void EnqueDeletion(uint32_t next, Set* ds);
static void Shutdown();

private:
static constexpr uint32_t kClearStepSize = 1024;
struct ClearNode {
DenseSet* ds;
void* ds;
uint32_t cursor;
// Advances the node by one step. Deletes the underlying set and returns true
// when the set is fully cleared; otherwise updates cursor and returns false.
bool (*step)(ClearNode*);
ClearNode* next;

ClearNode(DenseSet* d, uint32_t c, ClearNode* n) : ds(d), cursor(c), next(n) {
}
};

// Asynchronously deletes entries during the cpu-idle time.
static int32_t IdleCb();

// We add async deletion requests to a linked list and process them asynchronously
// in each thread.
static __thread ClearNode* head_;
};

__thread AsyncDeleter::ClearNode* AsyncDeleter::head_ = nullptr;

void AsyncDeleter::EnqueDeletion(uint32_t next, DenseSet* ds) {
// ClearStep returns the next cursor; the table is empty when it equals the
// underlying entries-vector size. DenseSet exposes that as BucketCount();
// OAHSet exposes it as Capacity() (BucketCount() omits displacement slots).
template <typename Set> uint32_t ClearStepEnd(Set* s) {
if constexpr (std::is_same_v<Set, OAHSet>)
return s->Capacity();
else
return s->BucketCount();
}

template <typename Set> void AsyncDeleter::EnqueDeletion(uint32_t next, Set* ds) {
auto step = +[](ClearNode* n) {
auto* s = static_cast<Set*>(n->ds);
n->cursor = s->ClearStep(n->cursor, kClearStepSize);
if (n->cursor == ClearStepEnd(s)) {
CompactObj::DeleteMR<Set>(s);
return true;
Comment thread
BorysTheDev marked this conversation as resolved.
}
return false;
};
bool launch_task = (head_ == nullptr);

// register ds
head_ = new ClearNode{ds, next, head_};
head_ = new ClearNode{ds, next, step, head_};
ProactorBase* pb = ProactorBase::me();
DCHECK(pb);
DVLOG(2) << "Adding async deletion task, thread " << pb->GetPoolIndex() << " " << launch_task;
Expand All @@ -306,15 +320,10 @@ int32_t AsyncDeleter::IdleCb() {
return -1; // unregister itself.

auto* current = head_;

DVLOG(2) << "IdleCb " << current->cursor;
uint32_t next = current->ds->ClearStep(current->cursor, kClearStepSize);
if (next == current->ds->BucketCount()) { // reached the end.
CompactObj::DeleteMR<DenseSet>(current->ds);
if (current->step(current)) {
head_ = current->next;
delete current;
} else {
current->cursor = next;
}
return ProactorBase::kOnIdleMaxLevel;
};
Expand Down Expand Up @@ -1839,16 +1848,21 @@ void DbSlice::PerformDeletionAtomic(const Iterator& del_it, DbTable* table, bool
AccountObjectMemory(del_it.key(), pv.ObjType(), -value_heap_size, table); // Value

if (async && MayDeleteAsynchronously(pv)) {
DenseSet* ds = (DenseSet*)pv.RObjPtr();
auto schedule = [](auto* ds) {
using Ds = std::remove_pointer_t<decltype(ds)>;
uint32_t next = ds->ClearStep(0, 512);
if (next < ClearStepEnd(ds))
AsyncDeleter::EnqueDeletion(next, ds);
else
CompactObj::DeleteMR<Ds>(ds);
};
void* obj_ptr = pv.RObjPtr();
pv.SetRObjPtr(nullptr);
const size_t kClearStepSize = 512;

uint32_t next = ds->ClearStep(0, kClearStepSize);
if (next < ds->BucketCount()) {
AsyncDeleter::EnqueDeletion(next, ds);
} else {
CompactObj::DeleteMR<DenseSet>(ds);
}
// SET dispatches via VisitSet (StringSet/OAHSet); HASH is always StringMap (DenseSet-derived).
if (pv.ObjType() == OBJ_SET)
VisitSet(obj_ptr, schedule);
else
schedule(static_cast<DenseSet*>(obj_ptr));
}

if (table->slots_stats) {
Expand Down
Loading
Loading