rptest: make datalake_verifier compaction aware
By handling gaps in offsets and recording seen keys, we can validate the
correctness of a compacted log that has been translated (fully) into an
iceberg table.
WillemKauf committed Dec 23, 2024
1 parent 5b49904 commit 16504a8
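
In rough terms, the verifier now tolerates a gap in the consumed offsets only when the gapped row's key shows up again at a later offset, which is exactly when compaction is allowed to have removed the record. Below is a minimal, self-contained sketch of that invariant, not the verifier itself; the helper name and the plain (offset, key) tuples standing in for consumer messages and Iceberg query rows are hypothetical:

# Sketch: the Iceberg table holds every pre-compaction offset, while the
# consumed (compacted) log has gaps.
def check_compacted(iceberg_rows, consumed, errors):
    # iceberg_rows and consumed: lists of (offset, key), ascending by offset
    expected_compacted_keys = set()
    pending = list(consumed)
    for offset, key in iceberg_rows:
        if not pending:
            break
        next_offset, _ = pending[0]
        if offset > next_offset:
            # the table is missing an offset the consumer saw: always an error
            errors.append(f"gap in table before consumed offset {next_offset}")
            return
        if offset != next_offset:
            # the consumer never saw this offset: compaction removed the
            # record, so its key must reappear later in the consumed log
            expected_compacted_keys.add(key)
            continue
        expected_compacted_keys.discard(key)
        pending.pop(0)
    if expected_compacted_keys:
        errors.append(f"keys never seen again: {expected_compacted_keys}")

errors = []
# offset 1 was compacted away, but key "a" reappears at offset 2: no error
check_compacted([(0, "a"), (1, "a"), (2, "a")], [(0, "a"), (2, "a")], errors)
assert not errors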
Showing 1 changed file with 36 additions and 18 deletions.
tests/rptest/tests/datalake/datalake_verifier.py
@@ -24,22 +24,24 @@

class DatalakeVerifier():
"""
Verifier that does the verification of the data in the redpanda Iceberg table.
The verifier consumes offsets from the specified topic and verifies that the data
in the iceberg table matches.
The verifier runs two threads:
- one of them consumes messages from the specified topic and buffers them in memory.
The semaphore is used to limit the number of messages buffered in memory.
- second thread executes a per partition query that fetches the messages
from the iceberg table
"""

#TODO: add an ability to pass lambda to verify the message content
-#TODO: add tolerance for compacted topics
-def __init__(self, redpanda: RedpandaService, topic: str,
-query_engine: QueryEngineBase):
+def __init__(self,
+redpanda: RedpandaService,
+topic: str,
+query_engine: QueryEngineBase,
+compacted: bool = False):
self.redpanda = redpanda
self.topic = topic
self.logger = redpanda.logger
@@ -70,6 +72,8 @@ def __init__(self, redpanda: RedpandaService, topic: str,
# map of last queried offset for each partition
self._max_queried_offsets = {}
self._last_checkpoint = {}
+self._expected_compacted_keys = set()
+self._compacted = compacted

def create_consumer(self):
c = Consumer({
@@ -139,13 +143,13 @@ def maybe_update_positions():

def _get_query(self, partition, last_queried_offset, max_consumed_offset):
return f"\
-SELECT redpanda.offset FROM redpanda.{self._query.escape_identifier(self.topic)} \
+SELECT redpanda.offset, redpanda.key FROM redpanda.{self._query.escape_identifier(self.topic)} \
WHERE redpanda.partition={partition} \
AND redpanda.offset>{last_queried_offset} \
AND redpanda.offset<={max_consumed_offset} \
ORDER BY redpanda.offset"

-def _verify_next_message(self, partition, iceberg_offset):
+def _verify_next_message(self, partition, iceberg_offset, iceberg_key):
if partition not in self._consumed_messages:
self._errors.append(
f"Partition {partition} returned from Iceberg query not found in consumed messages"
@@ -156,7 +160,8 @@ def _verify_next_message(self, partition, iceberg_offset):
if len(p_messages) == 0:
return

-consumer_offset = self._consumed_messages[partition][0].offset()
+message = p_messages[0]
+consumer_offset = message.offset()
if iceberg_offset > consumer_offset:
self._errors.append(
f"Offset from Iceberg table {iceberg_offset} is greater than next consumed offset {consumer_offset} for partition {partition}, most likely there is a gap in the table"
@@ -172,10 +177,19 @@ def _verify_next_message(self, partition, iceberg_offset):
self._max_queried_offsets[partition] = iceberg_offset

if consumer_offset != iceberg_offset:
-self._errors.append(
-f"Offset from iceberg table {iceberg_offset} for {partition} does not match the next consumed offset {consumer_offset}"
-)
-return
+if self._compacted:
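+# this offset is missing from the consumed (compacted) log: record the key; it must be seen again at a later offset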
+self._expected_compacted_keys.add(iceberg_key)
+return
+else:
+self._errors.append(
+f"Offset from iceberg table {iceberg_offset} for {partition} does not match the next consumed offset {consumer_offset}"
+)
+return
+else:
+if self._compacted:
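+# offsets match: if this key was previously recorded as compacted away, seeing it again here satisfies the expectation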
+if iceberg_key in self._expected_compacted_keys:
+self._expected_compacted_keys.remove(iceberg_key)

self._consumed_messages[partition].pop(0)
self._msg_semaphore.release()

@@ -205,7 +219,7 @@ def _query_thread(self):
with self._query.run_query(query) as cursor:
with self._lock:
for row in cursor:
-self._verify_next_message(partition, row[0])
+self._verify_next_message(partition, *row)
if len(self._errors) > 0:
self.logger.error(
f"violations detected: {self._errors}, stopping verifier"
@@ -287,3 +301,7 @@ def stop(self):
self.logger.debug(f"queried offsets: {self._max_queried_offsets}")

assert self._max_queried_offsets == self._max_consumed_offsets, "Mismatch between maximum offsets in topic vs iceberg table"

+assert len(
+self._expected_compacted_keys
+) == 0, f"Some keys which were compacted away were not seen later in the consumer's log"
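
With the new flag, a compacted-topic test would construct the verifier roughly as follows; this usage is hypothetical, only the compacted keyword argument comes from this change, and the surrounding objects are assumed from the existing test scaffolding:

# hypothetical usage in a ducktape test
verifier = DatalakeVerifier(redpanda, topic, query_engine, compacted=True)
# ... drive the verifier as before ...
verifier.stop()  # now also asserts that no compacted-away key went unseen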
