From 534ecd17b0c7d2f92b1656066111fb3a17699c69 Mon Sep 17 00:00:00 2001 From: ZhaoMing Date: Wed, 10 Sep 2025 15:04:07 +0800 Subject: [PATCH] Perform post-atomic-flush updates of memtable list in a callback --- db/memtable_list.cc | 146 +++++++++++++++++++++++--------------------- db/merge_test.cc | 17 +++++- 2 files changed, 93 insertions(+), 70 deletions(-) diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 93d8b05f836..f157df60649 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -948,81 +948,89 @@ Status InstallMemtableAtomicFlushResults( assert(0 == num_entries); } - // this can release and reacquire the mutex. - s = vset->LogAndApply(cfds, read_options, write_options, edit_lists, mu, - db_directory); - - for (size_t k = 0; k != cfds.size(); ++k) { - auto* imm = (imm_lists == nullptr) ? cfds[k]->imm() : imm_lists->at(k); - imm->InstallNewVersion(); - } - - if (s.ok() || s.IsColumnFamilyDropped()) { - for (size_t i = 0; i != cfds.size(); ++i) { - if (cfds[i]->IsDropped()) { - continue; - } - auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i); - for (auto m : *mems_list[i]) { - assert(m->GetFileNumber() > 0); - uint64_t mem_id = m->GetID(); - - const VersionEdit* const edit = m->GetEdits(); - assert(edit); + const auto manifest_write_cb = [&cfds, imm_lists, &mems_list, log_buffer, + to_delete](const Status& status) { + for (size_t k = 0; k != cfds.size(); ++k) { + auto* imm = (imm_lists == nullptr) ? 
cfds[k]->imm() : imm_lists->at(k); + imm->InstallNewVersion(); + } - if (edit->GetBlobFileAdditions().empty()) { - ROCKS_LOG_BUFFER(log_buffer, - "[%s] Level-0 commit table #%" PRIu64 - ": memtable #%" PRIu64 " done", - cfds[i]->GetName().c_str(), m->GetFileNumber(), - mem_id); - } else { - ROCKS_LOG_BUFFER(log_buffer, - "[%s] Level-0 commit table #%" PRIu64 - " (+%zu blob files)" - ": memtable #%" PRIu64 " done", - cfds[i]->GetName().c_str(), m->GetFileNumber(), - edit->GetBlobFileAdditions().size(), mem_id); + if (status.ok() || status.IsColumnFamilyDropped()) { + for (size_t i = 0; i != cfds.size(); ++i) { + if (cfds[i]->IsDropped()) { + continue; + } + auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i); + for (auto m : *mems_list[i]) { + assert(m->GetFileNumber() > 0); + uint64_t mem_id = m->GetID(); + + const VersionEdit* const edit = m->GetEdits(); + assert(edit); + + if (edit->GetBlobFileAdditions().empty()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " done", + cfds[i]->GetName().c_str(), m->GetFileNumber(), + mem_id); + } else { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + " (+%zu blob files)" + ": memtable #%" PRIu64 " done", + cfds[i]->GetName().c_str(), m->GetFileNumber(), + edit->GetBlobFileAdditions().size(), mem_id); + } + + imm->current_->Remove(m, to_delete); + imm->UpdateCachedValuesFromMemTableListVersion(); + imm->ResetTrimHistoryNeeded(); } - - imm->current_->Remove(m, to_delete); - imm->UpdateCachedValuesFromMemTableListVersion(); - imm->ResetTrimHistoryNeeded(); } - } - } else { - for (size_t i = 0; i != cfds.size(); ++i) { - auto* imm = (imm_lists == nullptr) ? 
cfds[i]->imm() : imm_lists->at(i); - for (auto m : *mems_list[i]) { - uint64_t mem_id = m->GetID(); - - const VersionEdit* const edit = m->GetEdits(); - assert(edit); - - if (edit->GetBlobFileAdditions().empty()) { - ROCKS_LOG_BUFFER(log_buffer, - "[%s] Level-0 commit table #%" PRIu64 - ": memtable #%" PRIu64 " failed", - cfds[i]->GetName().c_str(), m->GetFileNumber(), - mem_id); - } else { - ROCKS_LOG_BUFFER(log_buffer, - "[%s] Level-0 commit table #%" PRIu64 - " (+%zu blob files)" - ": memtable #%" PRIu64 " failed", - cfds[i]->GetName().c_str(), m->GetFileNumber(), - edit->GetBlobFileAdditions().size(), mem_id); + } else { + for (size_t i = 0; i != cfds.size(); ++i) { + auto* imm = (imm_lists == nullptr) ? cfds[i]->imm() : imm_lists->at(i); + for (auto m : *mems_list[i]) { + uint64_t mem_id = m->GetID(); + + const VersionEdit* const edit = m->GetEdits(); + assert(edit); + + if (edit->GetBlobFileAdditions().empty()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + ": memtable #%" PRIu64 " failed", + cfds[i]->GetName().c_str(), m->GetFileNumber(), + mem_id); + } else { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] Level-0 commit table #%" PRIu64 + " (+%zu blob files)" + ": memtable #%" PRIu64 " failed", + cfds[i]->GetName().c_str(), m->GetFileNumber(), + edit->GetBlobFileAdditions().size(), mem_id); + } + + m->SetFlushCompleted(false); + m->SetFlushInProgress(false); + m->GetEdits()->Clear(); + m->SetFileNumber(0); + imm->num_flush_not_started_++; } - - m->SetFlushCompleted(false); - m->SetFlushInProgress(false); - m->GetEdits()->Clear(); - m->SetFileNumber(0); - imm->num_flush_not_started_++; + imm->imm_flush_needed.store(true, std::memory_order_release); } - imm->imm_flush_needed.store(true, std::memory_order_release); } - } + }; + + std::vector<std::function<void(const Status&)>> manifest_write_cb_vec( + cfds.size()); + manifest_write_cb_vec.front() = manifest_write_cb; + // this can release and reacquire the mutex.
+ s = vset->LogAndApply(cfds, read_options, write_options, edit_lists, mu, + db_directory, /*new_descriptor_log=*/false, + /*column_family_options=*/nullptr, + manifest_write_cb_vec); return s; } diff --git a/db/merge_test.cc b/db/merge_test.cc index 0592856b735..d64d4579db8 100644 --- a/db/merge_test.cc +++ b/db/merge_test.cc @@ -97,12 +97,14 @@ uint64_t EnvMergeTest::now_nanos_count_{0}; std::unique_ptr<EnvMergeTest> EnvMergeTest::singleton_; std::shared_ptr<DB> OpenDb(const std::string& dbname, const bool ttl = false, - const size_t max_successive_merges = 0) { + const size_t max_successive_merges = 0, + bool atomic_flush = false) { DB* db; Options options; options.create_if_missing = true; options.merge_operator = std::make_shared<CountMergeOperator>(); options.max_successive_merges = max_successive_merges; + options.atomic_flush = atomic_flush; options.env = EnvMergeTest::GetInstance(); EXPECT_OK(DestroyDB(dbname, Options())); Status s; @@ -621,6 +623,19 @@ TEST_F(MergeTest, MergeWithCompactionAndFlush) { ASSERT_OK(DestroyDB(dbname, Options())); } +TEST_F(MergeTest, MergeWithCompactionAndAtomicFlush) { + const std::string dbname = + test::PerThreadDBPath("merge_with_compaction_and_flush"); + { + auto db = OpenDb(dbname, false, 0, true); + { + MergeBasedCounters counters(db, 0); + testCountersWithFlushAndCompaction(counters, db.get()); + } + } + ASSERT_OK(DestroyDB(dbname, Options())); +} + TEST_F(MergeTest, FullMergeV3FallbackNewValue) { // Test that the default FullMergeV3 implementation correctly handles the case // when FullMergeV2 results in a new value.