From d19459c3f9a95fe78c18d9983de483c3c6b016e5 Mon Sep 17 00:00:00 2001
From: Willem Kaufmann
Date: Mon, 23 Dec 2024 16:03:32 -0500
Subject: [PATCH 1/4] `storage`: add `dirty/closed_segment_bytes` to `probe`

---
 src/v/storage/probe.cc | 10 ++++++++++
 src/v/storage/probe.h  | 11 +++++++++++
 2 files changed, 21 insertions(+)

diff --git a/src/v/storage/probe.cc b/src/v/storage/probe.cc
index de7c96109fb6e..f5a283212e811 100644
--- a/src/v/storage/probe.cc
+++ b/src/v/storage/probe.cc
@@ -221,6 +221,16 @@ void probe::setup_metrics(const model::ntp& ntp) {
         sm::description("Number of rounds of sliding window compaction that "
                         "have been driven to completion."),
         labels),
+      sm::make_counter(
+        "dirty_segment_bytes",
+        [this] { return _dirty_segment_bytes; },
+        sm::description("Number of bytes within dirty segments of the log"),
+        labels),
+      sm::make_counter(
+        "closed_segment_bytes",
+        [this] { return _closed_segment_bytes; },
+        sm::description("Number of bytes within closed segments of the log"),
+        labels),
     },
     {},
     {sm::shard_label, partition_label});
diff --git a/src/v/storage/probe.h b/src/v/storage/probe.h
index 449e6aefeff2f..185d26db8d334 100644
--- a/src/v/storage/probe.h
+++ b/src/v/storage/probe.h
@@ -127,6 +127,14 @@ class probe {
         _bytes_prefix_truncated += bytes;
     }
 
+    void set_dirty_segment_bytes(uint64_t bytes) {
+        _dirty_segment_bytes = bytes;
+    }
+
+    void set_closed_segment_bytes(uint64_t bytes) {
+        _closed_segment_bytes = bytes;
+    }
+
 private:
     uint64_t _partition_bytes = 0;
     uint64_t _bytes_written = 0;
@@ -151,6 +159,9 @@ class probe {
     uint64_t _segments_marked_tombstone_free = 0;
     uint64_t _num_rounds_window_compaction = 0;
 
+    uint64_t _dirty_segment_bytes = 0;
+    uint64_t _closed_segment_bytes = 0;
+
     ssize_t _compaction_removed_bytes = 0;
 
     metrics::internal_metric_groups _metrics;

From 18afd0ed51337144bd738d16aeafc46397994e2b Mon Sep 17 00:00:00 2001
From: Willem Kaufmann
Date: Mon, 23 Dec 2024 16:05:50 -0500
Subject: [PATCH 2/4] `storage`: add `dirty/closed_segment_bytes` to `disk_log_impl`

---
 src/v/storage/disk_log_impl.cc | 36 ++++++++++++++++++++++++++++++++++
 src/v/storage/disk_log_impl.h  | 33 +++++++++++++++++++++++++++++++
 2 files changed, 69 insertions(+)

diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc
index 91ad021d58d90..ff3930a036229 100644
--- a/src/v/storage/disk_log_impl.cc
+++ b/src/v/storage/disk_log_impl.cc
@@ -3858,4 +3858,40 @@ ss::future<> disk_log_impl::remove_kvstore_state(
       .discard_result();
 }
 
+double disk_log_impl::dirty_ratio() const {
+    // Avoid division by 0.
+    if (_closed_segment_bytes == 0) {
+        return 0.0;
+    }
+
+    return static_cast<double>(_dirty_segment_bytes)
+           / static_cast<double>(_closed_segment_bytes);
+}
+
+template<typename tag_type>
+void disk_log_impl::update_dirty_segment_bytes(uint64_t bytes) {
+    static constexpr auto is_add_tag = std::is_same_v<tag_type, add_tag>;
+    static constexpr auto is_subtract_tag = std::is_same_v<tag_type, subtract_tag>;
+    static_assert(
+      is_add_tag || is_subtract_tag,
+      "update_dirty_segment_bytes must be called with subtract_tag or "
+      "add_tag");
+    if constexpr (is_add_tag) {
+        _dirty_segment_bytes += bytes;
+    } else if constexpr (is_subtract_tag) {
+        if (bytes > _dirty_segment_bytes) {
+            // Avoid unsigned underflow
+            _dirty_segment_bytes = 0;
+        } else {
+            _dirty_segment_bytes -= bytes;
+        }
+    }
+    _probe->set_dirty_segment_bytes(_dirty_segment_bytes);
+}
+
+void disk_log_impl::add_closed_segment_bytes(uint64_t bytes) {
+    _closed_segment_bytes += bytes;
+    _probe->set_closed_segment_bytes(_closed_segment_bytes);
+}
+
 } // namespace storage
