Skip to content

storage: book keep dirty_ratio in disk_log_impl #24649

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 109 additions & 15 deletions src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ disk_log_impl::disk_log_impl(
if (_compaction_enabled) {
s->mark_as_compacted_segment();
}
if (s != _segs.back()) {
auto seg_size_bytes = s->size_bytes();
if (!s->index().has_clean_compact_timestamp()) {
add_dirty_segment_bytes(seg_size_bytes);
}
add_closed_segment_bytes(seg_size_bytes);
}
}
_probe->initial_segments_count(_segs.size());
_probe->setup_metrics(this->config().ntp());
Expand Down Expand Up @@ -499,6 +506,10 @@ ss::future<> disk_log_impl::adjacent_merge_compact(
if (result.did_compact()) {
segment->clear_cached_disk_usage();
_compaction_ratio.update(result.compaction_ratio());
const auto removed_bytes = result.size_before - result.size_after;
subtract_dirty_segment_bytes(removed_bytes);
subtract_closed_segment_bytes(removed_bytes);

co_return;
}
}
Expand Down Expand Up @@ -586,6 +597,11 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
continue;
}

const auto removed_bytes = result.size_before - result.size_after;
if (!seg->index().has_clean_compact_timestamp()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm a bit confused about this check here. presumably the segment was just compacted above (did_compact() is true) so wouldn't it always be clean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the did_compact result from self compaction, a process that cannot mark a segment as cleanly compacted. Only sliding window compaction can mark a segment as cleanly compacted.

This check is to ensure any bytes removed by self compaction are only deducted from the dirty segment bytes counter if the segment in question is in fact dirty. If the segment has already been marked as cleanly compacted, we shouldn't remove bytes from the counter here.

subtract_dirty_segment_bytes(removed_bytes);
}
subtract_closed_segment_bytes(removed_bytes);
vlog(
gclog.debug,
"[{}] segment {} self compaction result: {}",
Expand Down Expand Up @@ -614,8 +630,12 @@ ss::future<bool> disk_log_impl::sliding_window_compact(
// For all intents and purposes, these segments are already cleanly
// compacted.
auto seg = segs.front();
co_await internal::mark_segment_as_finished_window_compaction(
seg, true, *_probe);
bool marked_as_cleanly_compacted
= co_await internal::mark_segment_as_finished_window_compaction(
seg, true, *_probe);
if (marked_as_cleanly_compacted) {
subtract_dirty_segment_bytes(seg->size_bytes());
}
segs.pop_front();
}
if (segs.empty()) {
Expand Down Expand Up @@ -814,6 +834,9 @@ disk_log_impl::compact_adjacent_segments(storage::compaction_config cfg) {
r);
if (r->did_compact()) {
_compaction_ratio.update(r->compaction_ratio());
const auto removed_bytes = r->size_before - r->size_after;
subtract_dirty_segment_bytes(removed_bytes);
subtract_closed_segment_bytes(removed_bytes);
}
} else {
vlog(
Expand Down Expand Up @@ -1260,13 +1283,21 @@ ss::future<bool> disk_log_impl::chunked_sliding_window_compact(

// Segments can now be marked as finished window compaction
for (auto& s : segs) {
co_await internal::mark_segment_as_finished_window_compaction(
s, false, *_probe);
std::ignore
= co_await internal::mark_segment_as_finished_window_compaction(
s, false, *_probe);
}

// We can also mark the last segment as cleanly compacted now
co_await internal::mark_segment_as_finished_window_compaction(
seg, true, *_probe);
// We can also mark the last segment as cleanly compacted now.
bool marked_as_cleanly_compacted
= co_await internal::mark_segment_as_finished_window_compaction(
seg, true, *_probe);

// This should always be true - the unindexed segment would not have been
// cleanly compacted to begin with.
if (marked_as_cleanly_compacted) {
subtract_dirty_segment_bytes(seg->size_bytes());
}

_last_compaction_window_start_offset = seg->offsets().get_base_offset();
_probe->add_chunked_compaction_run();
Expand Down Expand Up @@ -1308,8 +1339,12 @@ ss::future<> disk_log_impl::rewrite_segment_with_offset_map(
// higher than those indexed, it may be because the segment is
// entirely comprised of non-data batches. Mark it as compacted so
// we can progress through compactions.
co_await internal::mark_segment_as_finished_window_compaction(
seg, is_clean_compacted, *_probe);
bool marked_as_cleanly_compacted
= co_await internal::mark_segment_as_finished_window_compaction(
seg, is_clean_compacted, *_probe);
if (marked_as_cleanly_compacted) {
subtract_dirty_segment_bytes(seg->size_bytes());
}

vlog(
gclog.debug,
Expand All @@ -1323,8 +1358,12 @@ ss::future<> disk_log_impl::rewrite_segment_with_offset_map(
if (!seg->may_have_compactible_records()) {
// All data records are already compacted away. Skip to avoid a
// needless rewrite.
co_await internal::mark_segment_as_finished_window_compaction(
seg, is_clean_compacted, *_probe);
bool marked_as_cleanly_compacted
= co_await internal::mark_segment_as_finished_window_compaction(
seg, is_clean_compacted, *_probe);
if (marked_as_cleanly_compacted) {
subtract_dirty_segment_bytes(seg->size_bytes());
}

vlog(
gclog.trace,
Expand Down Expand Up @@ -1419,18 +1458,32 @@ ss::future<> disk_log_impl::rewrite_segment_with_offset_map(
seg->force_set_commit_offset_from_index();
seg->release_batch_cache_index();

const auto removed_bytes = ssize_t(size_before) - ssize_t(size_after);

// We must deduct the removed bytes from the dirty segment bytes.
// However, if the segment is marked as cleanly compacted below, the
// entire seg size (before compaction) will instead be removed from dirty
// segment bytes.
auto dirty_removed_bytes = removed_bytes;

if (is_finished_window_compaction) {
// Mark the segment as completed window compaction, and possibly set the
// clean_compact_timestamp in it's index.
co_await internal::mark_segment_as_finished_window_compaction(
seg, is_clean_compacted, *_probe);
bool marked_as_cleanly_compacted
= co_await internal::mark_segment_as_finished_window_compaction(
seg, is_clean_compacted, *_probe);
if (marked_as_cleanly_compacted) {
dirty_removed_bytes = size_before;
}
}

subtract_dirty_segment_bytes(dirty_removed_bytes);
subtract_closed_segment_bytes(removed_bytes);

co_await seg->index().flush();
co_await ss::rename_file(cmp_idx_tmpname.string(), cmp_idx_name.string());
_probe->segment_compacted();
_probe->add_compaction_removed_bytes(
ssize_t(size_before) - ssize_t(size_after));
_probe->add_compaction_removed_bytes(removed_bytes);

compaction_result res(size_before, size_after);
_compaction_ratio.update(res.compaction_ratio());
Expand Down Expand Up @@ -1748,6 +1801,14 @@ ss::future<> disk_log_impl::new_segment(
if (config().is_compacted()) {
h->mark_as_compacted_segment();
}
if (!_segs.empty()) {
auto rolled_seg = _segs.back();
const auto closed_segment_size = rolled_seg->size_bytes();
if (!rolled_seg->index().has_clean_compact_timestamp()) {
add_dirty_segment_bytes(closed_segment_size);
}
add_closed_segment_bytes(closed_segment_size);
}
_segs.add(std::move(h));
_probe->segment_created();
_stm_manager->make_snapshot_in_background();
Expand Down Expand Up @@ -2686,6 +2747,11 @@ ss::future<> disk_log_impl::remove_segment_permanently(
vlog(stlog.info, "Removing \"{}\" ({}, {})", s->filename(), ctx, s);
// stats accounting must happen synchronously
_probe->delete_segment(*s);
auto removed_segment_bytes = s->size_bytes();
if (!s->index().has_clean_compact_timestamp()) {
subtract_dirty_segment_bytes(removed_segment_bytes);
}
subtract_closed_segment_bytes(removed_segment_bytes);
// background close
s->tombstone();
if (s->has_outstanding_locks()) {
Expand Down Expand Up @@ -3974,4 +4040,32 @@ ss::future<> disk_log_impl::remove_kvstore_state(
.discard_result();
}

double disk_log_impl::dirty_ratio() const {
// Avoid division by 0.
return (_closed_segment_bytes == 0)
? 0.0
: static_cast<double>(_dirty_segment_bytes)
/ static_cast<double>(_closed_segment_bytes);
}

void disk_log_impl::add_dirty_segment_bytes(uint64_t bytes) {
_dirty_segment_bytes += bytes;
_probe->set_dirty_segment_bytes(_dirty_segment_bytes);
}

void disk_log_impl::add_closed_segment_bytes(uint64_t bytes) {
_closed_segment_bytes += bytes;
_probe->set_closed_segment_bytes(_closed_segment_bytes);
}

void disk_log_impl::subtract_dirty_segment_bytes(uint64_t bytes) {
_dirty_segment_bytes -= std::min(bytes, _dirty_segment_bytes);
_probe->set_dirty_segment_bytes(_dirty_segment_bytes);
}

void disk_log_impl::subtract_closed_segment_bytes(uint64_t bytes) {
_closed_segment_bytes -= std::min(bytes, _closed_segment_bytes);
_probe->set_closed_segment_bytes(_closed_segment_bytes);
}

} // namespace storage
36 changes: 36 additions & 0 deletions src/v/storage/disk_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,15 @@ class disk_log_impl final : public log {

size_t max_segment_size() const;

uint64_t dirty_segment_bytes() const { return _dirty_segment_bytes; }

uint64_t closed_segment_bytes() const { return _closed_segment_bytes; }

// Returns the dirty ratio of the log.
// The dirty ratio is the ratio of bytes in closed, dirty segments to the
// total number of bytes in all closed segments in the log.
double dirty_ratio() const;

private:
friend class disk_log_appender; // for multi-term appends
friend class disk_log_builder; // for tests
Expand Down Expand Up @@ -454,6 +463,33 @@ class disk_log_impl final : public log {
std::optional<model::offset> _last_compaction_window_start_offset;

size_t _reclaimable_size_bytes{0};

uint64_t _dirty_segment_bytes{0};
uint64_t _closed_segment_bytes{0};

// Update the number of bytes in dirty segments.
//
// Dirty segments are closed segments which have not yet been cleanly
// compacted- i.e, duplicates for keys in this segment _could_ be found in
// the prefix of the log up to this segment.
//
// This value can increase AND decrease. It increases
// when a new segment is rolled, and decreases when the segment is marked as
// cleanly compacted, closed segments are evicted from the log, or when
// bytes are removed by compaction. For that reason, one of the tags add_tag
// or subtract_tag must be used.
void add_dirty_segment_bytes(uint64_t bytes);
void subtract_dirty_segment_bytes(uint64_t bytes);

// Update the number of bytes in closed segments.
//
// This value can increase AND decrease. It increases when a new
// segment is rolled, and decreases when closed segments are evicted from
// the log, or when bytes are removed by compaction. For that reason, one of
// the tags add_tag or subtract_tag must be used.
void add_closed_segment_bytes(uint64_t bytes);
void subtract_closed_segment_bytes(uint64_t bytes);

bool _compaction_enabled;
};

Expand Down
10 changes: 10 additions & 0 deletions src/v/storage/probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,16 @@ void probe::setup_metrics(const model::ntp& ntp) {
"corresponds to the number of times the compaction key-offset map "
"was unable to be built for a single segment."),
labels),
sm::make_gauge(
"dirty_segment_bytes",
[this] { return _dirty_segment_bytes; },
sm::description("Number of bytes within dirty segments of the log"),
labels),
sm::make_gauge(
"closed_segment_bytes",
[this] { return _closed_segment_bytes; },
sm::description("Number of bytes within closed segments of the log"),
labels),
},
{},
{sm::shard_label, partition_label});
Expand Down
12 changes: 12 additions & 0 deletions src/v/storage/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ class probe {
_bytes_prefix_truncated += bytes;
}

void set_dirty_segment_bytes(uint64_t bytes) {
_dirty_segment_bytes = bytes;
}

void set_closed_segment_bytes(uint64_t bytes) {
_closed_segment_bytes = bytes;
}

private:
uint64_t _partition_bytes = 0;
uint64_t _bytes_written = 0;
Expand All @@ -157,6 +165,10 @@ class probe {
uint64_t _segments_marked_tombstone_free = 0;
uint64_t _num_rounds_window_compaction = 0;
uint64_t _num_chunked_compaction_runs = 0;

uint64_t _dirty_segment_bytes = 0;
uint64_t _closed_segment_bytes = 0;

ssize_t _compaction_removed_bytes = 0;

metrics::internal_metric_groups _metrics;
Expand Down
6 changes: 3 additions & 3 deletions src/v/storage/segment_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1165,19 +1165,19 @@ offset_delta_time should_apply_delta_time_offset(
&& feature_table.local().is_active(features::feature::node_isolation)};
}

ss::future<> mark_segment_as_finished_window_compaction(
ss::future<bool> mark_segment_as_finished_window_compaction(
ss::lw_shared_ptr<segment> seg, bool set_clean_compact_timestamp, probe& pb) {
seg->mark_as_finished_windowed_compaction();
if (set_clean_compact_timestamp) {
bool did_set = seg->index().maybe_set_clean_compact_timestamp(
model::timestamp::now());
if (did_set) {
pb.add_cleanly_compacted_segment();
return seg->index().flush();
return seg->index().flush().then([] { return true; });
}
}

return ss::now();
return ss::make_ready_future<bool>(false);
}

bool is_past_tombstone_delete_horizon(
Expand Down
5 changes: 4 additions & 1 deletion src/v/storage/segment_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,10 @@ bool may_have_removable_tombstones(
// which case the `clean_compact_timestamp` is set in the segment's index).
// Also potentially issues a call to seg->index()->flush(), if the
// `clean_compact_timestamp` was set in the index.
ss::future<> mark_segment_as_finished_window_compaction(
//
// Returns a boolean indicating if the segment was marked as cleanly compacted
// for the first time and assigned a cleanly compacted timestamp.
ss::future<bool> mark_segment_as_finished_window_compaction(
ss::lw_shared_ptr<segment> seg, bool set_clean_compact_timestamp, probe& pb);

template<typename Func>
Expand Down
Loading