diff --git a/tests/rptest/tests/datalake/datalake_verifier.py b/tests/rptest/tests/datalake/datalake_verifier.py index 5ae1676394fc..1392fe6c0ce7 100644 --- a/tests/rptest/tests/datalake/datalake_verifier.py +++ b/tests/rptest/tests/datalake/datalake_verifier.py @@ -24,22 +24,24 @@ class DatalakeVerifier(): """ - Verifier that does the verification of the data in the redpanda Iceberg table. - The verifier consumes offsets from specified topic and verifies it the data + Verifier that does the verification of the data in the redpanda Iceberg table. + The verifier consumes offsets from specified topic and verifies it the data in the iceberg table matches. - - The verifier runs two threads: - - one of them consumes messages from the specified topic and buffers them in memory. + + The verifier runs two threads: + - one of them consumes messages from the specified topic and buffers them in memory. The semaphore is used to limit the number of messages buffered in memory. - - - second thread executes a per partition query that fetches the messages + + - second thread executes a per partition query that fetches the messages from the iceberg table """ #TODO: add an ability to pass lambda to verify the message content - #TODO: add tolerance for compacted topics - def __init__(self, redpanda: RedpandaService, topic: str, - query_engine: QueryEngineBase): + def __init__(self, + redpanda: RedpandaService, + topic: str, + query_engine: QueryEngineBase, + compacted: bool = False): self.redpanda = redpanda self.topic = topic self.logger = redpanda.logger @@ -70,6 +72,8 @@ def __init__(self, redpanda: RedpandaService, topic: str, # map of last queried offset for each partition self._max_queried_offsets = {} self._last_checkpoint = {} + self._expected_compacted_keys = set() + self._compacted = compacted def create_consumer(self): c = Consumer({ @@ -139,13 +143,13 @@ def maybe_update_positions(): def _get_query(self, partition, last_queried_offset, max_consumed_offset): return f"\ - SELECT redpanda.offset FROM redpanda.{self._query.escape_identifier(self.topic)} \ + SELECT redpanda.offset, redpanda.key FROM redpanda.{self._query.escape_identifier(self.topic)} \ WHERE redpanda.partition={partition} \ AND redpanda.offset>{last_queried_offset} \ AND redpanda.offset<={max_consumed_offset} \ ORDER BY redpanda.offset" - def _verify_next_message(self, partition, iceberg_offset): + def _verify_next_message(self, partition, iceberg_offset, iceberg_key): if partition not in self._consumed_messages: self._errors.append( f"Partition {partition} returned from Iceberg query not found in consumed messages" @@ -156,7 +160,8 @@ def _verify_next_message(self, partition, iceberg_offset): if len(p_messages) == 0: return - consumer_offset = self._consumed_messages[partition][0].offset() + message = p_messages[0] + consumer_offset = message.offset() if iceberg_offset > consumer_offset: self._errors.append( f"Offset from Iceberg table {iceberg_offset} is greater than next consumed offset {consumer_offset} for partition {partition}, most likely there is a gap in the table" @@ -172,10 +177,19 @@ def _verify_next_message(self, partition, iceberg_offset): self._max_queried_offsets[partition] = iceberg_offset if consumer_offset != iceberg_offset: - self._errors.append( - f"Offset from iceberg table {iceberg_offset} for {partition} does not match the next consumed offset {consumer_offset}" - ) - return + if self._compacted: + self._expected_compacted_keys.add(iceberg_key) + return + else: + self._errors.append( + f"Offset from iceberg table {iceberg_offset} for {partition} does not match the next consumed offset {consumer_offset}" + ) + return + else: + if self._compacted: + if iceberg_key in self._expected_compacted_keys: + self._expected_compacted_keys.remove(iceberg_key) + self._consumed_messages[partition].pop(0) self._msg_semaphore.release() @@ -205,7 +219,7 @@ def _query_thread(self): with self._query.run_query(query) as cursor: with self._lock: for row in cursor: - self._verify_next_message(partition, row[0]) + self._verify_next_message(partition, *row) if len(self._errors) > 0: self.logger.error( f"violations detected: {self._errors}, stopping verifier" @@ -287,3 +301,7 @@ def stop(self): self.logger.debug(f"queried offsets: {self._max_queried_offsets}") assert self._max_queried_offsets == self._max_consumed_offsets, "Mismatch between maximum offsets in topic vs iceberg table" + + assert len( + self._expected_compacted_keys + ) == 0, f"Some keys which were compacted away were not seen later in the consumer's log"