diff --git a/src/v/storage/disk_log_impl.h b/src/v/storage/disk_log_impl.h
index 00b6f3ed511ee..6504306193621 100644
--- a/src/v/storage/disk_log_impl.h
+++ b/src/v/storage/disk_log_impl.h
@@ -246,6 +246,15 @@ class disk_log_impl final : public log {
 
     size_t max_segment_size() const;
 
+    uint64_t dirty_segment_bytes() const { return _dirty_segment_bytes; }
+
+    uint64_t closed_segment_bytes() const { return _closed_segment_bytes; }
+
+    // Returns the dirty ratio of the log.
+    // The dirty ratio is the ratio of bytes in closed, dirty segments to the
+    // total number of bytes in all closed segments in the log.
+    double dirty_ratio() const;
+
 private:
     friend class disk_log_appender; // for multi-term appends
     friend class disk_log_builder;  // for tests
@@ -442,6 +451,30 @@
     std::optional<model::offset> _last_compaction_window_start_offset;
 
     size_t _reclaimable_size_bytes{0};
+
+    uint64_t _dirty_segment_bytes{0};
+    uint64_t _closed_segment_bytes{0};
+
+    struct add_tag {};
+    struct subtract_tag {};
+
+    // Update the number of bytes in dirty segments.
+    //
+    // Dirty segments are closed segments which have not yet been cleanly
+    // compacted, i.e., duplicates for keys in this segment _could_ be found in
+    // the prefix of the log up to this segment.
+    //
+    // This value can increase AND decrease. It increases
+    // when a segment is closed, and decreases when the segment is marked as
+    // cleanly compacted. For that reason, one of the tags add_tag or
+    // subtract_tag must be used.
+    template<typename tag_type>
+    void update_dirty_segment_bytes(uint64_t bytes);
+
+    // Update the number of bytes in closed segments.
+    // Unlike _dirty_segment_bytes, this can only increase.
+    void add_closed_segment_bytes(uint64_t bytes);
+
     bool _compaction_enabled;
 };

From 66fa5d9ff1ece090dfa1e3ad102ffab80b5010d0 Mon Sep 17 00:00:00 2001
From: Willem Kaufmann
Date: Mon, 23 Dec 2024 16:06:25 -0500
Subject: [PATCH 3/4] `storage`: bookkeep `dirty` and `closed` segment bytes

---
 src/v/storage/disk_log_impl.cc | 41 +++++++++++++++++++++++++++-------
 src/v/storage/segment_utils.cc |  6 ++---
 src/v/storage/segment_utils.h  |  5 ++++-
 3 files changed, 40 insertions(+), 12 deletions(-)

diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc
index ff3930a036229..85123022520d0 100644
--- a/src/v/storage/disk_log_impl.cc
+++ b/src/v/storage/disk_log_impl.cc
@@ -147,6 +147,9 @@ disk_log_impl::disk_log_impl(
         if (_compaction_enabled) {
             s->mark_as_compacted_segment();
         }
+        auto seg_size_bytes = s->size_bytes();
+        update_dirty_segment_bytes<add_tag>(seg_size_bytes);
+        add_closed_segment_bytes(seg_size_bytes);
     }
     _probe->initial_segments_count(_segs.size());
     _probe->setup_metrics(this->config().ntp());
@@ -613,8 +616,12 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
         // For all intents and purposes, these segments are already cleanly
         // compacted.
         auto seg = segs.front();
