@@ -196,8 +196,16 @@ ss::future<>
196
196
log_manager::housekeeping_scan (model::timestamp collection_threshold) {
197
197
using bflags = log_housekeeping_meta::bitflags;
198
198
199
- if (_logs_list.empty ()) {
200
- co_return ;
199
+ static constexpr auto is_not_set = [](bflags var, auto flag) {
200
+ return (var & flag) != flag;
201
+ };
202
+
203
+ // reset flags for the next two loops, segment_ms and compaction.
204
+ // since there are suspension points during the traversal of _logs_list, the
205
+ // algorithm is: mark the logs visited, rotate _logs_list, op, and loop
206
+ // until empty or reaching a marked log
207
+ for (auto & log_meta : _logs_list) {
208
+ log_meta.flags &= ~(bflags::compacted | bflags::lifetime_checked);
201
209
}
202
210
203
211
/*
@@ -211,29 +219,40 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) {
211
219
* compaction is already sequential when this will be unified with
212
220
* compaction, the whole task could be made concurrent
213
221
*/
214
- for (auto & log_meta : _logs_list) {
215
- co_await log_meta.handle ->apply_segment_ms ();
216
- }
222
+ while (!_logs_list.empty ()
223
+ && is_not_set (_logs_list.front ().flags , bflags::lifetime_checked)) {
224
+ if (_abort_source.abort_requested ()) {
225
+ co_return ;
226
+ }
217
227
218
- for (auto & log_meta : _logs_list) {
219
- log_meta.flags &= ~bflags::compacted;
228
+ auto & current_log = _logs_list.front ();
229
+ _logs_list.shift_forward ();
230
+
231
+ current_log.flags |= bflags::lifetime_checked;
232
+ // NOTE: apply_segment_ms holds _compaction_housekeeping_gate, that
233
+ // prevents the removal of the parent object. this makes awaiting
234
+ // apply_segment_ms safe against removal of segments from _logs_list
235
+ co_await current_log.handle ->apply_segment_ms ();
220
236
}
221
237
222
- while ((_logs_list.front ().flags & bflags::compacted) == bflags::none) {
238
+ while (!_logs_list.empty ()
239
+ && is_not_set (_logs_list.front ().flags , bflags::compacted)) {
223
240
if (_abort_source.abort_requested ()) {
224
241
co_return ;
225
242
}
226
243
227
244
auto & current_log = _logs_list.front ();
228
245
229
- _logs_list.pop_front ();
230
- _logs_list.push_back (current_log);
246
+ _logs_list.shift_forward ();
231
247
232
248
current_log.flags |= bflags::compacted;
233
249
current_log.last_compaction = ss::lowres_clock::now ();
234
250
235
251
auto ntp_sanitizer_cfg = _config.maybe_get_ntp_sanitizer_config (
236
252
current_log.handle ->config ().ntp ());
253
+ // NOTE: housekeeping holds _compaction_housekeeping_gate, that prevents
254
+ // the removal of the parent object. this makes awaiting housekeeping
255
+ // safe against removal of segments from _logs_list
237
256
co_await current_log.handle ->housekeeping (housekeeping_config (
238
257
collection_threshold,
239
258
_config.retention_bytes (),
@@ -242,10 +261,6 @@ log_manager::housekeeping_scan(model::timestamp collection_threshold) {
242
261
_abort_source,
243
262
std::move (ntp_sanitizer_cfg)));
244
263
245
- if (_logs_list.empty ()) {
246
- co_return ;
247
- }
248
-
249
264
// bail out of compaction early in order to get back to gc
250
265
if (_gc_triggered) {
251
266
co_return ;
0 commit comments