Skip to content

Commit

Permalink
chore: introduce background deletions of DenseSet objects
Browse files Browse the repository at this point in the history
We currently implement them only for sets but the same approach can work for zset, hashes, lists as well.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Jan 22, 2025
1 parent 4a2f2e3 commit 386c8ce
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 9 deletions.
73 changes: 66 additions & 7 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ extern "C" {
#include "server/search/doc_index.h"
#include "server/set_family.h"
#include "server/transaction.h"
#include "util/fibers/proactor_base.h"
#include "util/varz.h"

ABSL_FLAG(uint32_t, dbnum, 16, "Number of databases");
Expand All @@ -41,6 +42,7 @@ ABSL_FLAG(uint32_t, keys_output_limit, 8192, "Maximum number of keys output by k
namespace dfly {
using namespace std;
using namespace facade;
using util::fb2::ProactorBase;

namespace {

Expand Down Expand Up @@ -994,14 +996,46 @@ std::optional<int32_t> ParseExpireOptionsOrReply(const CmdArgList args, SinkRepl
return flags;
}

void DeleteGeneric(CmdArgList args, const CommandContext& cmd_cntx) {
constexpr uint32_t kClearStepSize = 1024;
struct ClearNode {
DenseSet* ds;
uint32_t cursor;
ClearNode* next;

ClearNode(DenseSet* d, uint32_t c, ClearNode* n) : ds(d), cursor(c), next(n) {
}
};

// We add async deletion requests to a linked list and process them asynchronously
// in each thread.
__thread ClearNode* clear_head = nullptr;

int32_t ClearQueuedDenseSetEntries() {
if (clear_head == nullptr)
return -1; // unregister itself.

auto* current = clear_head;

DVLOG(2) << "ClearQueuedDenseSetEntries " << current->cursor;
uint32_t next = current->ds->ClearStep(current->cursor, kClearStepSize);
if (next == current->ds->BucketCount()) { // reached the end.
CompactObj::DeleteMR<DenseSet>(current->ds);
clear_head = current->next;
delete current;
} else {
current->cursor = next;
}
return ProactorBase::kOnIdleMaxLevel;
};

void DeleteGeneric(CmdArgList args, const CommandContext& cmd_cntx, bool async) {
atomic_uint32_t result{0};
auto* builder = cmd_cntx.rb;
bool is_mc = (builder->GetProtocol() == Protocol::MEMCACHE);

auto cb = [&result](const Transaction* t, EngineShard* shard) {
auto cb = [&](const Transaction* t, EngineShard* shard) {
ShardArgs args = t->GetShardArgs(shard->shard_id());
auto res = GenericFamily::OpDel(t->GetOpArgs(shard), args);
auto res = GenericFamily::OpDel(t->GetOpArgs(shard), args, async);
result.fetch_add(res.value_or(0), memory_order_relaxed);

return OpStatus::OK;
Expand Down Expand Up @@ -1029,17 +1063,42 @@ void DeleteGeneric(CmdArgList args, const CommandContext& cmd_cntx) {

} // namespace

OpResult<uint32_t> GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& keys) {
OpResult<uint32_t> GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& keys, bool async) {
DVLOG(1) << "Del: " << keys.Front();
auto& db_slice = op_args.GetDbSlice();

uint32_t res = 0;

for (string_view key : keys) {
auto it = db_slice.FindMutable(op_args.db_cntx, key).it; // post_updater will run immediately
auto it = db_slice.FindMutable(op_args.db_cntx, key).it;
if (!IsValid(it))
continue;

PrimeValue& pv = it->second;
if (pv.ObjType() == OBJ_SET && pv.Encoding() == kEncodingStrMap2) {
DenseSet* ds = (DenseSet*)pv.RObjPtr();
pv.SetRObjPtr(nullptr);

uint32_t next = ds->ClearStep(0, kClearStepSize);
if (next < ds->BucketCount() && async) {
ProactorBase* pb = ProactorBase::me();
DCHECK(pb);

bool launch_task = (clear_head == nullptr);

// register ds
clear_head = new ClearNode{ds, next, clear_head};
ds = nullptr;
if (launch_task) {
pb->AddOnIdleTask(&ClearQueuedDenseSetEntries);
}
}

if (ds) {
CompactObj::DeleteMR<DenseSet>(ds);
}
}

db_slice.Del(op_args.db_cntx, it);
++res;
}
Expand All @@ -1050,11 +1109,11 @@ OpResult<uint32_t> GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs&
void GenericFamily::Del(CmdArgList args, const CommandContext& cmd_cntx) {
VLOG(1) << "Del " << ArgS(args, 0);

DeleteGeneric(args, cmd_cntx);
DeleteGeneric(args, cmd_cntx, false);
}

void GenericFamily::Unlink(CmdArgList args, const CommandContext& cmd_cntx) {
DeleteGeneric(args, cmd_cntx);
DeleteGeneric(args, cmd_cntx, true);
}

void GenericFamily::Ping(CmdArgList args, const CommandContext& cmd_cntx) {
Expand Down
2 changes: 1 addition & 1 deletion src/server/generic_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class GenericFamily {

// Accessed by Service::Exec and Service::Watch as an utility.
static OpResult<uint32_t> OpExists(const OpArgs& op_args, const ShardArgs& keys);
static OpResult<uint32_t> OpDel(const OpArgs& op_args, const ShardArgs& keys);
static OpResult<uint32_t> OpDel(const OpArgs& op_args, const ShardArgs& keys, bool async);

private:
using SinkReplyBuilder = facade::SinkReplyBuilder;
Expand Down
14 changes: 14 additions & 0 deletions src/server/generic_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -872,4 +872,18 @@ TEST_F(GenericFamilyTest, RestoreOOM) {
EXPECT_THAT(resp, ErrArg("Out of memory"));
}

TEST_F(GenericFamilyTest, Unlink) {
for (unsigned i = 0; i < 1000; ++i) {
unsigned start = i * 10;
vector<string> cmd = {"SADD", "s1"};
for (unsigned j = 0; j < 10; ++j) {
cmd.push_back(absl::StrCat("f", start + j));
}
auto resp = Run(absl::MakeSpan(cmd));
ASSERT_THAT(resp, IntArg(10));
}
auto resp = Run({"unlink", "s1"});
EXPECT_THAT(resp, IntArg(1));
}

} // namespace dfly
2 changes: 1 addition & 1 deletion src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ void StringFamily::Set(CmdArgList args, const CommandContext& cmnd_cntx) {
if (rel_ms < 0) {
cmnd_cntx.tx->ScheduleSingleHop([](const Transaction* tx, EngineShard* es) {
ShardArgs args = tx->GetShardArgs(es->shard_id());
GenericFamily::OpDel(tx->GetOpArgs(es), args);
GenericFamily::OpDel(tx->GetOpArgs(es), args, false);
return OpStatus::OK;
});
return builder->SendStored();
Expand Down

0 comments on commit 386c8ce

Please sign in to comment.