[CORE-8160] storage: add chunked compaction routine #24423
Conversation
Force-pushed from 1d4f546 to 2ca689c
Can you explain what "chunked compaction" is? When would sliding window fail to index segments, and why do we care?
Added more detail to cover letter to address these points.
Pretty much looks good! No major complaints about structure, just some naming suggestions. Nice work!
Also could probably use some ducktape testing, though IIRC you mentioned a separate PR for stress testing compaction
That PR is merged; I'm going to parameterize it in order to test chunked compaction and assert on some added metrics. Will have updates to this.
```cpp
co_await map.reset();
auto read_holder = co_await seg->read_lock();
auto start_offset_inclusive = model::next_offset(last_indexed_offset);
auto rdr = internal::create_segment_full_reader(
```
Recreating this full segment reader for each round of chunked compaction is a bummer.
Not sure if we have any abstractions to get around this. log_reader::reset_config() gave me some hope that the segment's lease/lock could be reused, but it doesn't seem to allow us to reset with a start_offset lower than what has currently been read.
For context, we have to do this because in the chunked_compaction_reducer, once we fail to index an offset for a record in a batch, we break out of the loop and have to re-read that batch in the next round, using that offset as the start, inclusively.
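To make the round structure concrete, here is a minimal, self-contained sketch of the control flow being discussed, with plain standard containers standing in for the real segment, key-offset map, and segment reader; all names below are illustrative stand-ins rather than the actual storage internals.

```cpp
#include <algorithm>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

struct record {
    std::string key;
    long offset;
};

// Stand-in for "create a reader at start_offset_inclusive and run the
// map-building reducer until the bounded map fills up"; returns the last
// offset it managed to index. Assumes max_entries >= 1.
long index_until_full(
  const std::vector<record>& segment,
  long start_offset_inclusive,
  std::size_t max_entries,
  std::map<std::string, long>& map) {
    long last_indexed = start_offset_inclusive - 1;
    for (const auto& r : segment) {
        if (r.offset < start_offset_inclusive) {
            continue; // skip records already indexed in earlier rounds
        }
        if (map.size() >= max_entries && map.count(r.key) == 0) {
            break; // map is full: stop here, the next round resumes at this offset
        }
        auto [it, inserted] = map.try_emplace(r.key, r.offset);
        if (!inserted) {
            it->second = std::max(it->second, r.offset); // keep latest offset per key
        }
        last_indexed = r.offset;
    }
    return last_indexed;
}

void chunked_index(const std::vector<record>& segment, std::size_t max_entries) {
    if (segment.empty()) {
        return;
    }
    long last_indexed_offset = segment.front().offset - 1;
    while (last_indexed_offset < segment.back().offset) {
        std::map<std::string, long> map; // the real code does co_await map.reset()
        // A fresh reader has to be created each round: the record at
        // last_indexed_offset + 1 was read but not indexed last round, and the
        // existing reader cannot be rewound back to it.
        long start_offset_inclusive = last_indexed_offset + 1;
        last_indexed_offset = index_until_full(
          segment, start_offset_inclusive, max_entries, map);
        // ... one round of compaction would run here using the partial `map` ...
    }
}
```

The point the comment is making is the last part: each round begins at last_indexed_offset + 1, so the reader over the segment is rebuilt from scratch and the batch that was only partially indexed ends up being read twice.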
Leaving comment to rehighlight this point, in case @andrwng or anyone else has any ideas or comments on the cost of this repeated operation/possible tools at our disposal here.
well we have the readers cache, but i dunno if it is useful in this context.
since this code is new and not used often, i think we should favor simplicity, unless of course something would be worse than just 'not optimal'.
Force-pushed from 2ca689c to 4d14fd5
Force-pushed from 4d14fd5 to fc57dff
Force push to:
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/59620#0193b7c9-24ab-4e30-86c0-486e483807a5
/ci-repeat 5
/ci-repeat 5
Retry command for Build#59673: please wait until all jobs are finished before running the slash command
Possibly a bad interaction between partition movement and …; seemingly unrelated to the compaction changes.
CI test results:
test results on build#59673
test results on build#59782
test results on build#60281
test results on build#60366
test results on build#60412
test results on build#60610
test results on build#61021
Force-pushed from fc57dff to fe0991e
Force push to:
Retry command for Build#59782: please wait until all jobs are finished before running the slash command
/ci-repeat
```cpp
ss::future<ss::stop_iteration>
map_building_reducer::operator()(model::record_batch batch) {
    bool fully_indexed_batch = true;
    auto b = co_await decompress_batch(std::move(batch));
```
optimization: don't call `_map->put()` for records in non-compactible batch types; they would just be a waste of map space?
did you do that right above this line?
And better define it for `simple_key_offset_map`.
Optionally provide a starting offset from which the reader's `min_offset` value is assigned (otherwise, the `base_offset()` of the `segment` is used).
Uses the `map_building_reducer` to perform a linear read of a `segment` and index its keys and offsets, starting from a provided offset.
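As a rough, self-contained model of that contract (not the actual `map_building_reducer`; the batch and record types below are simplified placeholders), a batch-at-a-time consumer indexes keys into a memory-bounded map, skips records below the provided starting offset, and stops once the map can accept no more keys:

```cpp
#include <algorithm>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

enum class stop_iteration { no, yes };

struct record {
    std::string key;
    long offset;
};

struct record_batch {
    bool compactible; // e.g. data batches; configuration-type batches are not
    std::vector<record> records;
};

class map_building_consumer {
public:
    map_building_consumer(
      std::map<std::string, long>& map, std::size_t max_entries, long min_offset)
      : _map(map)
      , _max_entries(max_entries)
      , _min_offset(min_offset) {}

    // Called once per batch, in offset order. Returns stop_iteration::yes once
    // the bounded map can no longer accept new keys, leaving the batch only
    // partially indexed.
    stop_iteration operator()(const record_batch& b) {
        for (const auto& r : b.records) {
            if (r.offset < _min_offset) {
                continue; // below the requested starting offset
            }
            // Records in non-compactible batch types are not worth map space,
            // but their offsets still count as indexed.
            if (b.compactible) {
                if (_map.size() >= _max_entries && _map.count(r.key) == 0) {
                    return stop_iteration::yes; // map is full, stop here
                }
                auto [it, inserted] = _map.try_emplace(r.key, r.offset);
                if (!inserted) {
                    it->second = std::max(it->second, r.offset);
                }
            }
            _last_indexed_offset = r.offset;
        }
        return stop_iteration::no;
    }

    long last_indexed_offset() const { return _last_indexed_offset; }

private:
    std::map<std::string, long>& _map;
    std::size_t _max_entries;
    long _min_offset;
    long _last_indexed_offset{-1};
};
```

A caller would feed batches in offset order and then use last_indexed_offset() to decide where the next chunked round should start.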
Force-pushed from 71d6a22 to fc4b9dd
Force push to:
Retry command for Build#60366: please wait until all jobs are finished before running the slash command
Force-pushed from fc4b9dd to 1acfc4e
Force push to:
Nice
In the case that zero segments were able to be indexed for a round of sliding window compaction, chunked compaction must be performed. This implementation uses some of the current abstractions from the compaction utilities to perform several rounds of sliding window compaction with a partially indexed map, created by reading the un-indexed segment in a linear fashion. This implementation is sub-optimal for a number of reasons; namely, segment indexes are read and rewritten each time a round of chunked compaction is performed. These intermediate states are then used for the next round of chunked compaction. In the future, there may be a more optimal way to perform these steps using less IO by holding more information in memory before flushing the final results to disk, rather than flushing every intermediate stage.
GTest `ASSERT_*` macros cannot be used in non-`void` returning functions. Add `RPTEST_EXPECT_EQ` to provide flexibility in testing for non-`void` functions.
To move away from hardcoded boost asserts and provide compatibility in a GTest environment.
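As background on the GTest limitation being worked around (the actual `RPTEST_EXPECT_EQ` macro is not reproduced here): `ASSERT_*` expands to a bare `return;` on failure, which only compiles in functions returning `void`, while `EXPECT_*` records the failure and lets execution continue. A minimal sketch:

```cpp
#include <gtest/gtest.h>

// OK: ASSERT_* may be used here because the function returns void.
void check_even(int v) {
    ASSERT_EQ(v % 2, 0);
}

// Would not compile with ASSERT_EQ: on failure the macro expands to `return;`,
// which is ill-formed in a function returning bool. An EXPECT-style check
// records the failure and lets the caller decide how to bail out.
bool try_check_even(int v) {
    EXPECT_EQ(v % 2, 0);
    return v % 2 == 0;
}

TEST(assert_vs_expect, non_void_helpers) {
    check_even(4);
    EXPECT_TRUE(try_check_even(4));
}
```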
This would previously overshoot the `size_bytes` provided to it by filling with `elements_per_fragment()` at least once. In the lower limit, when `required_entries` is less than `elements_per_fragment()`, we should be taking the minimum of the two values and pushing back that number of objects to the `entries` container.
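A small, self-contained illustration of the clamping described above, using a generic vector-of-vectors in place of the actual fragmented container; the function and variable names are illustrative only:

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Reserve `required_entries` slots in fragment-sized chunks. The last chunk
// is clamped with std::min, so that when required_entries is smaller than
// elements_per_fragment we no longer overshoot by a whole fragment.
std::vector<std::vector<int>> make_fragments(
  std::size_t required_entries, std::size_t elements_per_fragment) {
    std::vector<std::vector<int>> entries;
    while (required_entries > 0) {
        std::size_t n = std::min(required_entries, elements_per_fragment);
        entries.emplace_back(n); // push back exactly n objects, not a full fragment
        required_entries -= n;
    }
    return entries;
}
```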
In order to test the chunked compaction routine, parameterize the existing compaction test suite with `storage_compaction_key_map_memory_kb`. By limiting this value, we can force compaction to go down the chunked compaction path, and verify the log using the existing utilities after compaction settles. Some added asserts are used to verify chunked compaction is taken or not taken as a code path, depending on the memory constraints specified.
Force-pushed from 1acfc4e to 0de4571
Force push to:
Unfortunate that …
hehe. we don't need to do that in the context of this PR, or is it related to chunked compaction?
The potential race mentioned was related to the added chunked compaction code, yes.
Add a new function `map_building_reducer::maybe_index_record_in_map()` to avoid a possibly dangling reference in a continuation. While this code was "technically" safe due to the fact that `key_offset_map::put()` didn't have any scheduling points in it, this refactor avoids the problem entirely by moving all defined stack variables into a new coroutine function.
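As a general illustration of the hazard this avoids (generic Seastar-style code; `put_async()` below is a placeholder rather than `key_offset_map::put()`): a reference to a stack variable handed to an asynchronous call can dangle once the enclosing function returns, whereas a coroutine keeps its locals alive in the coroutine frame across `co_await`.

```cpp
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>

#include <string>
#include <string_view>

namespace ss = seastar;

// Placeholder asynchronous operation that takes its argument by reference.
ss::future<> put_async(const std::string& key);

// Hazardous: `key` lives on this function's stack, and the function returns
// as soon as the future is constructed. If put_async() ever suspends before
// it is done with `key`, the reference dangles. (The original code was only
// "technically" safe because put() had no scheduling points.)
ss::future<> index_record_risky(std::string_view raw) {
    std::string key(raw);
    return put_async(key);
}

// Safe: in a coroutine, `key` lives in the coroutine frame, which stays alive
// across the co_await, so the reference remains valid for the whole call.
ss::future<> index_record_safe(std::string_view raw) {
    std::string key(raw);
    co_await put_async(key);
}
```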
Push to:
This PR deals with the case in which zero segments were indexed for a round of sliding window compaction. This can happen for segments with a large number of unique keys, given the memory constraints imposed on our key-offset hash map by `storage_compaction_key_map_memory` (128MiB by default). This has historically not come about often, and may also be naturally alleviated by deduplicating or partially indexing the problem segment during future rounds of compaction (provided there is a steady ingress rate to the partition, and that keys in the problem segment are present in newer segments in the log), but added here is a routine that can handle this corner case when it arises.
Instead of throwing and logging an error when zero segments are indexed, we will now fall back to a "chunked" compaction routine.
This implementation uses some of the current abstractions from the compaction utilities to perform several rounds (chunks) of sliding window compaction with a partially indexed map created from the un-indexed segment by reading it in a linear fashion.
This implementation is sub-optimal for a number of reasons; primarily, segment indexes are read and rewritten each time a round of chunked compaction is performed. These intermediate states are then used for the next round of chunked compaction.
In the future, there may be a more optimal way to perform these steps using less IO by holding more information in memory before flushing the final results to disk, instead of flushing every intermediate stage. However, the case in which chunked compaction is required seems to be infrequent enough that merely having the implementation is valuable.
Backports Required
Release Notes
Improvements