-        co_await internal::mark_segment_as_finished_window_compaction(
-          seg, true, *_probe);
+        bool marked_as_cleanly_compacted
+          = co_await internal::mark_segment_as_finished_window_compaction(
+            seg, true, *_probe);
+        if (marked_as_cleanly_compacted) {
+            update_dirty_segment_bytes<subtract_tag>(seg->size_bytes());
+        }
         segs.pop_front();
     }
     if (segs.empty()) {
@@ -708,8 +715,12 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
            // higher than those indexed, it may be because the segment is
            // entirely comprised of non-data batches. Mark it as compacted so
            // we can progress through compactions.
-            co_await internal::mark_segment_as_finished_window_compaction(
-              seg, is_clean_compacted, *_probe);
+            bool marked_as_cleanly_compacted
+              = co_await internal::mark_segment_as_finished_window_compaction(
+                seg, is_clean_compacted, *_probe);
+            if (marked_as_cleanly_compacted) {
+                update_dirty_segment_bytes<subtract_tag>(seg->size_bytes());
+            }
 
             vlog(
               gclog.debug,
@@ -723,8 +734,12 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
        if (!seg->may_have_compactible_records()) {
            // All data records are already compacted away. Skip to avoid a
            // needless rewrite.
-            co_await internal::mark_segment_as_finished_window_compaction(
-              seg, is_clean_compacted, *_probe);
+            bool marked_as_cleanly_compacted
+              = co_await internal::mark_segment_as_finished_window_compaction(
+                seg, is_clean_compacted, *_probe);
+            if (marked_as_cleanly_compacted) {
+                update_dirty_segment_bytes<subtract_tag>(seg->size_bytes());
+            }
 
             vlog(
               gclog.trace,
@@ -823,8 +838,12 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
     // Mark the segment as completed window compaction, and possibly set the
     // clean_compact_timestamp in it's index.
-    co_await internal::mark_segment_as_finished_window_compaction(
-      seg, is_clean_compacted, *_probe);
+    bool marked_as_cleanly_compacted
+      = co_await internal::mark_segment_as_finished_window_compaction(
+        seg, is_clean_compacted, *_probe);
+    if (marked_as_cleanly_compacted) {
+        update_dirty_segment_bytes<subtract_tag>(seg->size_bytes());
+    }
 
     co_await seg->index().flush();
     co_await ss::rename_file(
@@ -1632,6 +1651,12 @@ ss::future<> disk_log_impl::new_segment(
     if (config().is_compacted()) {
         h->mark_as_compacted_segment();
     }
+    if (!_segs.empty()) {
+        // Update dirty AND closed bytes
+        const auto closed_segment_size = _segs.back()->size_bytes();
+        update_dirty_segment_bytes<add_tag>(closed_segment_size);
+        add_closed_segment_bytes(closed_segment_size);
+    }
     _segs.add(std::move(h));
     _probe->segment_created();
     _stm_manager->make_snapshot_in_background();
diff --git a/src/v/storage/segment_utils.cc b/src/v/storage/segment_utils.cc
index 5ab8341a7d565..4e853459e8f8e 100644
--- a/src/v/storage/segment_utils.cc
+++ b/src/v/storage/segment_utils.cc
@@ -1147,7 +1147,7 @@ offset_delta_time should_apply_delta_time_offset(
       && feature_table.local().is_active(features::feature::node_isolation)};
 }
 
-ss::future<> mark_segment_as_finished_window_compaction(
+ss::future<bool> mark_segment_as_finished_window_compaction(
   ss::lw_shared_ptr<segment> seg, bool set_clean_compact_timestamp, probe& pb) {
     seg->mark_as_finished_windowed_compaction();
     if (set_clean_compact_timestamp) {
@@ -1155,11 +1155,11 @@
           model::timestamp::now());
        if (did_set) {
            pb.add_cleanly_compacted_segment();
-            return seg->index().flush();
+            return seg->index().flush().then([] { return true; });
        }
    }
 
-    return ss::now();
+    return ss::make_ready_future<bool>(false);
 }
 
 bool is_past_tombstone_delete_horizon(
diff --git a/src/v/storage/segment_utils.h b/src/v/storage/segment_utils.h
index df60ec3a07e47..934b473d74cc4 100644
--- a/src/v/storage/segment_utils.h
+++ b/src/v/storage/segment_utils.h
@@ -283,7 +283,10 @@ bool may_have_removable_tombstones(
 // which case the `clean_compact_timestamp` is set in the segment's index).
 // Also potentially issues a call to seg->index()->flush(), if the
 // `clean_compact_timestamp` was set in the index.
-ss::future<> mark_segment_as_finished_window_compaction(
+//
+// Returns a boolean indicating if the segment was marked as cleanly compacted
+// for the first time and assigned a cleanly compacted timestamp.
+ss::future<bool> mark_segment_as_finished_window_compaction(
   ss::lw_shared_ptr<segment> seg, bool set_clean_compact_timestamp, probe& pb);
 
 template

From 018f7a20267f2d672ce0cc2d5815196a3472b21f Mon Sep 17 00:00:00 2001
From: Willem Kaufmann
Date: Mon, 23 Dec 2024 16:06:46 -0500
Subject: [PATCH 4/4] `storage`: add `dirty_ratio` test to `storage_e2e_test.cc`

---
 src/v/storage/tests/storage_e2e_test.cc | 85 +++++++++++++++++++++++++
 1 file changed, 85 insertions(+)

diff --git a/src/v/storage/tests/storage_e2e_test.cc b/src/v/storage/tests/storage_e2e_test.cc
index 2f62724672408..e9e8962d4b99d 100644
--- a/src/v/storage/tests/storage_e2e_test.cc
+++ b/src/v/storage/tests/storage_e2e_test.cc
@@ -24,6 +24,7 @@
 #include "random/generators.h"
 #include "reflection/adl.h"
 #include "storage/batch_cache.h"
+#include "storage/disk_log_impl.h"
 #include "storage/log_manager.h"
 #include "storage/log_reader.h"
 #include "storage/ntp_config.h"
@@ -5179,3 +5180,87 @@ FIXTURE_TEST(test_offset_range_size_incremental, storage_test_fixture) {
         }
     }
 };
+
+FIXTURE_TEST(dirty_ratio, storage_test_fixture) {
+    auto cfg = default_log_config(test_dir);
+    cfg.max_compacted_segment_size = config::mock_binding<size_t>(100_MiB);
+    cfg.cache = storage::with_cache::yes;
+    storage::ntp_config::default_overrides overrides;
+    overrides.cleanup_policy_bitflags
+      = model::cleanup_policy_bitflags::compaction;
+
+    ss::abort_source as;
+    storage::log_manager mgr = make_log_manager(cfg);
+    auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get(); });
+    auto ntp = model::ntp("default", "test", 0);
+    auto log = mgr
+                 .manage(storage::ntp_config(
+                   ntp,
+                   mgr.config().base_dir,
+                   std::make_unique<storage::ntp_config::default_overrides>(
+                     overrides)))
+                 .get();
+
+    auto* disk_log = static_cast<storage::disk_log_impl*>(log.get());
+
+    // add a segment with random keys until a certain size
+    auto add_segment = [log](size_t size, model::term_id term) {
+        do {
+            append_single_record_batch(log, 1, term, 16_KiB, true);
+        } while (log->segments().back()->size_bytes() < size);
+    };
+
+    static constexpr double tolerance = 1.0e-6;
+    uint64_t closed_segments_size_bytes = 0;
+
+    auto assert_on_new_segment = [&disk_log, &closed_segments_size_bytes, &as](
+                                   size_t index) {
+        auto new_segment_size_bytes = disk_log->segments()[index]->size_bytes();
+        closed_segments_size_bytes += new_segment_size_bytes;
+        auto expected_dirty_ratio = static_cast<double>(new_segment_size_bytes)
+                                    / static_cast<double>(
+                                      closed_segments_size_bytes);
+        BOOST_REQUIRE_EQUAL(
+          disk_log->dirty_segment_bytes(), new_segment_size_bytes);
+        BOOST_REQUIRE_EQUAL(
+          disk_log->closed_segment_bytes(), closed_segments_size_bytes);
+        BOOST_REQUIRE_CLOSE(
+          disk_log->dirty_ratio(), expected_dirty_ratio, tolerance);
+
+        // Perform sliding window compaction, which will fully cleanly compact
+        // the log.
+        static const storage::compaction_config compact_cfg(
+          model::offset::max(), std::nullopt, ss::default_priority_class(), as);
+        disk_log->sliding_window_compact(compact_cfg).get();
+
+        BOOST_REQUIRE_EQUAL(disk_log->dirty_segment_bytes(), 0);
+        BOOST_REQUIRE_EQUAL(
+          disk_log->closed_segment_bytes(), closed_segments_size_bytes);
+        BOOST_REQUIRE_CLOSE(disk_log->dirty_ratio(), 0.0, tolerance);
+    };
+
+    add_segment(2_MiB, model::term_id(1));
+    disk_log->force_roll(ss::default_priority_class()).get();
+    BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 2);
+    assert_on_new_segment(0);
+
+    add_segment(2_MiB, model::term_id(1));
+    disk_log->force_roll(ss::default_priority_class()).get();
+    BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 3);
+    assert_on_new_segment(1);
+
+    add_segment(5_MiB, model::term_id(1));
+    disk_log->force_roll(ss::default_priority_class()).get();
+    BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 4);
+    assert_on_new_segment(2);
+
+    add_segment(16_KiB, model::term_id(1));
+    disk_log->force_roll(ss::default_priority_class()).get();
+    BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 5);
+    assert_on_new_segment(3);
+
+    add_segment(16_KiB, model::term_id(1));
+    disk_log->force_roll(ss::default_priority_class()).get();
+    BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 6);
+    assert_on_new_segment(4);
+}