Skip to content

Commit

Permalink
tests: add ducktape test for leaders preference
Browse files Browse the repository at this point in the history
  • Loading branch information
ztlpn committed Oct 9, 2024
1 parent a7a3628 commit a767db9
Showing 1 changed file with 178 additions and 0 deletions.
178 changes: 178 additions & 0 deletions tests/rptest/tests/leadership_transfer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from rptest.services.redpanda import RESTART_LOG_ALLOW_LIST
from ducktape.utils.util import wait_until
from rptest.clients.kafka_cat import KafkaCat
from rptest.clients.rpk import RpkTool
from rptest.util import wait_until_result
from rptest.clients.types import TopicSpec
from rptest.services.admin import Admin
Expand Down Expand Up @@ -339,3 +340,180 @@ def all_partitions_present(num_nodes, per_node=None):
expected_min = math.floor(expected_on_shard * 0.8)
assert count >= expected_min, \
f"leader count on shard {s} ({count}) is < {expected_min}"


class LeadershipPinningTest(RedpandaTest):
    """Ducktape test for the leaders preference (rack pinning) settings.

    Runs a 6-broker cluster with rack awareness enabled and checks that
    partition leaders end up in the racks selected by the cluster-wide
    `default_leaders_preference` setting and by the per-topic
    `redpanda.leaders.preference` property.
    """

    # Rack label of each broker, indexed by its position in
    # self.redpanda.nodes.
    RACK_LAYOUT = ['A', 'A', 'B', 'B', 'C', 'C']

    def __init__(self, test_context):
        super().__init__(
            test_context=test_context,
            num_brokers=6,
            extra_rp_conf={
                'enable_rack_awareness': True,
            },
        )

    def setUp(self):
        """Intentionally do nothing.

        test_leadership_pinning assigns the per-node rack config and then
        calls self.redpanda.start() itself.
        NOTE(review): presumably this overrides a base-class setUp that
        would otherwise start the cluster earlier -- confirm.
        """
        pass

    def _nodes_in_rack(self, rack):
        """Return the nodes whose RACK_LAYOUT entry equals `rack`."""
        return [
            node for ix, node in enumerate(self.redpanda.nodes)
            if self.RACK_LAYOUT[ix] == rack
        ]

    def _get_topic2node2leaders(self):
        """Return {topic name: {leader id: number of partitions it leads}}.

        The data is read from the KafkaCat metadata of the cluster.
        NOTE(review): whatever value the metadata reports in "leader" is
        counted as-is; if leaderless partitions are reported with a
        placeholder id, they are counted here as well -- confirm.
        """
        kc = KafkaCat(self.redpanda)
        md = kc.metadata()
        ret = dict()
        for topic in md["topics"]:
            name = topic["topic"]
            node2leaders = dict(
                collections.Counter(p["leader"] for p in topic["partitions"]))
            self.logger.debug(
                f"topic {name} leaders: {sorted(node2leaders.items())}")

            ret[name] = node2leaders
        return ret

    def _rack_counts(self, node_counts):
        """Aggregate per-node leader counts into per-rack leader counts.

        `node_counts` maps node id -> leader count. Only racks with at
        least one leader appear in the returned {rack: count} dict; ids in
        `node_counts` that match no node in self.redpanda.nodes are ignored.
        """
        rack2count = dict()
        for ix, node in enumerate(self.redpanda.nodes):
            node_id = self.redpanda.node_id(node)
            leaders = node_counts.get(node_id, 0)
            if leaders > 0:
                rack = self.RACK_LAYOUT[ix]
                rack2count[rack] = rack2count.get(rack, 0) + leaders
        return rack2count

    def wait_for_racks(self,
                       partition_counts,
                       topic2expected_racks,
                       check_balance=True,
                       timeout_sec=60):
        """Wait until leaders of every topic sit exactly in the expected racks.

        partition_counts: {topic: expected number of partition leaders}.
        topic2expected_racks: {topic: iterable of rack labels}; a topic that
            is missing from this mapping is expected to have no leaders in
            any rack.
        check_balance: if True, additionally require that among the nodes
            leading at least one partition of a topic the leader counts
            differ by at most 2.
        timeout_sec: how long to wait; the condition is polled every 5
            seconds via wait_until, which fails the test on timeout.
        """
        def predicate():
            t2n2l = self._get_topic2node2leaders()

            for topic, expected_count in partition_counts.items():
                node2leaders = t2n2l.get(topic, dict())

                count = sum(node2leaders.values())
                if count != expected_count:
                    self.logger.debug(
                        f"not all leaders for topic {topic} present, "
                        f"expected {expected_count}, got {count}")
                    return False

                # Normalize both sides to sets so that the comparison does
                # not depend on the container type supplied by the caller.
                expected_racks = set(topic2expected_racks.get(topic, ()))
                rack2leaders = self._rack_counts(node2leaders)

                if expected_racks != set(rack2leaders):
                    self.logger.debug(
                        f"leader rack sets for topic {topic} differ, "
                        f"expected: {expected_racks}, actual counts: {rack2leaders}"
                    )
                    return False

                if check_balance:
                    nonzero_counts = [
                        n for n in node2leaders.values() if n > 0
                    ]
                    # An empty list (topic without leaders) is trivially
                    # balanced; min()/max() would raise on it.
                    if nonzero_counts and \
                            min(nonzero_counts) + 2 < max(nonzero_counts):
                        self.logger.debug(
                            f"leader counts unbalanced for topic {topic}: "
                            f"{sorted(node2leaders.items())}")
                        return False

            return True

        wait_until(
            predicate,
            timeout_sec=timeout_sec,
            backoff_sec=5,
            err_msg=f"leaders did not settle in expected racks "
            f"{topic2expected_racks} within {timeout_sec}s")

    @cluster(num_nodes=6, log_allow_list=RESTART_LOG_ALLOW_LIST)
    def test_leadership_pinning(self):
        """Exercise default and per-topic leaders preference end to end.

        Steps: default preference + per-topic override, altering the topic
        preference, losing a preferred rack, disabling the preference for a
        topic, removing topic overrides, and removing the default preference.
        """
        # Rack labels must be configured before the brokers start.
        for ix, node in enumerate(self.redpanda.nodes):
            self.redpanda.set_extra_node_conf(node, {
                'rack': self.RACK_LAYOUT[ix],
            })
        self.redpanda.add_extra_rp_conf(
            {'default_leaders_preference': "racks: A"})
        self.redpanda.start()

        rpk = RpkTool(self.redpanda)

        partition_counts = {"foo": 60, "bar": 20}

        self.logger.info("creating topics")

        # "foo" follows the cluster default (rack A), "bar" overrides it.
        rpk.create_topic("foo", partitions=60, replicas=3)
        rpk.create_topic("bar",
                         partitions=20,
                         replicas=3,
                         config={"redpanda.leaders.preference": "racks: C"})

        # bigger timeout to allow balancer to activate, health reports to
        # propagate, etc.
        self.wait_for_racks(partition_counts, {
            "foo": {"A"},
            "bar": {"C"}
        },
                            timeout_sec=90)

        self.logger.info("altering topic preference")

        rpk.alter_topic_config("bar", "redpanda.leaders.preference",
                               "racks: B, C")

        self.wait_for_racks(partition_counts, {
            "foo": {"A"},
            "bar": {"B", "C"}
        },
                            timeout_sec=30)

        # Decrease idle timeout to not wait too long after nodes are killed.
        # NOTE(review): the balancer is switched off and on around the
        # change, presumably so that the new timeout takes effect -- confirm.
        self.redpanda.set_cluster_config({"enable_leader_balancer": False})
        self.redpanda.set_cluster_config(
            {"leader_balancer_idle_timeout": 20000})
        self.redpanda.set_cluster_config({"enable_leader_balancer": True})

        self.logger.info("killing rack B")

        for node in self._nodes_in_rack("B"):
            self.redpanda.stop_node(node)

        # With rack B down, "bar" (preference "racks: B, C") can only be
        # led from rack C.
        self.wait_for_racks(partition_counts, {
            "foo": {"A"},
            "bar": {"C"}
        },
                            timeout_sec=60)

        self.logger.info("explicitly disabling for topic")
        rpk.alter_topic_config("foo", "redpanda.leaders.preference", "none")

        # There is cross-talk between partition counts of foo and bar, so we
        # don't require balanced counts.
        self.wait_for_racks(partition_counts, {
            "foo": {"A", "C"},
            "bar": {"C"}
        },
                            check_balance=False,
                            timeout_sec=30)

        self.logger.info("unset topic configs")

        # Without topic-level overrides both topics fall back to the
        # cluster default ("racks: A").
        rpk.delete_topic_config("foo", "redpanda.leaders.preference")
        rpk.delete_topic_config("bar", "redpanda.leaders.preference")

        self.wait_for_racks(partition_counts, {
            "foo": {"A"},
            "bar": {"A"}
        },
                            timeout_sec=30)

        self.logger.info("unset default preference")

        # Bring rack B back so that leaders can spread over all three racks.
        for node in self._nodes_in_rack("B"):
            self.redpanda.start_node(node)

        self.redpanda.set_cluster_config(
            {"default_leaders_preference": "none"})
        self.wait_for_racks(partition_counts, {
            "foo": {"A", "B", "C"},
            "bar": {"A", "B", "C"}
        },
                            timeout_sec=90)

0 comments on commit a767db9

Please sign in to comment.