diff --git a/tests/rptest/tests/consumer_group_test.py b/tests/rptest/tests/consumer_group_test.py index da71d683078f3..46b0296a72866 100644 --- a/tests/rptest/tests/consumer_group_test.py +++ b/tests/rptest/tests/consumer_group_test.py @@ -575,6 +575,89 @@ async def create_groups(r): assert len(list) == groups_in_round * rounds + @cluster(num_nodes=5) + def test_consumer_static_member_update(self): + """ + Test validating that re-joining static member will update the client id + """ + self.create_topic(20) + + group = 'test-gr-1' + + rpk = RpkTool(self.redpanda) + + # create and start first consumer + consumer1 = self.create_consumer( + topic=self.topic_spec.name, + group=group, + instance_name="static-consumer", + instance_id="panda-instance", + consumer_properties={"client.id": "my-client-1"}) + + consumer1.start() + + self.wait_for_members(group, 1) + + # wait for some messages + self.start_producer() + wait_until( + lambda: ConsumerGroupTest.consumed_at_least([consumer1], 50), + timeout_sec=30, + backoff_sec=2, + err_msg="consumer1 did not consume messages") + + # validate initial state + rpk_group_1 = rpk.group_describe(group) + + assert rpk_group_1.state == "Stable", f"Describe: {rpk_group_1}" + assert rpk_group_1.members == 1, f"Describe: {rpk_group_1}" + for p in rpk_group_1.partitions: + assert p.client_id == 'my-client-1', f"Describe: {p}" + + # clean up + self.producer.wait() + self.producer.free() + + consumer1.stop() + consumer1.wait() + consumer1.free() + + # create and start consumer with same instance_id but different cliend_id + consumer2 = self.create_consumer( + topic=self.topic_spec.name, + group=group, + instance_name="static-consumer", + instance_id="panda-instance", + consumer_properties={"client.id": "my-client-2"}) + + consumer2.start() + + self.wait_for_members(group, 1) + + # wait for some messages + self.start_producer() + wait_until( + lambda: ConsumerGroupTest.consumed_at_least([consumer2], 50), + timeout_sec=30, + backoff_sec=2, + err_msg="consumer2 did not consume messages") + + # validate updated state + rpk_group_2 = rpk.group_describe(group) + + assert rpk_group_2.state == "Stable", f"Describe: {rpk_group_2}" + assert rpk_group_2.members == 1, f"Describe: {rpk_group_2}" + for p in rpk_group_2.partitions: + assert p.client_id == 'my-client-2', f"Describe: {p}" + + # clean up + consumer2.stop() + consumer2.wait() + consumer2.free() + + self.producer.wait() + self.producer.free() + @dataclass class OffsetAndMetadata():