Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.3.x] Do not download the partition manifest when creating learner #24678

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/v/cluster/controller_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1435,6 +1435,20 @@ ss::future<std::error_code> controller_backend::create_partition(
= storage::topic_recovery_enabled::yes;
rtp.emplace(remote_rev, cfg.partition_count);
}
/**
* Reset remote topic properties if a topic is recovered from tiered
* storage and current node is joining replica set. A node is joining
* replica set if its initial nodes set is empty.
*/
if (initial_brokers.empty() && rtp.has_value()) {
// reset remote topic properties
vlog(
clusterlog.info,
"[{}] Disabling remote recovery while creating partition "
"replica. Current node is added to the replica set as learner.",
ntp);
rtp.reset();
}
// we use offset as an rev as it is always increasing and it
// increases while ntp is being created again
try {
Expand Down
63 changes: 62 additions & 1 deletion tests/rptest/tests/scaling_up_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from collections import defaultdict
import random, math, time
from rptest.clients.rpk import RpkTool
from rptest.services.admin import Admin
from rptest.services.cluster import cluster
from ducktape.utils.util import wait_until
Expand All @@ -18,9 +19,10 @@
from rptest.clients.default import DefaultClient
from rptest.services.kgo_verifier_services import KgoVerifierConsumerGroupConsumer, KgoVerifierSeqConsumer, KgoVerifierProducer
from rptest.services.redpanda import SISettings
import concurrent
from rptest.utils.node_operations import verify_offset_translator_state_consistent

from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.util import KafkaCliTools
from rptest.utils.mode_checks import skip_debug_mode


Expand Down Expand Up @@ -627,3 +629,62 @@ def disk_usage_correct(nodes, node_id):
assert self.consumer.consumer_status.validator.invalid_reads == 0, \
f"Invalid reads in topic: {topic.name}, invalid reads count: "
"{self.consumer.consumer_status.validator.invalid_reads}"

@skip_debug_mode
@cluster(num_nodes=6)
def test_scaling_up_with_recovered_topic(self):
log_segment_size = 2 * 1024 * 1024
segments_per_partition = 40
msg_size = 256 * 1024
partition_count = 10
total_records = int(
(segments_per_partition * partition_count * log_segment_size) /
msg_size)

si_settings = SISettings(test_context=self.test_context,
log_segment_size=log_segment_size,
retention_local_strict=True,
fast_uploads=True)

self.redpanda.set_si_settings(si_settings)
#start 3 node cluster
self.redpanda.start(nodes=self.redpanda.nodes[0:3])
# create test topic
cli = KafkaCliTools(self.redpanda)
topic = TopicSpec(name="recovery-topic",
replication_factor=3,
partition_count=partition_count)
rpk = RpkTool(self.redpanda)
rpk.create_topic(topic.name,
partition_count,
3,
config={
'redpanda.remote.write': 'true',
'redpanda.remote.read': 'true',
'redpanda.remote.delete': 'false'
})
# produce some data
cli.produce(topic.name, total_records, msg_size)
self.redpanda.wait_for_manifest_uploads()

total_replicas = 3 * partition_count
rpk.delete_topic(topic.name)

rpk.create_topic(topic.name,
partition_count,
3,
config={
'redpanda.remote.recovery': 'true',
'redpanda.remote.write': 'true',
'redpanda.remote.read': 'true',
'redpanda.remote.delete': 'true'
})

cli.produce(topic.name, total_records, msg_size)

self.redpanda.start_node(self.redpanda.nodes[3])
self.redpanda.start_node(self.redpanda.nodes[4])
self.wait_for_partitions_rebalanced(total_replicas=total_replicas,
timeout_sec=self.rebalance_timeout)
self.redpanda.wait_for_manifest_uploads()
verify_offset_translator_state_consistent(self.redpanda)
Loading