Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 47 additions & 7 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ DbStats& DbStats::operator+=(const DbStats& o) {
}

SliceEvents& SliceEvents::operator+=(const SliceEvents& o) {
static_assert(sizeof(SliceEvents) == 136, "You should update this function with new fields");
static_assert(sizeof(SliceEvents) == 144, "You should update this function with new fields");

ADD(evicted_keys);
ADD(hard_evictions);
Expand All @@ -383,6 +383,7 @@ SliceEvents& SliceEvents::operator+=(const SliceEvents& o) {
ADD(ram_misses);
ADD(huff_encode_total);
ADD(huff_encode_success);
ADD(journal_omit);
return *this;
}

Expand Down Expand Up @@ -526,6 +527,11 @@ DbSlice::AutoUpdater::AutoUpdater(DbIndex db_ind, std::string_view key, const It
DCHECK(IsValid(it));
}

void DbSlice::ProvideHints(MutationHints* hints) {
DCHECK(!mutation_hints_);
mutation_hints_ = hints;
}

DbSlice::ItAndUpdater DbSlice::FindMutable(const Context& cntx, string_view key) {
return std::move(FindMutableInternal(cntx, key, std::nullopt).value());
}
Expand All @@ -538,12 +544,11 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutable(const Context& cntx, string
OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutableInternal(const Context& cntx, string_view key,
std::optional<unsigned> req_obj_type) {
auto res = FindInternal(cntx, key, req_obj_type, UpdateStatsMode::kMutableStats);
if (!res.ok()) {
return res.status();
}
RETURN_ON_BAD_STATUS(res);

auto it = Iterator(*res, StringOrView::FromView(key));
PreUpdateBlocking(cntx.db_index, it);

// PreUpdate() might have caused a deletion of `it`
if (res->IsOccupied()) {
DCHECK_GE(db_arr_[cntx.db_index]->stats.obj_memory_usage, (*res)->second.MallocUsed());
Expand Down Expand Up @@ -666,7 +671,24 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx,

if (res.ok()) {
Iterator it(*res, StringOrView::FromView(key));
PreUpdateBlocking(cntx.db_index, it);

bool omit_update = false;
if (auto* mutation = std::exchange(mutation_hints_, nullptr); mutation) {
// We can omit the journal write if:
// 0. it support mutation hints and uses only a single key
// 1. there is a single eventually-consistent snapshot (i.e. replica full sync)
// 2. there are no other journal consumers
// 3. the snapshot did not reach the bucket yet
omit_update = mutation->hint.single_key && mutation->hint.support_omit &&
change_cb_.size() == 1 && it.GetVersion() < change_cb_.front().first &&
journal::CallbackNumber() == 1;

events_.journal_omit += unsigned(omit_update);
mutation->result.omit_journal = omit_update;
}

if (!omit_update)
PreUpdateBlocking(cntx.db_index, it);

// PreUpdate() might have caused a deletion of `it`
if (res->IsOccupied()) {
Expand All @@ -679,12 +701,28 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx,
}

// It's a new entry.
mutation_hints_ = nullptr; // for now: just take them

auto status = res.status();
CHECK(status == OpStatus::KEY_NOTFOUND || status == OpStatus::OUT_OF_MEMORY) << status;

bool omit_update = false;
if (!change_cb_.empty()) {
auto bucket_set = db.prime.CVCUponInsert(key);
CallChangeCallbacks(cntx.db_index, bucket_set);

if (auto* mutation = std::exchange(mutation_hints_, nullptr); mutation) {
auto max_v =
std::ranges::max(bucket_set.buckets(), {}, [](const auto& b) { return b.GetVersion(); });
omit_update = mutation->hint.single_key && mutation->hint.support_omit &&
change_cb_.size() == 1 && max_v.GetVersion() < change_cb_.front().first &&
journal::CallbackNumber() == 1;

events_.journal_omit += unsigned(omit_update);
mutation->result.omit_journal = omit_update;
}

if (!omit_update)
CallChangeCallbacks(cntx.db_index, bucket_set);

// Set of possible insertion buckets must be the same after possibly blocking call
DCHECK(bucket_set == db.prime.CVCUponInsert(key));
Expand Down Expand Up @@ -777,7 +815,9 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx,
}

DCHECK_EQ(it->second.MallocUsed(), 0UL); // Make sure accounting is no-op
it.SetVersion(NextVersion());

if (!omit_update)
it.SetVersion(NextVersion());

TouchTopKeysIfNeeded(key, db.sample_top_keys);
TouchHllIfNeeded(key, db.sample_unique_keys);
Expand Down
26 changes: 26 additions & 0 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ struct SliceEvents {
// how many updates and insertions of keys between snapshot intervals
size_t update = 0;

// how many journal omit optimizations were performed
size_t journal_omit = 0;

uint64_t huff_encode_total = 0, huff_encode_success = 0;

SliceEvents& operator+=(const SliceEvents& o);
Expand Down Expand Up @@ -223,6 +226,24 @@ class DbSlice {
int32_t expire_options = 0; // ExpireFlags
};

// Hints for the current operation.
struct MutationHints {
// Inputs
struct {
// Operation contains only a single key
bool single_key = false;

// Support omitting journal writes
bool support_omit = false;
} const hint;

// Outputs
struct {
// The journal write is safe to be omitted
bool omit_journal = false;
} result = {};
};

DbSlice(uint32_t index, bool cache_mode, EngineShard* owner);
~DbSlice();

Expand Down Expand Up @@ -251,6 +272,8 @@ class DbSlice {
bool is_new = false;
};

void ProvideHints(MutationHints* hints);

ItAndUpdater FindMutable(const Context& cntx, std::string_view key);
OpResult<ItAndUpdater> FindMutable(const Context& cntx, std::string_view key,
unsigned req_obj_type);
Expand Down Expand Up @@ -631,6 +654,9 @@ class DbSlice {
// Record whenever a key expired to DbTable::expired_keys_events_ for keyspace notifications
bool expired_keys_events_recording_ = true;

// Provided for the next FindMutableInternal operation
MutationHints* mutation_hints_ = nullptr;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the size of MutationHints ? why do you need to pass by pointer and not by value?
looks fragile

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it is also used to get output variables


struct Hash {
size_t operator()(const facade::ConnectionRef& c) const {
return std::hash<uint32_t>()(c.GetClientId());
Expand Down
4 changes: 2 additions & 2 deletions src/server/journal/journal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ error_code Close() {
return {};
}

bool HasRegisteredCallbacks() {
return journal_slice.HasRegisteredCallbacks();
unsigned CallbackNumber() {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CallbackNumber is not a great name.

return journal_slice.OnChangeCbCount();
}

bool IsLSNInBuffer(LSN lsn) {
Expand Down
5 changes: 4 additions & 1 deletion src/server/journal/journal.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ std::error_code Close();

//******* The following functions must be called in the context of the owning shard *********//

bool HasRegisteredCallbacks();
unsigned CallbackNumber();
inline bool HasRegisteredCallbacks() {
return CallbackNumber() > 0;
}

bool IsLSNInBuffer(LSN lsn);

Expand Down
4 changes: 2 additions & 2 deletions src/server/journal/journal_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class JournalSlice {
uint32_t RegisterOnChange(JournalConsumerInterface* consumer);
void UnregisterOnChange(uint32_t);

bool HasRegisteredCallbacks() const {
return !journal_consumers_arr_.empty();
unsigned OnChangeCbCount() const {
return journal_consumers_arr_.size();
}

/// Returns whether the journal entry with this LSN is available
Expand Down
2 changes: 2 additions & 0 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3295,6 +3295,8 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
append("lua_force_gc_calls", m.lua_stats.force_gc_calls);
append("lua_gc_freed_memory_total", m.lua_stats.gc_freed_memory);
append("lua_gc_duration_total_sec", m.lua_stats.gc_duration_ns * 1e-9);

append("total_journal_omits", m.events.journal_omit);
};

auto add_tiered_info = [&] {
Expand Down
9 changes: 9 additions & 0 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class SetCmd {

const OpArgs op_args_;
bool explicit_journal_; // call RecordJournal (auto journaling disabled)
bool skip_journal_ = false;
};

size_t SetRangeInternal(std::string* value, size_t start, std::string_view range) {
Expand Down Expand Up @@ -836,10 +837,15 @@ OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value
}
}

DbSlice::MutationHints hints{.hint{.single_key = true, .support_omit = explicit_journal_}};
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we do it more elegantly?

  1. at least in this example you pass hints with .single_key = true and set support_omit then why passing them at all? why not pass a final boolean true if the caller supports omit
  2. the boolean could be added to op_args_.db_cntx, maybe so you won't need to inject this before the find call.
  3. skip_journal_ can be returned by AddOrFind inside op_res.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, let's move it to existing structures

db_slice.ProvideHints(&hints);

// We can use std::nullopt here because SET command can change the key type to string
auto op_res = db_slice.AddOrFind(op_args_.db_cntx, key, std::nullopt);
RETURN_ON_BAD_STATUS(op_res);

skip_journal_ = hints.result.omit_journal;

if (!op_res->is_new) {
if (auto status = CachePrevIfNeeded(params, op_res->it); status != OpStatus::OK)
return status;
Expand Down Expand Up @@ -940,6 +946,9 @@ void SetCmd::PostEdit(const SetParams& params, std::string_view key, std::string
}

void SetCmd::RecordJournal(const SetParams& params, string_view key, string_view value) {
if (skip_journal_)
return;

absl::InlinedVector<string_view, 5> cmds({key, value}); // 5 is theoretical maximum;

std::string exp_str;
Expand Down
6 changes: 6 additions & 0 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ async def compare_datasets(c_master, c_replica):
2, [2], dict(key_target=1_000, data_size=10_000, huge_value_target=0), 100, marks=M_SLOW
),
# Stress test
# Single replica process might have additional optimizations that need to be verified
pytest.param(4, [4], dict(key_target=500_000, types=["STRING"]), 100_000, marks=M_STRESS),
pytest.param(8, [8, 8], dict(key_target=1_000_000, units=16), 50_000, marks=M_STRESS),
],
)
Expand Down Expand Up @@ -254,6 +256,10 @@ async def check():
# it's usually close to 1% but there are some that are close to 3.
assert preemptions <= (key_capacity * 0.03)

if len(replicas) == 1:
print("total omits", info["total_journal_omits"])
assert info["total_journal_omits"] > 0


"""
Regression test for the double-apply bug during full sync.
Expand Down
10 changes: 7 additions & 3 deletions tests/dragonfly/seeder/script-genlib.lua
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,15 @@ end
function LG_funcs.mod_string(key)
-- APPEND and SETRANGE are the only modifying operations for strings,
-- issue APPEND rarely to not grow data too much
if math.random() < 0.05 then
redis.apcall('APPEND', key, '+')
else
-- replace the whole string fully sometimes
local p = math.random()
if p < 0.2 then
redis.apcall('APPEND', key, dragonfly.randstr(2))
elseif p < 0.9 then
local replacement = dragonfly.randstr(LG_funcs.dsize // 2)
redis.apcall('SETRANGE', key, math.random(0, LG_funcs.dsize // 2), replacement)
else
redis.apcall('SET', key, dragonfly.randstr(LG_funcs.dsize))
end
end

Expand Down
Loading