Skip to content

Commit

Permalink
rptest: add parameterization and asserts to log_compaction_test.py
Browse files Browse the repository at this point in the history
In order to test the chunked compaction routine, parameterize the existing
compaction test suite with `storage_compaction_key_map_memory_kb`.

By limiting this value, we can force compaction to go down the chunked compaction
path, and verify the log using the existing utilities after compaction settles.

Added asserts verify that the chunked compaction code path is taken, or not
taken, depending on the memory constraints specified.
  • Loading branch information
WillemKauf committed Jan 7, 2025
1 parent ab9bb7b commit f84c307
Showing 1 changed file with 45 additions and 10 deletions.
55 changes: 45 additions & 10 deletions tests/rptest/tests/log_compaction_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,37 @@ class LogCompactionTest(PreallocNodesTest, PartitionMovementMixin):
def __init__(self, test_context):
self.test_context = test_context
# Run with small segments, a low retention value and a very frequent compaction interval.
key_map_memory_kb = self.test_context.injected_args[
'storage_compaction_key_map_memory_kb']
key_set_cardinality = self.test_context.injected_args[
'key_set_cardinality']
self.extra_rp_conf = {
'log_compaction_interval_ms': 4000,
'log_segment_size': 2 * 1024**2, # 2 MiB
'retention_bytes': 25 * 1024**2, # 25 MiB
'compacted_log_segment_size': 1024**2 # 1 MiB
'compacted_log_segment_size': 1024**2, # 1 MiB
'storage_compaction_key_map_memory': key_map_memory_kb * 1024
}

# This environment variable is required to get around the bounds check that
# requires the map memory to be > 16MiB.
environment = {"__REDPANDA_TEST_DISABLE_BOUNDED_PROPERTY_CHECKS": "ON"}

# Assume that all of the key set will comfortably fit in one segment.
# If test parameters are changed, this may have to be re-estimated.
keys_per_segment = key_set_cardinality

# hash_key_offset_map::entry is exactly 40 bytes:
# a 32 byte digest and an 8 byte offset.
# See key_offset_map.h.
entry_size = 40
indexed_key_estimation = key_map_memory_kb * 1024 // entry_size
self.needs_chunked_compaction = indexed_key_estimation < keys_per_segment
super().__init__(test_context=test_context,
num_brokers=3,
node_prealloc_count=1,
extra_rp_conf=self.extra_rp_conf)
extra_rp_conf=self.extra_rp_conf,
environment=environment)

def topic_setup(self, cleanup_policy, key_set_cardinality):
"""
Expand Down Expand Up @@ -85,6 +106,12 @@ def get_complete_sliding_window_rounds(self):
metrics_endpoint=MetricsEndpoint.METRICS,
topic=self.topic_spec.name)

def get_chunked_compaction_runs(self):
    """
    Return the result of `metric_sum` for the chunked compaction runs
    counter, queried on the METRICS endpoint with this test's topic name.
    """
    chunked_runs_metric = "vectorized_storage_log_chunked_compaction_runs_total"
    return self.redpanda.metric_sum(metric_name=chunked_runs_metric,
                                    metrics_endpoint=MetricsEndpoint.METRICS,
                                    topic=self.topic_spec.name)

def produce_and_consume(self):
"""
Creates producer and consumer. Asserts that tombstones are seen
Expand Down Expand Up @@ -148,6 +175,7 @@ def validate_log(self):
# Sleep until the log has been fully compacted.
self.prev_sliding_window_rounds = -1
self.prev_tombstones_removed = -1
self.prev_chunked_compaction_runs = -1

def compaction_has_completed():
# In order to be confident that compaction has settled,
Expand All @@ -158,9 +186,11 @@ def compaction_has_completed():
new_sliding_window_rounds = self.get_complete_sliding_window_rounds(
)
new_tombstones_removed = self.get_removed_tombstones()
res = self.prev_sliding_window_rounds == new_sliding_window_rounds and self.prev_tombstones_removed == new_tombstones_removed
new_chunked_compaction_runs = self.get_chunked_compaction_runs()
res = self.prev_sliding_window_rounds == new_sliding_window_rounds and self.prev_tombstones_removed == new_tombstones_removed and self.prev_chunked_compaction_runs == new_chunked_compaction_runs
self.prev_sliding_window_rounds = new_sliding_window_rounds
self.prev_tombstones_removed = new_tombstones_removed
self.prev_chunked_compaction_runs = new_chunked_compaction_runs
return res

wait_until(
Expand All @@ -174,6 +204,11 @@ def compaction_has_completed():
assert self.get_cleanly_compacted_segments() > 0
assert self.get_segments_marked_tombstone_free() > 0

if self.needs_chunked_compaction:
assert self.get_chunked_compaction_runs() > 0
else:
assert self.get_chunked_compaction_runs() == 0

consumer = KgoVerifierSeqConsumer(self.test_context,
self.redpanda,
self.topic_spec.name,
Expand All @@ -197,13 +232,13 @@ def compaction_has_completed():

@skip_debug_mode
@cluster(num_nodes=4)
@matrix(
cleanup_policy=[
TopicSpec.CLEANUP_COMPACT, TopicSpec.CLEANUP_COMPACT_DELETE
],
key_set_cardinality=[100, 1000],
)
def compaction_stress_test(self, cleanup_policy, key_set_cardinality):
@matrix(cleanup_policy=[
TopicSpec.CLEANUP_COMPACT, TopicSpec.CLEANUP_COMPACT_DELETE
],
key_set_cardinality=[100, 1000],
storage_compaction_key_map_memory_kb=[3, 10, 128 * 1024])
def compaction_stress_test(self, cleanup_policy, key_set_cardinality,
storage_compaction_key_map_memory_kb):
"""
Uses partition movement and frequent compaction/garbage collecting to
validate tombstone removal and general compaction behavior.
Expand Down

0 comments on commit f84c307

Please sign in to comment.