From f35fda78da814e0bb2b3d1110f82044468bdf49d Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Mon, 2 Dec 2024 19:47:46 +0000 Subject: [PATCH 1/3] rptest: await all replicas to recover before validation --- tests/rptest/tests/topic_recovery_test.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/rptest/tests/topic_recovery_test.py b/tests/rptest/tests/topic_recovery_test.py index ccf4351e5d50e..966907e06a34e 100644 --- a/tests/rptest/tests/topic_recovery_test.py +++ b/tests/rptest/tests/topic_recovery_test.py @@ -1211,6 +1211,7 @@ def __init__(self, test_context: TestContext, *args, **kwargs): self._started = True self.rpk = RpkTool(self.redpanda) + self.admin = Admin(self.redpanda) def rpk_producer_maker(self, topic: str, @@ -1467,6 +1468,17 @@ def _wait_for_topic(self, expected_num_leaders = sum( [t.partition_count for t in recovered_topics]) + def all_replicas_in_sync(topic, *, partition, num_replicas): + partition_state = self.admin.get_partition_state( + "kafka", topic, partition) + if len(partition_state["replicas"]) != num_replicas: + return False + hwms = [ + replica["high_watermark"] + for replica in partition_state["replicas"] + ] + return all([hwm == hwms[0] for hwm in hwms]) + def verify(): num_leaders = 0 try: @@ -1478,6 +1490,15 @@ def verify(): self.logger.info(f"partition: {partition}") if partition.leader in partition.replicas: num_leaders += 1 + + # If we have a leader, we can check if all replicas are in sync + if not all_replicas_in_sync( + topic.name, + partition=partition.id, + num_replicas=len(partition.replicas)): + self.logger.debug( + "partition replicas are not in sync yet") + return False except: return False return num_leaders == expected_num_leaders From 2d6baaac02a17cc14f79ecad6137a7376c7027a8 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Mon, 2 Dec 2024 20:39:15 +0000 Subject: [PATCH 2/3] rptest: let wait_until handle exception retries Much more helpful error messages as the exceptions 
are stored and rethrown at the end --- tests/rptest/tests/topic_recovery_test.py | 42 +++++++++++------------ 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/tests/rptest/tests/topic_recovery_test.py b/tests/rptest/tests/topic_recovery_test.py index 966907e06a34e..b41ea25249a59 100644 --- a/tests/rptest/tests/topic_recovery_test.py +++ b/tests/rptest/tests/topic_recovery_test.py @@ -1481,29 +1481,29 @@ def all_replicas_in_sync(topic, *, partition, num_replicas): def verify(): num_leaders = 0 - try: - for topic in recovered_topics: - topic_state = self.rpk.describe_topic(topic.name) - # Describe topics only works after leader election succeded. - # We can use it to wait until the recovery is completed. - for partition in topic_state: - self.logger.info(f"partition: {partition}") - if partition.leader in partition.replicas: - num_leaders += 1 - - # If we have a leader, we can check if all replicas are in sync - if not all_replicas_in_sync( - topic.name, - partition=partition.id, - num_replicas=len(partition.replicas)): - self.logger.debug( - "partition replicas are not in sync yet") - return False - except: - return False + for topic in recovered_topics: + topic_state = self.rpk.describe_topic(topic.name) + # Describe topics only works after leader election succeded. + # We can use it to wait until the recovery is completed. 
+ for partition in topic_state: + self.logger.info(f"partition: {partition}") + if partition.leader in partition.replicas: + num_leaders += 1 + + # If we have a leader, we can check if all replicas are in sync + if not all_replicas_in_sync(topic.name, + partition=partition.id, + num_replicas=len( + partition.replicas)): + self.logger.debug( + "partition replicas are not in sync yet") + return False return num_leaders == expected_num_leaders - wait_until(verify, timeout_sec=timeout.total_seconds(), backoff_sec=1) + wait_until(verify, + timeout_sec=timeout.total_seconds(), + backoff_sec=1, + retry_on_exc=True) def do_run(self, test_case: BaseCase, upload_delay_sec=60): """Template method invoked by all tests.""" From f9477eb612e6a13edbcdb79a8b93c6b02f92eaf5 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Mon, 2 Dec 2024 21:15:45 +0000 Subject: [PATCH 3/3] rptest: stricter verify_file_layout Catches the case of a replica missing all segments. --- tests/rptest/utils/si_utils.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/rptest/utils/si_utils.py b/tests/rptest/utils/si_utils.py index 1c2b2889dee4e..aed9f50a4af94 100644 --- a/tests/rptest/utils/si_utils.py +++ b/tests/rptest/utils/si_utils.py @@ -407,8 +407,10 @@ def get_ntp_sizes(fdata_per_host, hosts_can_vary=True): that maps host to dict of ntps where each ntp is mapped to the list of segments. The result is a map from ntp to the partition size on disk. 
""" + first_host = None + ntps = defaultdict(int) - for _, fdata in fdata_per_host.items(): + for host, fdata in fdata_per_host.items(): ntp_size = defaultdict(int) for path, entry in fdata.items(): it = _parse_checksum_entry(path, entry, ignore_rev=True) @@ -418,6 +420,13 @@ def get_ntp_sizes(fdata_per_host, hosts_can_vary=True): # which are created after recovery ntp_size[it.ntp] += it.size + if first_host is None: + first_host = host + else: + assert set(ntps.keys()) == set( + ntp_size.keys() + ), f"NTPs on {host} differ from first host {first_host}: {set(ntps.keys())} vs {host}: {set(ntp_size.keys())}" + for ntp, total_size in ntp_size.items(): if ntp in ntps and not hosts_can_vary: # the size of the partition should be the