Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: book keep dirty_ratio in disk_log_impl #24649

Open
wants to merge 4 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 69 additions & 8 deletions src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ disk_log_impl::disk_log_impl(
if (_compaction_enabled) {
s->mark_as_compacted_segment();
}
auto seg_size_bytes = s->size_bytes();
update_dirty_segment_bytes<add_tag>(seg_size_bytes);
add_closed_segment_bytes(seg_size_bytes);
}
_probe->initial_segments_count(_segs.size());
_probe->setup_metrics(this->config().ntp());
Expand Down Expand Up @@ -613,8 +616,12 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
// For all intents and purposes, these segments are already cleanly
// compacted.
auto seg = segs.front();
co_await internal::mark_segment_as_finished_window_compaction(
seg, true, *_probe);
bool marked_as_cleanly_compacted
= co_await internal::mark_segment_as_finished_window_compaction(
seg, true, *_probe);
if (marked_as_cleanly_compacted) {
update_dirty_segment_bytes<subtract_tag>(seg->size_bytes());
}
segs.pop_front();
}
if (segs.empty()) {
Expand Down Expand Up @@ -708,8 +715,12 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
// higher than those indexed, it may be because the segment is
// entirely comprised of non-data batches. Mark it as compacted so
// we can progress through compactions.
co_await internal::mark_segment_as_finished_window_compaction(
seg, is_clean_compacted, *_probe);
bool marked_as_cleanly_compacted
= co_await internal::mark_segment_as_finished_window_compaction(
seg, is_clean_compacted, *_probe);
if (marked_as_cleanly_compacted) {
update_dirty_segment_bytes<subtract_tag>(seg->size_bytes());
}

vlog(
gclog.debug,
Expand All @@ -723,8 +734,12 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
if (!seg->may_have_compactible_records()) {
// All data records are already compacted away. Skip to avoid a
// needless rewrite.
co_await internal::mark_segment_as_finished_window_compaction(
seg, is_clean_compacted, *_probe);
bool marked_as_cleanly_compacted
= co_await internal::mark_segment_as_finished_window_compaction(
seg, is_clean_compacted, *_probe);
if (marked_as_cleanly_compacted) {
update_dirty_segment_bytes<subtract_tag>(seg->size_bytes());
}

vlog(
gclog.trace,
Expand Down Expand Up @@ -823,8 +838,12 @@ ss::future<bool> disk_log_impl::sliding_window_compact(

// Mark the segment as completed window compaction, and possibly set the
// clean_compact_timestamp in it's index.
co_await internal::mark_segment_as_finished_window_compaction(
seg, is_clean_compacted, *_probe);
bool marked_as_cleanly_compacted
= co_await internal::mark_segment_as_finished_window_compaction(
seg, is_clean_compacted, *_probe);
if (marked_as_cleanly_compacted) {
update_dirty_segment_bytes<subtract_tag>(seg->size_bytes());
}

co_await seg->index().flush();
co_await ss::rename_file(
Expand Down Expand Up @@ -1632,6 +1651,12 @@ ss::future<> disk_log_impl::new_segment(
if (config().is_compacted()) {
h->mark_as_compacted_segment();
}
if (!_segs.empty()) {
// Update dirty AND closed bytes
const auto closed_segment_size = _segs.back()->size_bytes();
update_dirty_segment_bytes<add_tag>(closed_segment_size);
add_closed_segment_bytes(closed_segment_size);
}
_segs.add(std::move(h));
_probe->segment_created();
_stm_manager->make_snapshot_in_background();
Expand Down Expand Up @@ -3858,4 +3883,40 @@ ss::future<> disk_log_impl::remove_kvstore_state(
.discard_result();
}

double disk_log_impl::dirty_ratio() const {
// Avoid division by 0.
if (_closed_segment_bytes == 0) {
return 0.0;
}

return static_cast<double>(_dirty_segment_bytes)
/ static_cast<double>(_closed_segment_bytes);
}

template<typename tag>
void disk_log_impl::update_dirty_segment_bytes(uint64_t bytes) {
static constexpr auto is_add_tag = std::is_same_v<tag, add_tag>;
static constexpr auto is_subtract_tag = std::is_same_v<tag, subtract_tag>;
static_assert(
is_add_tag || is_subtract_tag,
"update_dirty_segment_bytes must be called with subtract_tag or "
"add_tag");
if constexpr (is_add_tag) {
_dirty_segment_bytes += bytes;
} else if constexpr (is_subtract_tag) {
if (bytes > _dirty_segment_bytes) {
// Avoid unsigned underflow
_dirty_segment_bytes = 0;
} else {
_dirty_segment_bytes -= bytes;
}
}
_probe->set_dirty_segment_bytes(_dirty_segment_bytes);
}

void disk_log_impl::add_closed_segment_bytes(uint64_t bytes) {
_closed_segment_bytes += bytes;
_probe->set_closed_segment_bytes(_closed_segment_bytes);
}

} // namespace storage
33 changes: 33 additions & 0 deletions src/v/storage/disk_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,15 @@ class disk_log_impl final : public log {

size_t max_segment_size() const;

uint64_t dirty_segment_bytes() const { return _dirty_segment_bytes; }

uint64_t closed_segment_bytes() const { return _closed_segment_bytes; }

// Returns the dirty ratio of the log.
// The dirty ratio is the ratio of bytes in closed, dirty segments to the
// total number of bytes in all closed segments in the log.
double dirty_ratio() const;

private:
friend class disk_log_appender; // for multi-term appends
friend class disk_log_builder; // for tests
Expand Down Expand Up @@ -442,6 +451,30 @@ class disk_log_impl final : public log {
std::optional<model::offset> _last_compaction_window_start_offset;

size_t _reclaimable_size_bytes{0};

uint64_t _dirty_segment_bytes{0};
uint64_t _closed_segment_bytes{0};

struct add_tag {};
struct subtract_tag {};

// Update the number of bytes in dirty segments.
//
// Dirty segments are closed segments which have not yet been cleanly
// compacted- i.e, duplicates for keys in this segment _could_ be found in
// the prefix of the log up to this segment.
//
// This value can increase AND decrease. It increases
// when a segment is closed, and decreases when the segment is marked as
// cleanly compacted. For that reason, one of the tags add_tag or
// subtract_tag must be used.
template<typename tag>
void update_dirty_segment_bytes(uint64_t bytes);

// Update the number of bytes in closed segments.
// Unlike _dirty_segment_bytes, this can only increase.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is currently true, but maybe it should be decremented during the case of retention enforcement/segment eviction.

void add_closed_segment_bytes(uint64_t bytes);

bool _compaction_enabled;
};

Expand Down
10 changes: 10 additions & 0 deletions src/v/storage/probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,16 @@ void probe::setup_metrics(const model::ntp& ntp) {
sm::description("Number of rounds of sliding window compaction that "
"have been driven to completion."),
labels),
sm::make_counter(
"dirty_segment_bytes",
[this] { return _dirty_segment_bytes; },
sm::description("Number of bytes within dirty segments of the log"),
labels),
sm::make_counter(
"closed_segment_bytes",
[this] { return _closed_segment_bytes; },
sm::description("Number of bytes within closed segments of the log"),
labels),
},
{},
{sm::shard_label, partition_label});
Expand Down
11 changes: 11 additions & 0 deletions src/v/storage/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ class probe {
_bytes_prefix_truncated += bytes;
}

void set_dirty_segment_bytes(uint64_t bytes) {
_dirty_segment_bytes = bytes;
}

void set_closed_segment_bytes(uint64_t bytes) {
_closed_segment_bytes = bytes;
}

private:
uint64_t _partition_bytes = 0;
uint64_t _bytes_written = 0;
Expand All @@ -151,6 +159,9 @@ class probe {
uint64_t _segments_marked_tombstone_free = 0;
uint64_t _num_rounds_window_compaction = 0;

uint64_t _dirty_segment_bytes = 0;
uint64_t _closed_segment_bytes = 0;

ssize_t _compaction_removed_bytes = 0;

metrics::internal_metric_groups _metrics;
Expand Down
6 changes: 3 additions & 3 deletions src/v/storage/segment_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1147,19 +1147,19 @@ offset_delta_time should_apply_delta_time_offset(
&& feature_table.local().is_active(features::feature::node_isolation)};
}

ss::future<> mark_segment_as_finished_window_compaction(
ss::future<bool> mark_segment_as_finished_window_compaction(
ss::lw_shared_ptr<segment> seg, bool set_clean_compact_timestamp, probe& pb) {
seg->mark_as_finished_windowed_compaction();
if (set_clean_compact_timestamp) {
bool did_set = seg->index().maybe_set_clean_compact_timestamp(
model::timestamp::now());
if (did_set) {
pb.add_cleanly_compacted_segment();
return seg->index().flush();
return seg->index().flush().then([] { return true; });
}
}

return ss::now();
return ss::make_ready_future<bool>(false);
}

bool is_past_tombstone_delete_horizon(
Expand Down
5 changes: 4 additions & 1 deletion src/v/storage/segment_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,10 @@ bool may_have_removable_tombstones(
// which case the `clean_compact_timestamp` is set in the segment's index).
// Also potentially issues a call to seg->index()->flush(), if the
// `clean_compact_timestamp` was set in the index.
ss::future<> mark_segment_as_finished_window_compaction(
//
// Returns a boolean indicating if the segment was marked as cleanly compacted
// for the first time and assigned a cleanly compacted timestamp.
ss::future<bool> mark_segment_as_finished_window_compaction(
ss::lw_shared_ptr<segment> seg, bool set_clean_compact_timestamp, probe& pb);

template<typename Func>
Expand Down
85 changes: 85 additions & 0 deletions src/v/storage/tests/storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "random/generators.h"
#include "reflection/adl.h"
#include "storage/batch_cache.h"
#include "storage/disk_log_impl.h"
#include "storage/log_manager.h"
#include "storage/log_reader.h"
#include "storage/ntp_config.h"
Expand Down Expand Up @@ -5179,3 +5180,87 @@ FIXTURE_TEST(test_offset_range_size_incremental, storage_test_fixture) {
}
}
};

FIXTURE_TEST(dirty_ratio, storage_test_fixture) {
auto cfg = default_log_config(test_dir);
cfg.max_compacted_segment_size = config::mock_binding<size_t>(100_MiB);
cfg.cache = storage::with_cache::yes;
storage::ntp_config::default_overrides overrides;
overrides.cleanup_policy_bitflags
= model::cleanup_policy_bitflags::compaction;

ss::abort_source as;
storage::log_manager mgr = make_log_manager(cfg);
auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get(); });
auto ntp = model::ntp("default", "test", 0);
auto log = mgr
.manage(storage::ntp_config(
ntp,
mgr.config().base_dir,
std::make_unique<storage::ntp_config::default_overrides>(
overrides)))
.get();

auto* disk_log = static_cast<storage::disk_log_impl*>(log.get());

// add a segment with random keys until a certain size
auto add_segment = [log](size_t size, model::term_id term) {
do {
append_single_record_batch(log, 1, term, 16_KiB, true);
} while (log->segments().back()->size_bytes() < size);
};

static constexpr double tolerance = 1.0e-6;
uint64_t closed_segments_size_bytes = 0;

auto assert_on_new_segment = [&disk_log, &closed_segments_size_bytes, &as](
size_t index) {
auto new_segment_size_bytes = disk_log->segments()[index]->size_bytes();
closed_segments_size_bytes += new_segment_size_bytes;
auto expected_dirty_ratio = static_cast<double>(new_segment_size_bytes)
/ static_cast<double>(
closed_segments_size_bytes);
BOOST_REQUIRE_EQUAL(
disk_log->dirty_segment_bytes(), new_segment_size_bytes);
BOOST_REQUIRE_EQUAL(
disk_log->closed_segment_bytes(), closed_segments_size_bytes);
BOOST_REQUIRE_CLOSE(
disk_log->dirty_ratio(), expected_dirty_ratio, tolerance);

// Perform sliding window compaction, which will fully cleanly compact
// the log.
static const storage::compaction_config compact_cfg(
model::offset::max(), std::nullopt, ss::default_priority_class(), as);
disk_log->sliding_window_compact(compact_cfg).get();

BOOST_REQUIRE_EQUAL(disk_log->dirty_segment_bytes(), 0);
BOOST_REQUIRE_EQUAL(
disk_log->closed_segment_bytes(), closed_segments_size_bytes);
BOOST_REQUIRE_CLOSE(disk_log->dirty_ratio(), 0.0, tolerance);
};

add_segment(2_MiB, model::term_id(1));
disk_log->force_roll(ss::default_priority_class()).get();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 2);
assert_on_new_segment(0);

add_segment(2_MiB, model::term_id(1));
disk_log->force_roll(ss::default_priority_class()).get();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 3);
assert_on_new_segment(1);

add_segment(5_MiB, model::term_id(1));
disk_log->force_roll(ss::default_priority_class()).get();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 4);
assert_on_new_segment(2);

add_segment(16_KiB, model::term_id(1));
disk_log->force_roll(ss::default_priority_class()).get();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 5);
assert_on_new_segment(3);

add_segment(16_KiB, model::term_id(1));
disk_log->force_roll(ss::default_priority_class()).get();
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 6);
assert_on_new_segment(4);
}
Loading