Commit f5360bb

Merge pull request #27440 from oleiman/ts/core-12871/merged-compacted-segments
CORE-12871: Avoid softlocking adjacent segment merger by skipping across compacted segments
2 parents 58e8ddb + d045933 commit f5360bb

File tree

5 files changed: +175 -13 lines changed


src/v/cluster/archival/adjacent_segment_merger.cc

Lines changed: 9 additions & 1 deletion
@@ -89,7 +89,7 @@ std::optional<adjacent_segment_run> adjacent_segment_merger::scan_manifest(
         so = std::max(
           manifest.get_start_offset().value_or(local_start_offset),
           local_start_offset);
-    } else {
+    } else if (!_is_local) {
         // Remote lookup, start from start offset in the manifest (or 0)
         so = _archiver.manifest().get_start_offset().value_or(model::offset{0});
     }
@@ -225,6 +225,14 @@ adjacent_segment_merger::run(run_quota_t quota) {
     };
     auto find_res = co_await _archiver.find_reupload_candidate(
      scanner, _as);
+    if (find_res.skip_to.has_value()) {
+        vlog(
+          _ctxlog.debug,
+          "Scanned invalid run, skip to {}",
+          find_res.skip_to);
+        _last = model::next_offset(find_res.skip_to.value());
+        co_return result;
+    }
     if (!find_res.upload_stream.has_value()) {
         vlog(_ctxlog.debug, "No more upload candidates");
         co_return result;
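
Taken together with the archiver changes below, this new skip_to branch is what breaks the softlock: instead of rediscovering the same invalid run on every housekeeping pass, the merger advances its scan cursor one offset past the run. A rough Python model of one pass follows; the names mirror the C++ above but this is a hedged sketch, not the real API.

# Hypothetical, heavily simplified model of one housekeeping pass.
def run_one_pass(merger, archiver):
    find_res = archiver.find_reupload_candidate(merger.scanner)
    if find_res.skip_to is not None:
        # Previously the merger had no way to move past a run whose
        # on-disk size no longer matches the manifest, so every pass
        # rescanned it: a softlock. Jumping the cursor past the run
        # restores forward progress.
        merger.last = find_res.skip_to + 1  # stands in for model::next_offset
        return
    if find_res.upload_stream is None:
        return  # no more upload candidates
    archiver.upload(find_res.upload_stream)  # merge and reupload the run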

src/v/cluster/archival/ntp_archiver_service.cc

Lines changed: 11 additions & 3 deletions
@@ -3485,8 +3485,7 @@ ntp_archiver::find_reupload_candidate(
       segment_collector_stream& collector_stream) mutable
       -> find_reupload_candidate_result {
         if (
-          collector_stream.size != run->meta.size_bytes
-          || collector_stream.start_offset != run->meta.base_offset
+          collector_stream.start_offset != run->meta.base_offset
           || collector_stream.end_offset != run->meta.committed_offset) {
             vlog(
               _rtclog.error,
@@ -3496,6 +3495,15 @@ ntp_archiver::find_reupload_candidate(
               run->meta);
             return {};
         }
+        if (collector_stream.size != run->meta.size_bytes) {
+            vlog(
+              _rtclog.debug,
+              "Failed to make reupload candidate due to size mismatch, "
+              "skip this range: expected size: {}, actual size: {}",
+              human::bytes(run->meta.size_bytes),
+              human::bytes(collector_stream.size));
+            return {.skip_to = collector_stream.end_offset};
+        }
         return {
           .units = std::move(units),
           .upload_stream = std::move(collector_stream),
@@ -3577,7 +3585,7 @@ ss::future<bool> ntp_archiver::do_upload_local(
     if (strm.is_compacted) {
         vlog(
           _rtclog.warn,
-          "Upload of the {} requested but sources are empty",
+          "Upload of {} requested but sources are compacted",
           sname);
         co_return false;
    }
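
Why the sizes can diverge: the manifest records each remote segment's size at upload time, while compaction later rewrites the corresponding local segments smaller. A toy Python illustration with made-up numbers (nothing here is taken from the commit):

# Purely illustrative values.
manifest_entry = {"base_offset": 0, "committed_offset": 999,
                  "size_bytes": 1024 * 1024}  # size at original upload
local_run      = {"base_offset": 0, "committed_offset": 999,
                  "size_bytes": 200 * 1024}   # after compaction

# The offset bounds still line up, so the scanner matches the run...
assert local_run["base_offset"] == manifest_entry["base_offset"]
assert local_run["committed_offset"] == manifest_entry["committed_offset"]

# ...but the byte sizes can never agree again, so the archiver now
# returns skip_to = committed_offset instead of failing the candidate
# on every housekeeping pass.
assert local_run["size_bytes"] != manifest_entry["size_bytes"]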

src/v/cluster/archival/ntp_archiver_service.h

Lines changed: 17 additions & 1 deletion
@@ -342,6 +342,20 @@ class ntp_archiver {
         std::optional<ssx::checkpoint_mutex_units> units;
         std::optional<segment_collector_stream> upload_stream{};
         archival_stm_fence read_write_fence{};
+        /// Set when find_reupload_candidate (non-compacted reupload) finds a
+        /// candidate that matches the offset bounds of some
+        /// adjacent_segment_run but does NOT match the expected size. If set,
+        /// adjacent_segment_merger moves its internal state to the end of the
+        /// run and begins its next scan from there.
+        ///
+        /// This can occur in situations where compaction is disabled before
+        /// some segment(s) in the manifest have been reuploaded. As a result,
+        /// the remote_segment sizes in the manifest won't match the size of the
+        /// (compacted) segments on disk. This situation is not recoverable from
+        /// the perspective of the housekeeping job, so we skip these offsets,
+        /// allowing adjacent segment merging to make forward progress on the
+        /// (presumably uncompacted) remainder of the log.
+        std::optional<model::offset> skip_to{};
     };
 
     /// Find upload candidate
@@ -352,7 +366,9 @@ class ntp_archiver {
     /// candidate.remote_segments).
     ///
     /// \param scanner is a user provided function used to find upload candidate
-    /// \return {nullopt, nullopt} or the archiver lock and upload candidate
+    /// \return {nullopt, nullopt} OR the archiver lock and upload candidate OR
+    /// {.skip_to = <offset>} if the candidate contained compacted segments (see
+    /// find_reupload_candidate_result, above).
     ss::future<find_reupload_candidate_result>
     find_reupload_candidate(manifest_scanner_t scanner, ss::abort_source& as);
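
The \return contract now has three shapes. Below is a small Python stand-in for the C++ struct showing how a caller distinguishes them; the names are a hypothetical mirror of the header above, for illustration only.

from dataclasses import dataclass
from typing import Optional

@dataclass
class FindReuploadCandidateResult:  # hypothetical mirror of the C++ struct
    units: Optional[object] = None          # archiver lock, held when a candidate exists
    upload_stream: Optional[object] = None  # the upload candidate itself
    skip_to: Optional[int] = None           # set when the run's size didn't match

def dispatch(res: FindReuploadCandidateResult) -> str:
    if res.skip_to is not None:
        return f"skip scan forward past offset {res.skip_to}"  # compacted run
    if res.upload_stream is None:
        return "no candidate"  # the {nullopt, nullopt} case
    return "reupload candidate under archiver lock"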

tests/rptest/tests/adjacent_segment_merging_test.py

Lines changed: 130 additions & 8 deletions
@@ -17,9 +17,8 @@
 from rptest.clients.types import TopicSpec
 from rptest.clients.rpk import RpkTool
 from rptest.clients.kafka_cli_tools import KafkaCliTools
-from rptest.util import (
-    wait_until,
-)
+from rptest.services.kgo_verifier_services import KgoVerifierProducer
+from rptest.util import wait_until, expect_timeout
 from rptest.utils.si_utils import BucketView
 
 from ducktape.mark import matrix
@@ -34,11 +33,10 @@
 ]
 
 
-class AdjacentSegmentMergingTest(RedpandaTest):
+class AdjacentSegmentMergingTestBase(RedpandaTest):
     s3_topic_name = "panda-topic"
-    topics = (TopicSpec(name=s3_topic_name, partition_count=1, replication_factor=3),)
 
-    def __init__(self, test_context):
+    def __init__(self, test_context, extra_rp_conf: dict[str, str] = {}, **kwargs):
         si_settings = SISettings(
             test_context,
             cloud_storage_max_connections=10,
@@ -56,8 +54,11 @@ def __init__(self, test_context):
 
         self.bucket_name = si_settings.cloud_storage_bucket
 
-        super(AdjacentSegmentMergingTest, self).__init__(
-            test_context=test_context, extra_rp_conf=xtra_conf, si_settings=si_settings
+        super().__init__(
+            test_context=test_context,
+            extra_rp_conf={**xtra_conf, **extra_rp_conf},
+            si_settings=si_settings,
+            **kwargs,
         )
 
         self.kafka_tools = KafkaCliTools(self.redpanda)
@@ -66,6 +67,19 @@ def __init__(self, test_context):
     def setUp(self):
         super().setUp()  # topic is created here
 
+
+class AdjacentSegmentMergingTest(AdjacentSegmentMergingTestBase):
+    topics = (
+        TopicSpec(
+            name=AdjacentSegmentMergingTestBase.s3_topic_name,
+            partition_count=1,
+            replication_factor=3,
+        ),
+    )
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
     @cluster(num_nodes=3)
     @matrix(acks=[-1, 1], cloud_storage_type=get_cloud_storage_type())
     def test_reupload_of_local_segments(self, acks, cloud_storage_type):
@@ -110,3 +124,111 @@ def manifest_has_one_segment():
             return False
 
         wait_until(manifest_has_one_segment, 60)
+
+
+class AdjacentSegmentMergingToggleCompactionTest(AdjacentSegmentMergingTestBase):
+    topics = (
+        TopicSpec(
+            name=AdjacentSegmentMergingTestBase.s3_topic_name,
+            partition_count=1,
+            replication_factor=1,
+            cleanup_policy=TopicSpec.CLEANUP_COMPACT,
+            min_cleanable_dirty_ratio=0.0,
+            max_compaction_lag_ms=3000,
+        ),
+    )
+
+    def __init__(self, test_context, *args, **kwargs):
+        xtra_conf = dict(
+            cloud_storage_enable_compacted_topic_reupload=False,
+            cloud_storage_enable_segment_merging=True,
+            log_compaction_interval_ms=50,
+            log_compaction_use_sliding_window=False,
+            compacted_log_segment_size=1024 * 512,
+            max_compaction_lag_ms=3000,
+        )
+        self.test_context = test_context
+        super().__init__(
+            test_context, extra_rp_conf=xtra_conf, num_brokers=1, *args, **kwargs
+        )
+
+    @cluster(num_nodes=2)
+    @matrix(acks=[-1, 1], cloud_storage_type=get_cloud_storage_type())
+    def test_reupload_of_local_segments(self, acks, cloud_storage_type):
+        """Test adjacent segment merging using local data.
+
+        The test starts by uploading a large number of very small segments.
+        The total amount of data produced is smaller than the target segment
+        size, so after housekeeping we should end up with only one segment
+        in the cloud. Retention is not enabled, so the reupload process can
+        use data available locally.
+        """
+
+        def produce_some():
+            for _ in range(10):
+                KgoVerifierProducer.oneshot(
+                    context=self.test_context,
+                    redpanda=self.redpanda,
+                    topic=self.topic,
+                    msg_size=1024,
+                    msg_count=1024,
+                    key_set_cardinality=1,
+                )
+                time.sleep(1)
+            time.sleep(5)
+
+        produce_some()
+
+        self.rpk.alter_topic_config(
+            self.topic, TopicSpec.PROPERTY_CLEANUP_POLICY, TopicSpec.CLEANUP_DELETE
+        )
+
+        self.redpanda.set_cluster_config(
+            {"log_compaction_use_sliding_window": True}, expect_restart=True
+        )
+
+        def manifest_has_large_segment():
+            try:
+                num_good = 0
+                for ntp, manifest in BucketView(
+                    self.redpanda
+                ).partition_manifests.items():
+                    target_lower_bound = 1024 * 1024 * 8
+                    for name, meta in manifest["segments"].items():
+                        self.logger.info(f"segment {name}, segment_meta: {meta}")
+                        if meta["size_bytes"] >= target_lower_bound:
+                            # we will only see segments larger than the
+                            # lower bound if housekeeping is working
+                            num_good += 1
+                return num_good > 0
+            except Exception as err:
+                import traceback
+
+                self.logger.info(
+                    "".join(
+                        traceback.format_exception(type(err), err, err.__traceback__)
+                    )
+                )
+                return False
+
+        self.logger.debug(
+            "The log is full of small compacted segments, so housekeeping "
+            "shouldn't have any effect"
+        )
+        with expect_timeout():
+            wait_until(manifest_has_large_segment, 30)
+
+        self.logger.debug(
+            "Produce some more small segments with compaction off. "
+            "Housekeeping should make progress now"
+        )
+        produce_some()
+        wait_until(manifest_has_large_segment, 60)

tests/rptest/util.py

Lines changed: 8 additions & 0 deletions
@@ -392,6 +392,14 @@ def expect_exception(exception_klass, validator):
     raise RuntimeError("Expected an exception!")
 
 
+def expect_timeout():
+    """
+    expect_exception wrapper for the common case where the expected
+    exception is a ducktape.errors.TimeoutError and its contents are of
+    no interest.
+    """
+    return expect_exception(TimeoutError, lambda _: True)
+
+
 def expect_http_error(status_code: int):
     """
     Context manager for HTTP calls expected to result in an HTTP exception
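
For reference, a usage sketch of the new helper in the style of the test above: the block passes only if the wrapped wait_until actually times out.

from rptest.util import wait_until, expect_timeout

# Assert that a condition does NOT become true within the window;
# a condition that never holds is the minimal demonstration.
with expect_timeout():
    wait_until(lambda: False, 5)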
