
Commit 0831b62

storage: fix race between segment.ms and appends
H/T to VladLazar for a similar change that inspired this one: VladLazar@682aea5

The problem statement:

```
We have seen a couple of races between the application of `segment.ms`
and the normal append path. They had the following pattern in common:

1. application of `segment.ms` begins
2. a call to `segment::append` is interleaved
3. the append finishes first and advances the dirty offsets, which the
   rolling logic in `segment.ms` does not expect
-- or --
4. `segment.ms` releases the current appender while the append is
   ongoing, which the append logic does not expect
```

The proposed fix was to introduce a new appender lock to the segment and to
ensure that it is held both while appending and while rolling via segment.ms.
This addressed problem #3, but wasn't sufficient to address problem #4. The
issue with introducing another lock to the segment is that the unexpected
behavior when appending happens in the context of an already referenced
segment: the appending fiber may proceed to reference an appender, only for it
to be destructed by the housekeeping fiber before segment::append() is called,
resulting in a segfault.

This patch instead extends usage of the existing
disk_log_impl::_segments_rolling_lock to cover the entire duration of an
append (i.e. not just the underlying segment::append() call), ensuring that
segment.ms rolls and appends are mutually exclusive.
1 parent f4c4593 commit 0831b62
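To make the fix concrete, here is a minimal, self-contained sketch of the locking pattern the patch adopts. Plain `std::mutex` stands in for the Seastar semaphore-backed `mutex` behind `_segments_rolling_lock`, and the free functions are illustrative stand-ins for `disk_log_appender::operator()` and `disk_log_impl::apply_segment_ms()`, not Redpanda code:

```
#include <iostream>
#include <mutex>

// Stand-in for disk_log_impl::_segments_rolling_lock (in Redpanda this is
// a Seastar semaphore-backed mutex; a plain std::mutex suffices here).
std::mutex segments_rolling_lock;

// Illustrative stand-in for the append path (disk_log_appender::operator()).
void append_batch() {
    // Fast path: the lock is almost never contested, so try_lock usually
    // succeeds without ever blocking the appending fiber.
    std::unique_lock<std::mutex> holder(
      segments_rolling_lock, std::try_to_lock);
    if (!holder.owns_lock()) {
        // Slow path: a segment.ms roll is in flight; wait for it, so we
        // never reference an appender the roll is about to destroy.
        std::cerr << "segment roll lock contested\n";
        holder.lock();
    }
    // ... the entire append runs here; a roll cannot interleave with it ...
}

// Illustrative stand-in for the housekeeping path (apply_segment_ms()).
void apply_segment_ms() {
    // The roll holds the same lock for its whole duration, making
    // segment.ms rolls and appends mutually exclusive.
    std::lock_guard<std::mutex> lock(segments_rolling_lock);
    // ... release the old appender and roll onto a new segment ...
}

int main() {
    append_batch();
    apply_segment_ms();
}
```

The design choice is the same in both places: the appender tries the lock without blocking (the common case) and only waits when a segment.ms roll is actually in flight.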

File tree

3 files changed: +57 -23 lines changed

- src/v/storage/disk_log_appender.cc
- src/v/storage/disk_log_impl.cc
- src/v/storage/disk_log_impl.h


src/v/storage/disk_log_appender.cc

Lines changed: 30 additions & 12 deletions
```
@@ -50,17 +50,22 @@ ss::future<> disk_log_appender::initialize() {
 }
 
 bool disk_log_appender::segment_is_appendable(model::term_id batch_term) const {
-    /**
-     * _log._segs.empty() is a tricky condition. It is here to suppor concurrent
-     * truncation (from 0) of an active log segment while we hold the lock of a
-     * valid segment.
-     *
-     * Checking for term is because we support multiple term appends which
-     * always roll
-     *
-     * _bytes_left_in_segment is for initial condition
-     *
-     */
+    if (!_seg || !_seg->has_appender()) {
+        // The latest segment with which this log_appender has called
+        // initialize() has been rolled and no longer has a segment appender
+        // (e.g. because segment.ms rolled onto a new segment). There is likely
+        // already a new segment and segment appender and we should reset to
+        // use them.
+        return false;
+    }
+    // _log._segs.empty() is a tricky condition. It is here to support
+    // concurrent truncation (from 0) of an active log segment while we hold
+    // the lock of a valid segment.
+    //
+    // Checking for term is because we support multiple term appends which
+    // always roll
+    //
+    // _bytes_left_in_segment is for initial condition
     return _bytes_left_in_segment > 0 && _log.term() == batch_term
            && !_log._segs.empty() /*see above before removing this condition*/;
 }
@@ -73,6 +78,18 @@ void disk_log_appender::release_lock() {
 
 ss::future<ss::stop_iteration>
 disk_log_appender::operator()(model::record_batch& batch) {
+    // We use a fast path here since this lock should very rarely be contested.
+    // An open segment may only have one in-flight append at any given time
+    // and the only other place this lock is held is when enforcing segment.ms
+    // (which should rarely happen in high throughput scenarios).
+    auto segment_roll_lock_holder = _log.try_segment_roll_lock();
+    if (!segment_roll_lock_holder.has_value()) {
+        vlog(
+          stlog.warn,
+          "Segment roll lock contested for {}",
+          _log.config().ntp());
+        segment_roll_lock_holder = co_await _log.segment_roll_lock();
+    }
     batch.header().base_offset = _idx;
     batch.header().header_crc = model::internal_header_only_crc(batch.header());
     if (_last_term != batch.term()) {
@@ -88,7 +105,8 @@ disk_log_appender::operator()(model::record_batch& batch) {
         // we might actually have space in the current log, but the
         // terms do not match for the current append, so we must roll
         release_lock();
-        co_await _log.maybe_roll(_last_term, _idx, _config.io_priority);
+        co_await _log.maybe_roll_unlocked(
+          _last_term, _idx, _config.io_priority);
         co_await initialize();
     }
     co_return co_await append_batch_to_segment(batch);
```

src/v/storage/disk_log_impl.cc

Lines changed: 10 additions & 10 deletions
```
@@ -1222,15 +1222,11 @@ ss::future<> disk_log_impl::force_roll(ss::io_priority_class iopc) {
     });
 }
 
-ss::future<> disk_log_impl::maybe_roll(
+ss::future<> disk_log_impl::maybe_roll_unlocked(
   model::term_id t, model::offset next_offset, ss::io_priority_class iopc) {
-    // This lock will only rarely be contended. If it is held, then
-    // we must wait for do_housekeeping to complete before proceeding, because
-    // the log might be in a state mid-roll where it has no appender.
-    // We need to take this irrespective of whether we're actually rolling
-    // or not, in order to ensure that writers wait for a background roll
-    // to complete if one is ongoing.
-    auto roll_lock_holder = co_await _segments_rolling_lock.get_units();
+    vassert(
+      !_segments_rolling_lock.ready(),
+      "Must have taken _segments_rolling_lock");
 
     vassert(t >= term(), "Term:{} must be greater than base:{}", t, term());
     if (_segs.empty()) {
@@ -1253,8 +1249,12 @@ ss::future<> disk_log_impl::maybe_roll(
 
 ss::future<> disk_log_impl::apply_segment_ms() {
     auto gate = _compaction_housekeeping_gate.hold();
-    // do_housekeeping races with maybe_roll to use new_segment.
-    // take a lock to prevent problems
+    // Holding the lock blocks writes to the last open segment.
+    // This is required in order to avoid the logic in this function
+    // racing with an inflight append. Contention on this lock should
+    // be very light, since we wouldn't need to enforce segment.ms
+    // if this partition was high throughput (segment would have rolled
+    // naturally).
     auto lock = co_await _segments_rolling_lock.get_units();
 
     if (_segs.empty()) {
```
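The rename from `maybe_roll` to `maybe_roll_unlocked` changes the function's contract: the caller must already hold `_segments_rolling_lock`, and the function asserts that precondition instead of taking the lock itself. A tiny sketch of this convention (plain C++ stand-ins, not Redpanda code):

```
#include <cassert>

// Stand-in for the state probed by !_segments_rolling_lock.ready().
bool segments_rolling_lock_held = false;

void maybe_roll_unlocked() {
    // Mirrors the patch's vassert: crash loudly if a caller forgot to
    // take the roll lock before calling in.
    assert(
      segments_rolling_lock_held && "Must have taken _segments_rolling_lock");
    // ... roll onto a new segment if term/size thresholds require it ...
}

int main() {
    segments_rolling_lock_held = true; // taken at the top of the append path
    maybe_roll_unlocked();             // safe: mutual exclusion already held
}
```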

src/v/storage/disk_log_impl.h

Lines changed: 17 additions & 1 deletion
```
@@ -94,7 +94,8 @@ class disk_log_impl final : public log {
     std::optional<model::offset> index_lower_bound(model::offset o) const final;
     std::ostream& print(std::ostream&) const final;
 
-    ss::future<> maybe_roll(
+    // Must be called while _segments_rolling_lock is held.
+    ss::future<> maybe_roll_unlocked(
       model::term_id, model::offset next_offset, ss::io_priority_class);
 
     // roll immediately with the current term. users should prefer the
@@ -135,6 +136,14 @@
 
     ss::future<reclaimable_offsets> get_reclaimable_offsets(gc_config cfg);
 
+    std::optional<ssx::semaphore_units> try_segment_roll_lock() {
+        return _segments_rolling_lock.try_get_units();
+    }
+
+    ss::future<ssx::semaphore_units> segment_roll_lock() {
+        return _segments_rolling_lock.get_units();
+    }
+
 private:
     friend class disk_log_appender; // for multi-term appends
     friend class disk_log_builder;  // for tests
@@ -274,6 +283,13 @@
 
     // Mutually exclude operations that will cause segment rolling
     // do_housekeeping and maybe_roll
+    //
+    // This lock will only rarely be contended. If it is held, then we must
+    // wait for housekeeping or truncation to complete before proceeding,
+    // because the log might be in a state mid-roll where it has no appender.
+    // We need to take this irrespective of whether we're actually rolling or
+    // not, in order to ensure that writers wait for a background roll to
+    // complete if one is ongoing.
     mutex _segments_rolling_lock;
 
     std::optional<model::offset> _cloud_gc_offset;
```
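The new accessors lean on three operations of the semaphore-backed mutex: `get_units()` waits for the lock, `try_get_units()` is the non-blocking fast path, and `ready()` reports whether the lock could be taken right now (which is why `maybe_roll_unlocked` can `vassert(!_segments_rolling_lock.ready(), ...)`). As a rough mental model of those semantics, here is a single-unit stand-in in plain C++; it sketches the concept only and is not `ssx::semaphore`:

```
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <utility>

// Single-unit lock sketching the interface the diffs use. Plain C++
// threads stand in for Seastar fibers; names are illustrative only.
class roll_mutex {
public:
    class units { // RAII holder, in the spirit of ssx::semaphore_units
    public:
        explicit units(roll_mutex* m) : _m(m) {}
        units(units&& o) noexcept : _m(std::exchange(o._m, nullptr)) {}
        units(const units&) = delete;
        ~units() {
            if (_m) {
                _m->release();
            }
        }

    private:
        roll_mutex* _m;
    };

    // Blocking acquire: the co_await _segments_rolling_lock.get_units() path.
    units get_units() {
        std::unique_lock lk(_mtx);
        _cv.wait(lk, [this] { return _available; });
        _available = false;
        return units{this};
    }

    // Non-blocking fast path: nullopt means the lock is contested.
    std::optional<units> try_get_units() {
        std::lock_guard lk(_mtx);
        if (!_available) {
            return std::nullopt;
        }
        _available = false;
        return units{this};
    }

    // True iff the lock could be taken immediately, i.e. nobody holds it.
    bool ready() const {
        std::lock_guard lk(_mtx);
        return _available;
    }

private:
    void release() {
        {
            std::lock_guard lk(_mtx);
            _available = true;
        }
        _cv.notify_one();
    }

    mutable std::mutex _mtx;
    std::condition_variable _cv;
    bool _available = true;
};

int main() {
    roll_mutex m;
    auto u = m.try_get_units(); // fast path succeeds while uncontested
    assert(u.has_value());
    assert(!m.ready()); // held: this is the condition the vassert checks
}
```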
