From 0e1a24ccccfa48120fa291245109c1b522013083 Mon Sep 17 00:00:00 2001
From: Willem Kaufmann
Date: Wed, 18 Dec 2024 16:37:48 -0500
Subject: [PATCH] `datalake`: add `compaction_test.py`

Adds a new `test_compaction` test, which uses the `KgoVerifierSeqConsumer`
to validate a fully compacted log, along with the `datalake_verifier`
service to validate the Iceberg table.

Also moves the contents of `compaction_gaps_test.py` into
`compaction_test.py`.
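To illustrate the property under test (a sketch only; `compact` below is
a hypothetical model, not Redpanda's sliding-window implementation): once
a log is fully compacted, only the latest value for each key survives,
which is what `validate_latest_values=True` checks on the consumer side:

    # Hypothetical model: a fully compacted log retains only the last
    # value produced per key (at most key_set_cardinality=100 keys here).
    def compact(records):
        latest = {}
        for key, value in records:
            latest[key] = value
        return latest

    assert compact([("k0", "v1"), ("k1", "v2"), ("k0", "v3")]) == {
        "k0": "v3",
        "k1": "v2"
    }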
+ "vectorized_storage_log_complete_sliding_window_rounds_total", + metrics_endpoint=MetricsEndpoint.METRICS, + topic=self.topic_name) + + # Sleep until the log has been fully compacted. + self.prev_sliding_window_rounds = -1 + + def compaction_has_completed(): + new_sliding_window_rounds = get_complete_sliding_window_rounds() + res = self.prev_sliding_window_rounds == new_sliding_window_rounds + self.prev_sliding_window_rounds = new_sliding_window_rounds + return res + + wait_until( + compaction_has_completed, + timeout_sec=120, + backoff_sec=self.extra_rp_conf['log_compaction_interval_ms'] / + 1000 * 10, + err_msg="Compaction did not stabilize.") + + def verify_log_and_table(self, dl: DatalakeServices): + # Verify a fully compacted log with a sequential consumer + consumer = KgoVerifierSeqConsumer( + self.test_ctx, + self.redpanda, + self.topic_name, + compacted=True, + validate_latest_values=True, + ) + + consumer.start(clean=False) + consumer.wait(timeout_sec=120) + consumer.free() + + verifier = DatalakeVerifier(self.redpanda, + self.topic_name, + dl.trino(), + compacted=True) + + verifier.start() + verifier.wait() + + for partition, offset_consumed in verifier._max_consumed_offsets.items( + ): + assert consumer.consumer_status.validator.max_offsets_consumed[str( + partition)] == offset_consumed + + def do_test_compaction(self, dl: DatalakeServices): + dl.create_iceberg_enabled_topic( + self.topic_name, + iceberg_mode="key_value", + config={"cleanup.policy": TopicSpec.CLEANUP_COMPACT}) + + for _ in range(5): + self.produce() + # Compact the data. Compaction settling indicates that + # everything will have been translated into the Iceberg table already. + self.wait_for_compaction() + # Assert on read log and on iceberg table. + self.verify_log_and_table(dl) + + @cluster(num_nodes=4) + #@skip_debug_mode + @matrix(cloud_storage_type=supported_storage_types()) + def test_compaction(self, cloud_storage_type): + with DatalakeServices(self.test_ctx, + redpanda=self.redpanda, + filesystem_catalog_mode=False, + include_query_engines=[QueryEngineType.TRINO + ]) as dl: + self.do_test_compaction(dl)