Skip to content

Commit

Permalink
tests: added scaling up test with recovered topic
Browse files Browse the repository at this point in the history
Added tests triggering partition replica set reconfiguration with
topics which are recovered.

Signed-off-by: Michał Maślanka <[email protected]>
(cherry picked from commit fff2b72)
  • Loading branch information
mmaslankaprv authored and vbotbuildovich committed Jan 3, 2025
1 parent 73d378e commit 0233c0e
Showing 1 changed file with 62 additions and 1 deletion.
63 changes: 62 additions & 1 deletion tests/rptest/tests/scaling_up_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from collections import defaultdict
import random, math, time
from rptest.clients.rpk import RpkTool
from rptest.services.admin import Admin
from rptest.services.cluster import cluster
from ducktape.utils.util import wait_until
Expand All @@ -18,9 +19,10 @@
from rptest.clients.default import DefaultClient
from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierSeqConsumer, KgoVerifierProducer
from rptest.services.redpanda import SISettings
import concurrent
from rptest.utils.node_operations import verify_offset_translator_state_consistent

from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.util import KafkaCliTools
from rptest.utils.mode_checks import skip_debug_mode


Expand Down Expand Up @@ -627,3 +629,62 @@ def disk_usage_correct(nodes, node_id):
assert self.consumer.consumer_status.validator.invalid_reads == 0, \
f"Invalid reads in topic: {topic.name}, invalid reads count: "
"{self.consumer.consumer_status.validator.invalid_reads}"

@skip_debug_mode
@cluster(num_nodes=6)
def test_scaling_up_with_recovered_topic(self):
log_segment_size = 2 * 1024 * 1024
segments_per_partition = 40
msg_size = 256 * 1024
partition_count = 10
total_records = int(
(segments_per_partition * partition_count * log_segment_size) /
msg_size)

si_settings = SISettings(test_context=self.test_context,
log_segment_size=log_segment_size,
retention_local_strict=True,
fast_uploads=True)

self.redpanda.set_si_settings(si_settings)
#start 3 node cluster
self.redpanda.start(nodes=self.redpanda.nodes[0:3])
# create test topic
cli = KafkaCliTools(self.redpanda)
topic = TopicSpec(name="recovery-topic",
replication_factor=3,
partition_count=partition_count)
rpk = RpkTool(self.redpanda)
rpk.create_topic(topic.name,
partition_count,
3,
config={
'redpanda.remote.write': 'true',
'redpanda.remote.read': 'true',
'redpanda.remote.delete': 'false'
})
# produce some data
cli.produce(topic.name, total_records, msg_size)
self.redpanda.wait_for_manifest_uploads()

total_replicas = 3 * partition_count
rpk.delete_topic(topic.name)

rpk.create_topic(topic.name,
partition_count,
3,
config={
'redpanda.remote.recovery': 'true',
'redpanda.remote.write': 'true',
'redpanda.remote.read': 'true',
'redpanda.remote.delete': 'true'
})

cli.produce(topic.name, total_records, msg_size)

self.redpanda.start_node(self.redpanda.nodes[3])
self.redpanda.start_node(self.redpanda.nodes[4])
self.wait_for_partitions_rebalanced(total_replicas=total_replicas,
timeout_sec=self.rebalance_timeout)
self.redpanda.wait_for_manifest_uploads()
verify_offset_translator_state_consistent(self.redpanda)

0 comments on commit 0233c0e

Please sign in to comment.