Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 59 additions & 19 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

#include "server/db_slice.h"

#include <ranges>

#include "core/dense_set.h"
#include "core/overloaded.h"
#include "strings/human_readable.h"

extern "C" {
Expand Down Expand Up @@ -365,7 +368,7 @@ DbStats& DbStats::operator+=(const DbStats& o) {
}

SliceEvents& SliceEvents::operator+=(const SliceEvents& o) {
static_assert(sizeof(SliceEvents) == 136, "You should update this function with new fields");
static_assert(sizeof(SliceEvents) == 144, "You should update this function with new fields");

ADD(evicted_keys);
ADD(hard_evictions);
Expand All @@ -384,6 +387,7 @@ SliceEvents& SliceEvents::operator+=(const SliceEvents& o) {
ADD(ram_misses);
ADD(huff_encode_total);
ADD(huff_encode_success);
ADD(journal_omit);
return *this;
}

Expand Down Expand Up @@ -539,12 +543,11 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutable(const Context& cntx, string
OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutableInternal(const Context& cntx, string_view key,
std::optional<unsigned> req_obj_type) {
auto res = FindInternal(cntx, key, req_obj_type, UpdateStatsMode::kMutableStats);
if (!res.ok()) {
return res.status();
}
RETURN_ON_BAD_STATUS(res);

auto it = Iterator(*res, StringOrView::FromView(key));
PreUpdateBlocking(cntx.db_index, it);

// PreUpdate() might have caused a deletion of `it`
if (res->IsOccupied()) {
DCHECK_GE(db_arr_[cntx.db_index]->stats.obj_memory_usage, (*res)->second.MallocUsed());
Expand Down Expand Up @@ -667,11 +670,17 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx,

if (res.ok()) {
Iterator it(*res, StringOrView::FromView(key));
PreUpdateBlocking(cntx.db_index, it);

bool omitted_journal = IsOmittableWrite(cntx, it.GetInnerIt());
if (!omitted_journal)
PreUpdateBlocking(cntx.db_index, it);

// PreUpdate() might have caused a deletion of `it`
if (res->IsOccupied()) {
return ItAndUpdater{.it = it, .post_updater{cntx.db_index, key, it, this}, .is_new = false};
return ItAndUpdater{.it = it,
.post_updater{cntx.db_index, key, it, this},
.is_new = false,
.omitted_journal = omitted_journal};
} else {
res = OpStatus::KEY_NOTFOUND;
}
Expand All @@ -683,9 +692,13 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx,
auto status = res.status();
CHECK(status == OpStatus::KEY_NOTFOUND || status == OpStatus::OUT_OF_MEMORY) << status;

bool omit_journal = false;
if (!change_cb_.empty()) {
auto bucket_set = db.prime.CVCUponInsert(key);
CallChangeCallbacks(cntx.db_index, bucket_set);

omit_journal = IsOmittableWrite(cntx, {bucket_set});
if (!omit_journal)
CallChangeCallbacks(cntx.db_index, bucket_set);

// Set of possible insertion buckets must be the same after possibly blocking call
DCHECK(bucket_set == db.prime.CVCUponInsert(key));
Expand Down Expand Up @@ -778,7 +791,9 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx,
}

DCHECK_EQ(it->second.MallocUsed(), 0UL); // Make sure accounting is no-op
it.SetVersion(NextVersion());

if (!omit_journal)
it.SetVersion(NextVersion());

TouchTopKeysIfNeeded(key, db.sample_top_keys);
TouchHllIfNeeded(key, db.sample_unique_keys);
Expand All @@ -796,7 +811,8 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx,
return ItAndUpdater{
.it = Iterator(it, StringOrView::FromView(key)),
.post_updater{cntx.db_index, key, Iterator(it, StringOrView::FromView(key)), this},
.is_new = true};
.is_new = true,
.omitted_journal = omit_journal};
}

void DbSlice::ActivateDb(DbIndex db_ind) {
Expand Down Expand Up @@ -929,7 +945,7 @@ void DbSlice::FlushSlots(const cluster::SlotRanges& slot_ranges) {
}
};

uint64_t cb_id = RegisterOnChange(std::move(on_change));
uint64_t cb_id = RegisterOnChange(false, std::move(on_change));

fb2::Fiber("flush_slots", [this, shared_slots, next_version, cb_id]() {
FlushSlotsFb(*shared_slots, next_version, cb_id);
Expand Down Expand Up @@ -1340,9 +1356,9 @@ void DbSlice::ExpireAllIfNeeded() {
}
}

// Appends a change callback to change_cb_, tagged with a fresh NextVersion() value, and
// returns the first element (the version) of the newly stored entry.
// NOTE(review): this is a diff rendering - the two signature lines and the two return lines
// appear to be the removed and the added variant respectively; the added variant also stores
// the `replica` flag in the entry. Confirm against the real file.
uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
uint64_t DbSlice::RegisterOnChange(bool replica, ChangeCallback cb) {
// Asserts (DCHECK) that the owner's shard lock is not free at registration time.
DCHECK(!owner_->shard_lock()->IsFree());
return change_cb_.emplace_back(NextVersion(), std::move(cb)).first;
return std::get<0>(change_cb_.emplace_back(NextVersion(), replica, std::move(cb)));
}

// Ordering invariant (PIT mode):
Expand All @@ -1361,9 +1377,9 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_
<< ", upper_bound=" << upper_bound;

const size_t limit = change_cb_.size();
auto ccb = change_cb_.begin();
auto change_cb_iter = change_cb_.begin();
for (size_t i = 0; i < limit; ++i) {
uint64_t cb_version = ccb->first;
const auto& [cb_version, _, cb] = *change_cb_iter;
DCHECK_LE(cb_version, upper_bound);
if (cb_version == upper_bound) {
return;
Expand All @@ -1378,17 +1394,17 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_
// does not change during the serialization, therefore we allow at most one serializer
// reading the bucket at the same time.
if (bucket_version <= cb_version) {
ccb->second(db_ind, ChangeReq{it.GetInnerIt()});
cb(db_ind, ChangeReq{it.GetInnerIt()});
}
++ccb;
++change_cb_iter;
}
}

//! Unregisters the callback whose registration id (the first element of its change_cb_
//! entry) equals `id`. Waits on serialization_latch_ before searching the list and
//! CHECK-fails if no entry matches; the matching entry is then erased.
//! NOTE(review): this is a diff rendering - the two lambda lines below appear to be the
//! removed (pair-based) and added (tuple-based) variants of the same predicate.
void DbSlice::UnregisterOnChange(uint64_t id) {
serialization_latch_.Wait();
auto it = find_if(change_cb_.begin(), change_cb_.end(),
[id](const auto& cb) { return cb.first == id; });
[id](const auto& cb) { return std::get<0>(cb) == id; });
CHECK(it != change_cb_.end());
change_cb_.erase(it);
}
Expand Down Expand Up @@ -1921,10 +1937,34 @@ void DbSlice::CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const {
const size_t limit = change_cb_.size();
auto ccb = change_cb_.begin();
for (size_t i = 0; i < limit; ++i) {
CHECK(ccb->second);
ccb->second(id, cr);
const auto& [_v, _r, cb] = *ccb;
CHECK(cb);
cb(id, cr);
++ccb;
}
}

// Decides whether the journal write for this change can be skipped.
// The write is omittable only when all of the following hold:
//  1. the operation opted in via cntx.is_omittable_operation (it supports mutation hints and
//     uses only a single key),
//  2. exactly one change callback is registered and its `replica` flag is set (a single
//     eventually-consistent snapshot, i.e. replica full sync),
//  3. journal::GetCallbackCount() == 1, i.e. there are no other journal consumers,
//  4. the snapshot did not reach the affected bucket(s) yet, judged by the bucket version.
// For every opted-in operation the outcome (0 or 1) is added to events_.journal_omit.
bool DbSlice::IsOmittableWrite(const Context& cntx, ChangeReq req) {
  // Operations that did not opt in are never omitted and are not counted.
  if (!cntx.is_omittable_operation)
    return false;

  // Version of a single bucket, and the largest version found in a bucket set.
  auto version_of = [](PrimeTable::bucket_iterator it) { return it.GetVersion(); };
  auto max_version_of = [version_of](const PrimeTable::BucketSet& bs) -> uint64_t {
    DCHECK(!std::ranges::empty(bs.buckets()));
    return std::ranges::max(bs.buckets(), {}, version_of).GetVersion();
  };
  const uint64_t max_v = std::visit(Overloaded{version_of, max_version_of}, req);

  bool omit_update = false;
  if (change_cb_.size() == 1) {
    const auto& sole_cb = change_cb_.front();
    const bool is_replica = std::get<1>(sole_cb);
    // NOTE(review): with `max_v == 0` the comparison against the callback version is
    // redundant unless that version can itself be 0 - confirm which condition is intended.
    const bool bucket_untouched = max_v == 0 && max_v < std::get<0>(sole_cb);
    // The journal consumer count is queried only once the snapshot conditions passed.
    omit_update = is_replica && bucket_untouched && journal::GetCallbackCount() == 1;
  }

  events_.journal_omit += unsigned(omit_update);
  return omit_update;
}

} // namespace dfly
14 changes: 12 additions & 2 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ struct SliceEvents {
// how many updates and insertions of keys between snapshot intervals
size_t update = 0;

// how many journal omit optimizations were performed
size_t journal_omit = 0;

uint64_t huff_encode_total = 0, huff_encode_success = 0;

SliceEvents& operator+=(const SliceEvents& o);
Expand Down Expand Up @@ -248,6 +251,10 @@ class DbSlice {
Iterator it;
AutoUpdater post_updater;
bool is_new = false;

// Set if DbContext::is_omittable_operation was set and the conditions were met.
// Means that the journal write should NOT be performed.
bool omitted_journal = false;
};

ItAndUpdater FindMutable(const Context& cntx, std::string_view key);
Expand Down Expand Up @@ -393,7 +400,7 @@ class DbSlice {
//! Registers the callback to be called for each change.
//! Returns the registration id which is also the unique version of the dbslice
//! at a time of the call.
uint64_t RegisterOnChange(ChangeCallback cb);
uint64_t RegisterOnChange(bool replica, ChangeCallback cb);

bool HasRegisteredCallbacks() const {
return !change_cb_.empty();
Expand Down Expand Up @@ -543,6 +550,9 @@ class DbSlice {

void CreateDb(DbIndex index);

// Returns true if this write could be ignored during replication without losing consistency
bool IsOmittableWrite(const Context& cntx, ChangeReq req);

enum class UpdateStatsMode : uint8_t {
kReadStats,
kMutableStats,
Expand Down Expand Up @@ -608,7 +618,7 @@ class DbSlice {
mutable absl::flat_hash_set<uint64_t, FpHasher> uniq_fps_;

// ordered from the smallest to largest version.
std::list<std::pair<uint64_t, ChangeCallback>> change_cb_;
std::list<std::tuple<uint64_t, bool /* replica */, ChangeCallback>> change_cb_;

// Used in temporary computations in Find item and CbFinish
// This set is used to hold fingerprints of key accessed during the run of
Expand Down
2 changes: 1 addition & 1 deletion src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ TEST_F(DflyEngineTest, Bug496) {
// RegisterOnChange requires the shard lock to be held (see #7153).
shard->shard_lock()->Acquire(IntentLock::EXCLUSIVE);
uint32_t cb_id =
db.RegisterOnChange([&cb_hits](DbIndex, const DbSlice::ChangeReq&) { cb_hits++; });
db.RegisterOnChange(false, [&cb_hits](DbIndex, const DbSlice::ChangeReq&) { cb_hits++; });
shard->shard_lock()->Release(IntentLock::EXCLUSIVE);

{
Expand Down
2 changes: 1 addition & 1 deletion src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ OpStatus OpExpire(const OpArgs& op_args, string_view key, const DbSlice::ExpireP
OpResult<vector<long>> OpFieldExpire(const OpArgs& op_args, string_view key, uint32_t ttl_sec,
CmdArgList values) {
auto& db_slice = op_args.GetDbSlice();
auto [it, auto_updater, is_new] = db_slice.FindMutable(op_args.db_cntx, key);
auto [it, auto_updater, is_new, _] = db_slice.FindMutable(op_args.db_cntx, key);

if (!IsValid(it) || (it->second.ObjType() != OBJ_SET && it->second.ObjType() != OBJ_HASH)) {
std::vector<long> res(values.size(), -2);
Expand Down
4 changes: 2 additions & 2 deletions src/server/journal/journal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ error_code Close() {
return {};
}

// Forwards to journal_slice and returns its result unchanged.
// NOTE(review): this is a diff rendering - the first signature/return pair appears to be the
// removed boolean query and the second pair the added count query (OnChangeCbCount()).
bool HasRegisteredCallbacks() {
return journal_slice.HasRegisteredCallbacks();
unsigned GetCallbackCount() {
return journal_slice.OnChangeCbCount();
}

bool IsLSNInBuffer(LSN lsn) {
Expand Down
5 changes: 4 additions & 1 deletion src/server/journal/journal.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ std::error_code Close();

//******* The following functions must be called in the context of the owning shard *********//

bool HasRegisteredCallbacks();
unsigned GetCallbackCount();
// Convenience predicate: true when GetCallbackCount() reports a non-zero count.
inline bool HasRegisteredCallbacks() {
  return GetCallbackCount() != 0;
}

bool IsLSNInBuffer(LSN lsn);

Expand Down
4 changes: 2 additions & 2 deletions src/server/journal/journal_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class JournalSlice {
uint32_t RegisterOnChange(JournalConsumerInterface* consumer);
void UnregisterOnChange(uint32_t);

// Reports on the contents of journal_consumers_arr_.
// NOTE(review): this is a diff rendering - the first signature/return pair appears to be the
// removed "is non-empty" query and the second pair the added query returning the
// container's size().
bool HasRegisteredCallbacks() const {
return !journal_consumers_arr_.empty();
unsigned OnChangeCbCount() const {
return journal_consumers_arr_.size();
}

/// Returns whether the journal entry with this LSN is available
Expand Down
2 changes: 1 addition & 1 deletion src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ void RestoreStreamer::Start(util::FiberSocketBase* dest) {
return;

VLOG(1) << "RestoreStreamer start";
SerializerBase::RegisterChangeListener();
SerializerBase::RegisterChangeListener(true);
JournalStreamer::Start(dest);
}

Expand Down
12 changes: 9 additions & 3 deletions src/server/serializer_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,12 @@ SerializerBase::~SerializerBase() {
// same fiber, so the baseline is serialized first. big_value_mu_ prevents this callback path
// from interleaving with the traversal fiber's bucket serialization, which may preempt while
// emitting large values.
void SerializerBase::RegisterChangeListener() {
void SerializerBase::RegisterChangeListener(bool replica) {
// NOTE(review): this is a diff rendering - the two signature lines above and the two
// RegisterOnChange lines below appear to be the removed and added variants; the added
// variant forwards `replica` to DbSlice::RegisterOnChange.
db_array_ = db_slice_->databases(); // copy pointers to survive flush
// The callback unpacks whichever alternative the ChangeReq holds and forwards it, together
// with the db index, to OnChangeBlocking.
auto cb = [this](DbIndex dbid, const ChangeReq& req) {
std::visit([&](auto it) { OnChangeBlocking(dbid, it); }, req);
};
// The value returned by the registration is kept as this serializer's snapshot version.
snapshot_version_ = db_slice_->RegisterOnChange(cb);
snapshot_version_ = db_slice_->RegisterOnChange(replica, cb);
}

void SerializerBase::UnregisterChangeListener() {
Expand All @@ -145,7 +145,7 @@ void SerializerBase::UnregisterChangeListener() {
bool SerializerBase::ProcessBucket(DbIndex db_index, PrimeTable::bucket_iterator it,
bool on_update) {
// Check if this bucket is stale
if (it.is_done() || it.GetVersion() >= snapshot_version_) {
if (it.GetVersion() >= snapshot_version_) {
stats_.buckets_skipped++;

// Update versions for empty buckets
Expand All @@ -161,6 +161,12 @@ bool SerializerBase::ProcessBucket(DbIndex db_index, PrimeTable::bucket_iterator
return false;
}

// TODO: Flushing to earlier callbacks
if (it.is_done()) {
it.SetVersion(snapshot_version_);
return false;
}

// For non updates (traversal flow), flush change to earlier snapshots and
// acquire serialization latch.
// We must make sure that earlier snapshots serialized this bucket before we update its
Expand Down
2 changes: 1 addition & 1 deletion src/server/serializer_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class SerializerBase : public BucketDependencies, public DelayedEntryHandler {
virtual ~SerializerBase();

// Register db_slice change listener and save snapshot version.
void RegisterChangeListener();
void RegisterChangeListener(bool replica);

// Unregisters the callback. Safe to call if already unregistered.
void UnregisterChangeListener();
Expand Down
2 changes: 2 additions & 0 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3373,6 +3373,8 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
append("lua_force_gc_calls", m.lua_stats.force_gc_calls);
append("lua_gc_freed_memory_total", m.lua_stats.gc_freed_memory);
append("lua_gc_duration_total_sec", m.lua_stats.gc_duration_ns * 1e-9);

append("total_journal_omits", m.events.journal_omit);
};

auto add_tiered_info = [&] {
Expand Down
2 changes: 1 addition & 1 deletion src/server/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void SliceSnapshot::Start(bool stream_journal, SnapshotFlush allow_flush) {
DCHECK(!snapshot_fb_.IsJoinable());

use_background_mode_ = absl::GetFlag(FLAGS_background_snapshotting);
SerializerBase::RegisterChangeListener();
SerializerBase::RegisterChangeListener(stream_journal);

if (stream_journal) {
journal_cb_id_ = journal::RegisterConsumer(this);
Expand Down
9 changes: 7 additions & 2 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ class SetCmd {

OpStatus CachePrevIfNeeded(const SetParams& params, DbSlice::Iterator it);

const OpArgs op_args_;
OpArgs op_args_;
bool explicit_journal_; // call RecordJournal (auto journaling disabled)
bool skip_journal_ = false;
};

size_t SetRangeInternal(std::string* value, size_t start, std::string_view range) {
Expand Down Expand Up @@ -836,9 +837,13 @@ OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value
}
}

// Enable journal omits for this operation
op_args_.db_cntx.is_omittable_operation = true;

// We can use std::nullopt here because SET command can change the key type to string
auto op_res = db_slice.AddOrFind(op_args_.db_cntx, key, std::nullopt);
RETURN_ON_BAD_STATUS(op_res);
skip_journal_ = op_res->omitted_journal;

if (!op_res->is_new) {
if (auto status = CachePrevIfNeeded(params, op_res->it); status != OpStatus::OK)
Expand Down Expand Up @@ -934,7 +939,7 @@ void SetCmd::PostEdit(const SetParams& params, std::string_view key, std::string
StashPrimeValue(op_args_.db_cntx.db_index, key, pv, ts, params.backpressure);
}

if (explicit_journal_ && op_args_.shard->journal()) {
if (!skip_journal_ && explicit_journal_ && op_args_.shard->journal()) {
RecordJournal(params, key, value);
}
}
Expand Down
Loading
Loading