Skip to content

Commit

Permalink
Merge pull request #24401 from nvartolomei/nv/CORE-8199-await-all-rep…
Browse files Browse the repository at this point in the history
…licas-recover

rptest: await all replicas to recover before validation
  • Loading branch information
nvartolomei authored Dec 3, 2024
2 parents 60ad2f4 + f9477eb commit 1769a76
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 13 deletions.
45 changes: 33 additions & 12 deletions tests/rptest/tests/topic_recovery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,7 @@ def __init__(self, test_context: TestContext, *args, **kwargs):
self._started = True

self.rpk = RpkTool(self.redpanda)
self.admin = Admin(self.redpanda)

def rpk_producer_maker(self,
topic: str,
Expand Down Expand Up @@ -1467,22 +1468,42 @@ def _wait_for_topic(self,
expected_num_leaders = sum(
[t.partition_count for t in recovered_topics])

def all_replicas_in_sync(topic, *, partition, num_replicas):
partition_state = self.admin.get_partition_state(
"kafka", topic, partition)
if len(partition_state["replicas"]) != num_replicas:
return False
hwms = [
replica["high_watermark"]
for replica in partition_state["replicas"]
]
return all([hwm == hwms[0] for hwm in hwms])

def verify():
num_leaders = 0
try:
for topic in recovered_topics:
topic_state = self.rpk.describe_topic(topic.name)
# Describe topics only works after leader election succeded.
# We can use it to wait until the recovery is completed.
for partition in topic_state:
self.logger.info(f"partition: {partition}")
if partition.leader in partition.replicas:
num_leaders += 1
except:
return False
for topic in recovered_topics:
topic_state = self.rpk.describe_topic(topic.name)
# Describe topics only works after leader election succeded.
# We can use it to wait until the recovery is completed.
for partition in topic_state:
self.logger.info(f"partition: {partition}")
if partition.leader in partition.replicas:
num_leaders += 1

# If we have a leader, we can check if all replicas are in sync
if not all_replicas_in_sync(topic.name,
partition=partition.id,
num_replicas=len(
partition.replicas)):
self.logger.debug(
"partition replicas are not in sync yet")
return False
return num_leaders == expected_num_leaders

wait_until(verify, timeout_sec=timeout.total_seconds(), backoff_sec=1)
wait_until(verify,
timeout_sec=timeout.total_seconds(),
backoff_sec=1,
retry_on_exc=True)

def do_run(self, test_case: BaseCase, upload_delay_sec=60):
"""Template method invoked by all tests."""
Expand Down
11 changes: 10 additions & 1 deletion tests/rptest/utils/si_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,10 @@ def get_ntp_sizes(fdata_per_host, hosts_can_vary=True):
that maps host to dict of ntps where each ntp is mapped to the list of
segments. The result is a map from ntp to the partition size on disk.
"""
first_host = None

ntps = defaultdict(int)
for _, fdata in fdata_per_host.items():
for host, fdata in fdata_per_host.items():
ntp_size = defaultdict(int)
for path, entry in fdata.items():
it = _parse_checksum_entry(path, entry, ignore_rev=True)
Expand All @@ -418,6 +420,13 @@ def get_ntp_sizes(fdata_per_host, hosts_can_vary=True):
# which are created after recovery
ntp_size[it.ntp] += it.size

if first_host is None:
first_host = host
else:
assert set(ntps.keys()) == set(
ntp_size.keys()
), f"NTPs on {host} differ from first host {first_host}: {set(ntps.keys())} vs {host}: {set(ntp_size.keys())}"

for ntp, total_size in ntp_size.items():
if ntp in ntps and not hosts_can_vary:
# the size of the partition should be the
Expand Down

0 comments on commit 1769a76

Please sign in to comment.