Skip to content

Commit

Permalink
datalake: add compaction_test.py
Browse files Browse the repository at this point in the history
Adds a new `test_compaction` test, which uses the `KgoVerifierSeqConsumer`
to validate a fully compacted log, along with the `datalake_verifier`
service to validate the Iceberg table.

Also moves the contents of `compaction_gaps_test.py` into
`compaction_test.py`.
  • Loading branch information
WillemKauf committed Dec 20, 2024
1 parent 0d110a0 commit 2fe6c55
Showing 1 changed file with 126 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@
from time import time
from rptest.clients.kafka_cat import KafkaCat
from rptest.clients.types import TopicSpec
from rptest.services.kgo_verifier_services import KgoVerifierProducer
from rptest.services.kgo_verifier_services import KgoVerifierProducer, KgoVerifierSeqConsumer
from rptest.tests.datalake.datalake_services import DatalakeServices
from rptest.tests.datalake.query_engine_base import QueryEngineType
from rptest.services.redpanda import MetricsEndpoint
from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.redpanda import SISettings
from rptest.tests.datalake.utils import supported_storage_types
from ducktape.mark import matrix
from ducktape.utils.util import wait_until
from rptest.services.cluster import cluster
from rptest.utils.mode_checks import skip_debug_mode
from rptest.tests.datalake.datalake_verifier import DatalakeVerifier


class CompactionGapsTest(RedpandaTest):
Expand Down Expand Up @@ -118,3 +120,126 @@ def test_translation_no_gaps(self, cloud_storage_type):
include_query_engines=[QueryEngineType.TRINO
]) as dl:
self.do_test_no_gaps(dl)


class CompactionTest(RedpandaTest):
def __init__(self, test_ctx, *args, **kwargs):
self.extra_rp_conf = {
"iceberg_enabled": "true",
"iceberg_catalog_commit_interval_ms": 5000,
"datalake_coordinator_snapshot_max_delay_secs": 10,
"log_compaction_interval_ms": 2000,
"log_segment_size": 2 * 1024**2, # 2 MiB
"compacted_log_segment_size": 1024**2 # 1 MiB
}

super(CompactionTest,
self).__init__(test_ctx,
num_brokers=1,
si_settings=SISettings(test_context=test_ctx,
fast_uploads=True),
extra_rp_conf=self.extra_rp_conf,
*args,
**kwargs)
self.test_ctx = test_ctx
self.topic_name = "tapioca"
self.msg_size = 1024 # 1 KiB
self.total_data = 100 * 1024**2 # 100 MiB
self.msg_count = int(self.total_data / self.msg_size)
self.kafka_cat = KafkaCat(self.redpanda)

def setUp(self):
# redpanda will be started by DatalakeServices
pass

def produce(self):
producer = KgoVerifierProducer(self.test_ctx,
self.redpanda,
self.topic_name,
msg_size=self.msg_size,
msg_count=self.msg_count,
key_set_cardinality=100,
validate_latest_values=True)

producer.start(clean=True)
producer.wait()
producer.free()

def wait_for_compaction(self):
# Restart each redpanda broker to force roll segments
self.redpanda.restart_nodes(self.redpanda.nodes)

def get_complete_sliding_window_rounds():
return self.redpanda.metric_sum(
metric_name=
"vectorized_storage_log_complete_sliding_window_rounds_total",
metrics_endpoint=MetricsEndpoint.METRICS,
topic=self.topic_name)

# Sleep until the log has been fully compacted.
self.prev_sliding_window_rounds = -1

def compaction_has_completed():
new_sliding_window_rounds = get_complete_sliding_window_rounds()
res = self.prev_sliding_window_rounds == new_sliding_window_rounds
self.prev_sliding_window_rounds = new_sliding_window_rounds
return res

wait_until(
compaction_has_completed,
timeout_sec=120,
backoff_sec=self.extra_rp_conf['log_compaction_interval_ms'] /
1000 * 10,
err_msg="Compaction did not stabilize.")

def verify_log_and_table(self, dl: DatalakeServices):
# Verify a fully compacted log with a sequential consumer
consumer = KgoVerifierSeqConsumer(
self.test_ctx,
self.redpanda,
self.topic_name,
compacted=True,
validate_latest_values=True,
)

consumer.start(clean=False)
consumer.wait(timeout_sec=120)
consumer.free()

verifier = DatalakeVerifier(self.redpanda,
self.topic_name,
dl.trino(),
compacted=True)

verifier.start()
verifier.wait()

for partition, offset_consumed in verifier._max_consumed_offsets.items(
):
assert consumer.consumer_status.validator.max_offsets_consumed[str(
partition)] == offset_consumed

def do_test_compaction(self, dl: DatalakeServices):
dl.create_iceberg_enabled_topic(
self.topic_name,
iceberg_mode="key_value",
config={"cleanup.policy": TopicSpec.CLEANUP_COMPACT})

for _ in range(5):
self.produce()
# Compact the data. Compaction settling indicates that
# everything will have been translated into the Iceberg table already.
self.wait_for_compaction()
# Assert on read log and on iceberg table.
self.verify_log_and_table(dl)

@cluster(num_nodes=4)
#@skip_debug_mode
@matrix(cloud_storage_type=supported_storage_types())
def test_compaction(self, cloud_storage_type):
with DatalakeServices(self.test_ctx,
redpanda=self.redpanda,
filesystem_catalog_mode=False,
include_query_engines=[QueryEngineType.TRINO
]) as dl:
self.do_test_compaction(dl)

0 comments on commit 2fe6c55

Please sign in to comment.