Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: introduce background deletions of DenseSet objects #4496

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 76 additions & 6 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ extern "C" {
#include "server/search/doc_index.h"
#include "server/set_family.h"
#include "server/transaction.h"
#include "util/fibers/proactor_base.h"
#include "util/varz.h"

ABSL_FLAG(uint32_t, dbnum, 16, "Number of databases");
Expand All @@ -41,6 +42,7 @@ ABSL_FLAG(uint32_t, keys_output_limit, 8192, "Maximum number of keys output by k
namespace dfly {
using namespace std;
using namespace facade;
using util::fb2::ProactorBase;

namespace {

Expand Down Expand Up @@ -994,14 +996,46 @@ std::optional<int32_t> ParseExpireOptionsOrReply(const CmdArgList args, SinkRepl
return flags;
}

void DeleteGeneric(CmdArgList args, const CommandContext& cmd_cntx) {
constexpr uint32_t kClearStepSize = 1024;
// One pending incremental-clear request, kept in a singly linked list.
//   ds     - the set that is still being cleared step by step.
//   cursor - position to hand to the next DenseSet::ClearStep call.
//   next   - the following request in the list (nullptr at the tail).
struct ClearNode {
  DenseSet* ds;
  uint32_t cursor;
  ClearNode* next;

  ClearNode(DenseSet* set, uint32_t start_cursor, ClearNode* following)
      : ds(set), cursor(start_cursor), next(following) {
  }
};

// We add async deletion requests to a linked list and process them asynchronously
// in each thread.
__thread ClearNode* clear_head = nullptr;

int32_t ClearQueuedDenseSetEntries() {
if (clear_head == nullptr)
return -1; // unregister itself.

auto* current = clear_head;

DVLOG(2) << "ClearQueuedDenseSetEntries " << current->cursor;
uint32_t next = current->ds->ClearStep(current->cursor, kClearStepSize);
if (next == current->ds->BucketCount()) { // reached the end.
CompactObj::DeleteMR<DenseSet>(current->ds);
clear_head = current->next;
delete current;
} else {
current->cursor = next;
}
return ProactorBase::kOnIdleMaxLevel;
};

void DeleteGeneric(CmdArgList args, const CommandContext& cmd_cntx, bool async) {
atomic_uint32_t result{0};
auto* builder = cmd_cntx.rb;
bool is_mc = (builder->GetProtocol() == Protocol::MEMCACHE);

auto cb = [&result](const Transaction* t, EngineShard* shard) {
auto cb = [&](const Transaction* t, EngineShard* shard) {
ShardArgs args = t->GetShardArgs(shard->shard_id());
auto res = GenericFamily::OpDel(t->GetOpArgs(shard), args);
auto res = GenericFamily::OpDel(t->GetOpArgs(shard), args, async);
result.fetch_add(res.value_or(0), memory_order_relaxed);

return OpStatus::OK;
Expand Down Expand Up @@ -1029,7 +1063,18 @@ void DeleteGeneric(CmdArgList args, const CommandContext& cmd_cntx) {

} // namespace

OpResult<uint32_t> GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& keys) {

void GenericFamily::ShutdownShardData() {
// we do not buther with deleting the sets during the shutdown.
// this works well because we destroy mimalloc heap anyways.
while (clear_head) {
auto* next = clear_head->next;
delete clear_head;
clear_head = next;
}
}

OpResult<uint32_t> GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& keys, bool async) {
DVLOG(1) << "Del: " << keys.Front();
auto& db_slice = op_args.GetDbSlice();

Expand All @@ -1040,6 +1085,31 @@ OpResult<uint32_t> GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs&
if (!IsValid(it))
continue;

PrimeValue& pv = it->second;
if (pv.ObjType() == OBJ_SET && pv.Encoding() == kEncodingStrMap2) {
DenseSet* ds = (DenseSet*)pv.RObjPtr();
pv.SetRObjPtr(nullptr);

uint32_t next = ds->ClearStep(0, kClearStepSize);
if (next < ds->BucketCount() && async) {
ProactorBase* pb = ProactorBase::me();
DCHECK(pb);

bool launch_task = (clear_head == nullptr);

// register ds
clear_head = new ClearNode{ds, next, clear_head};
ds = nullptr;
if (launch_task) {
pb->AddOnIdleTask(&ClearQueuedDenseSetEntries);
}
}

if (ds) {
CompactObj::DeleteMR<DenseSet>(ds);
}
}

db_slice.Del(op_args.db_cntx, it);
++res;
}
Expand All @@ -1050,11 +1120,11 @@ OpResult<uint32_t> GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs&
// Logs the first argument at verbosity level 1 and delegates the deletion to
// DeleteGeneric. The three-argument call passes async=false.
// NOTE(review): the two DeleteGeneric lines below look like the before/after
// sides of a diff hunk - confirm that only one of them is intended.
void GenericFamily::Del(CmdArgList args, const CommandContext& cmd_cntx) {
VLOG(1) << "Del " << ArgS(args, 0);

DeleteGeneric(args, cmd_cntx);
DeleteGeneric(args, cmd_cntx, false);
}

// Delegates the deletion to DeleteGeneric. The three-argument call passes
// async=true.
// NOTE(review): the two DeleteGeneric lines below look like the before/after
// sides of a diff hunk - confirm that only one of them is intended.
void GenericFamily::Unlink(CmdArgList args, const CommandContext& cmd_cntx) {
DeleteGeneric(args, cmd_cntx);
DeleteGeneric(args, cmd_cntx, true);
}

void GenericFamily::Ping(CmdArgList args, const CommandContext& cmd_cntx) {
Expand Down
11 changes: 5 additions & 6 deletions src/server/generic_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,13 @@

ABSL_DECLARE_FLAG(uint32_t, dbnum);

namespace facade {
class SinkReplyBuilder;
};

namespace dfly {

using facade::CmdArgList;
using facade::OpResult;

class CommandRegistry;
class Transaction;
struct CommandContext;

class GenericFamily {
Expand All @@ -29,10 +25,13 @@ class GenericFamily {

// Accessed by Service::Exec and Service::Watch as a utility.
static OpResult<uint32_t> OpExists(const OpArgs& op_args, const ShardArgs& keys);
static OpResult<uint32_t> OpDel(const OpArgs& op_args, const ShardArgs& keys);
static OpResult<uint32_t> OpDel(const OpArgs& op_args, const ShardArgs& keys, bool async);

// Thread-local callback that is called from each shard thread
// when the service is shut down.
static void ShutdownShardData();

private:
using SinkReplyBuilder = facade::SinkReplyBuilder;

static void Del(CmdArgList args, const CommandContext& cmd_cntx);
static void Unlink(CmdArgList args, const CommandContext& cmd_cntx);
Expand Down
14 changes: 14 additions & 0 deletions src/server/generic_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -872,4 +872,18 @@ TEST_F(GenericFamilyTest, RestoreOOM) {
EXPECT_THAT(resp, ErrArg("Out of memory"));
}

// Fills set "s1" with 10,000 distinct members ("f0" .. "f9999"), added ten at
// a time, then checks that UNLINK reports exactly one removed key.
// NOTE(review): presumably the set is sized to need more than one clearing
// step so the background path is exercised - confirm against DenseSet.
TEST_F(GenericFamilyTest, Unlink) {
  constexpr unsigned kBatches = 1000;
  constexpr unsigned kBatchSize = 10;

  for (unsigned batch = 0; batch < kBatches; ++batch) {
    const unsigned first = batch * kBatchSize;
    vector<string> sadd_cmd = {"SADD", "s1"};
    for (unsigned offset = 0; offset < kBatchSize; ++offset) {
      sadd_cmd.push_back(absl::StrCat("f", first + offset));
    }
    auto added = Run(absl::MakeSpan(sadd_cmd));
    ASSERT_THAT(added, IntArg(10));
  }

  auto unlinked = Run({"unlink", "s1"});
  EXPECT_THAT(unlinked, IntArg(1));
}

} // namespace dfly
3 changes: 3 additions & 0 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,9 @@ void Service::Shutdown() {
pp_.AwaitFiberOnAll([](ProactorBase* pb) {
ServerState::tlocal()->EnterLameDuck();
facade::Connection::ShutdownThreadLocal();
if (auto* shard = EngineShard::tlocal(); shard) {
GenericFamily::ShutdownShardData();
}
});

config_registry.Reset();
Expand Down
2 changes: 1 addition & 1 deletion src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ void StringFamily::Set(CmdArgList args, const CommandContext& cmnd_cntx) {
if (rel_ms < 0) {
cmnd_cntx.tx->ScheduleSingleHop([](const Transaction* tx, EngineShard* es) {
ShardArgs args = tx->GetShardArgs(es->shard_id());
GenericFamily::OpDel(tx->GetOpArgs(es), args);
GenericFamily::OpDel(tx->GetOpArgs(es), args, false);
return OpStatus::OK;
});
return builder->SendStored();
Expand Down
Loading