diff --git a/src/v/redpanda/admin/debug.cc b/src/v/redpanda/admin/debug.cc index 79bdc4a10bf5..66c0bec6ac62 100644 --- a/src/v/redpanda/admin/debug.cc +++ b/src/v/redpanda/admin/debug.cc @@ -418,6 +418,13 @@ void admin_server::register_debug_routes() { return get_partition_state_handler(std::move(req)); }); + register_route( + seastar::httpd::debug_json::get_partition_producers, + [this](std::unique_ptr req) + -> ss::future { + return get_producers_state_handler(std::move(req)); + }); + register_route( ss::httpd::debug_json::cpu_profile, [this](std::unique_ptr req) @@ -810,6 +817,79 @@ admin_server::get_partition_state_handler( co_return ss::json::json_return_type(std::move(response)); } +ss::future +admin_server::get_producers_state_handler( + std::unique_ptr req) { + const model::ntp ntp = parse_ntp_from_request(req->param); + auto timeout = std::chrono::duration_cast( + 10s); + auto result = co_await _tx_gateway_frontend.local().get_producers( + cluster::get_producers_request{ntp, timeout}); + if (result.error_code != cluster::tx::errc::none) { + throw ss::httpd::server_error_exception(fmt::format( + "Error {} processing partition state for ntp: {}", + result.error_code, + ntp)); + } + vlog( + adminlog.debug, + "producers for {}, size: {}", + ntp, + result.producers.size()); + ss::httpd::debug_json::partition_producers producers; + producers.ntp = fmt::format("{}", ntp); + for (auto& producer : result.producers) { + ss::httpd::debug_json::partition_producer_state producer_state; + producer_state.id = producer.pid.id(); + producer_state.epoch = producer.pid.epoch(); + for (const auto& req : producer.inflight_requests) { + ss::httpd::debug_json::idempotent_producer_request_state inflight; + inflight.first_sequence = req.first_sequence; + inflight.last_sequence = req.last_sequence; + inflight.term = req.term(); + producer_state.inflight_idempotent_requests.push( + std::move(inflight)); + } + for (const auto& req : producer.finished_requests) { + ss::httpd::debug_json::idempotent_producer_request_state finished; + finished.first_sequence = req.first_sequence; + finished.last_sequence = req.last_sequence; + finished.term = req.term(); + producer_state.finished_idempotent_requests.push( + std::move(finished)); + } + if (producer.last_update) { + producer_state.last_update_timestamp + = producer.last_update.value()(); + } + if (producer.tx_begin_offset) { + producer_state.transaction_begin_offset + = producer.tx_begin_offset.value(); + } + if (producer.tx_end_offset) { + producer_state.transaction_last_offset + = producer.tx_end_offset.value(); + } + if (producer.tx_seq) { + producer_state.transaction_sequence = producer.tx_seq.value(); + } + if (producer.tx_timeout) { + producer_state.transaction_timeout_ms + = producer.tx_timeout.value().count(); + } + if (producer.coordinator_partition) { + producer_state.transaction_coordinator_partition + = producer.coordinator_partition.value()(); + } + if (producer.group_id) { + producer_state.transaction_group_id = producer.group_id.value(); + } + producers.producers.push(std::move(producer_state)); + co_await ss::coroutine::maybe_yield(); + } + co_return ss::json::json_return_type(std::move(producers)); +} + ss::future admin_server::get_node_uuid_handler() { ss::httpd::debug_json::broker_uuid uuid; uuid.node_uuid = ssx::sformat( diff --git a/src/v/redpanda/admin/server.h b/src/v/redpanda/admin/server.h index be0c900c6f25..1d399fe8a513 100644 --- a/src/v/redpanda/admin/server.h +++ b/src/v/redpanda/admin/server.h @@ -619,6 +619,8 @@ class admin_server { ss::future get_partition_state_handler(std::unique_ptr); + ss::future + get_producers_state_handler(std::unique_ptr); ss::future get_local_storage_usage_handler(std::unique_ptr); ss::future diff --git a/tests/rptest/services/admin.py b/tests/rptest/services/admin.py index 246ecc052852..0ae64e289efc 100644 --- a/tests/rptest/services/admin.py +++ b/tests/rptest/services/admin.py @@ -1563,6 +1563,10 @@ def get_partition_state(self, namespace, topic, partition, node=None): path = f"debug/partition/{namespace}/{topic}/{partition}" return self._request("GET", path, node=node).json() + def get_producers_state(self, namespace, topic, partition, node=None): + path = f"debug/producers/{namespace}/{topic}/{partition}" + return self._request("GET", path, node=node).json() + def get_local_storage_usage(self, node=None): """ Get the local storage usage report. diff --git a/tests/rptest/transactions/producers_api_test.py b/tests/rptest/transactions/producers_api_test.py new file mode 100644 index 000000000000..05af93815eee --- /dev/null +++ b/tests/rptest/transactions/producers_api_test.py @@ -0,0 +1,70 @@ +# Copyright 2024 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 +import threading +from time import sleep +from rptest.transactions.verifiers.consumer_offsets_verifier import ConsumerOffsetsVerifier +from rptest.services.admin import Admin +from rptest.tests.redpanda_test import RedpandaTest +from rptest.services.cluster import cluster + + +class ProducersAdminAPITest(RedpandaTest): + def __init__(self, test_context): + super(ProducersAdminAPITest, + self).__init__(test_context=test_context, + num_brokers=3, + extra_rp_conf={ + "group_topic_partitions": 1, + "group_new_member_join_timeout": 3000, + "enable_leader_balancer": False + }) + self._stop_scraping = threading.Event() + + def get_producer_state(self, topic: str): + admin = self.redpanda._admin + return admin.get_producers_state(namespace="kafka", + topic=topic, + partition=0) + + @cluster(num_nodes=3) + def test_producers_state_api_during_load(self): + verifier = ConsumerOffsetsVerifier(self.redpanda, self._client) + self.redpanda._admin.await_stable_leader(topic="__consumer_offsets") + self.redpanda._admin.await_stable_leader(topic=verifier._topic) + + # Run a scraper in background as the verifier is running to ensure it doesn't + # interact with the workloads incorrectly + def scraper(): + while not self._stop_scraping.isSet(): + self.get_producer_state(verifier._topic) + self.get_producer_state("__consumer_offsets") + sleep(1) + + bg_scraper = threading.Thread(target=scraper, daemon=True) + bg_scraper.start() + verifier.verify() + self._stop_scraping.set() + bg_scraper.join() + + # Basic sanity checks + co_producers = self.get_producer_state("__consumer_offsets") + assert len( + co_producers["producers"] + ) == 10, "Not all producers states found in consumer_offsets partition" + expected_groups = set([f"group-{i}" for i in range(10)]) + state_groups = set([ + producer["'transaction_group_id"] + for producer in co_producers["producers"] + ]) + assert expected_groups == state_groups, f"Not all groups reported. expected: {expected_groups}, repoted: {state_groups}" + + topic_producers = self.get_producer_state(verifier._topic) + assert len( + topic_producers["producers"] + ) == 10, "Not all producers states found in data topic partition"