Skip to content

Commit

Permalink
tx/observability: REST endpoint to fetch all producers from a partition
Browse files Browse the repository at this point in the history
/v1/debug/producers/{namespace}/{topic}/{partition}

The response includes low-level debug information about each producer's
idempotency/transactional state.
  • Loading branch information
bharathv committed Dec 23, 2024
1 parent de32443 commit 2c28789
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 0 deletions.
80 changes: 80 additions & 0 deletions src/v/redpanda/admin/debug.cc
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,13 @@ void admin_server::register_debug_routes() {
return get_partition_state_handler(std::move(req));
});

register_route<user>(
seastar::httpd::debug_json::get_partition_producers,
[this](std::unique_ptr<ss::http::request> req)
-> ss::future<ss::json::json_return_type> {
return get_producers_state_handler(std::move(req));
});

register_route<superuser>(
ss::httpd::debug_json::cpu_profile,
[this](std::unique_ptr<ss::http::request> req)
Expand Down Expand Up @@ -810,6 +817,79 @@ admin_server::get_partition_state_handler(
co_return ss::json::json_return_type(std::move(response));
}

/// Handler for GET /v1/debug/producers/{namespace}/{topic}/{partition}.
///
/// Parses the ntp out of the request parameters, asks the tx gateway frontend
/// for the producers of that partition (with a 10s timeout) and converts the
/// reply into a debug_json::partition_producers document.
///
/// \param req incoming HTTP request; its params are used to build the ntp.
/// \return JSON holding the formatted ntp and one entry per producer. Each
///         entry carries the producer id/epoch, its in-flight and finished
///         idempotent requests, and whichever optional fields the reply set.
/// \throws ss::httpd::server_error_exception when the frontend reply carries
///         an error code other than cluster::tx::errc::none.
ss::future<ss::json::json_return_type>
admin_server::get_producers_state_handler(
  std::unique_ptr<ss::http::request> req) {
    const model::ntp ntp = parse_ntp_from_request(req->param);
    const auto timeout
      = std::chrono::duration_cast<model::timeout_clock::duration>(10s);
    auto result = co_await _tx_gateway_frontend.local().get_producers(
      cluster::get_producers_request{ntp, timeout});
    if (result.error_code != cluster::tx::errc::none) {
        throw ss::httpd::server_error_exception(fmt::format(
          "Error {} processing producers state for ntp: {}",
          result.error_code,
          ntp));
    }
    vlog(
      adminlog.debug,
      "producers for {}, size: {}",
      ntp,
      result.producers.size());

    // In-flight and finished requests share one JSON shape, so a single
    // converter fills either list. The element is named `request` so it does
    // not shadow the handler's `req` parameter.
    const auto append_requests = [](auto& out, const auto& requests) {
        for (const auto& request : requests) {
            ss::httpd::debug_json::idempotent_producer_request_state state;
            state.first_sequence = request.first_sequence;
            state.last_sequence = request.last_sequence;
            state.term = request.term();
            out.push(std::move(state));
        }
    };

    ss::httpd::debug_json::partition_producers producers;
    producers.ntp = fmt::format("{}", ntp);
    for (const auto& producer : result.producers) {
        ss::httpd::debug_json::partition_producer_state producer_state;
        producer_state.id = producer.pid.id();
        producer_state.epoch = producer.pid.epoch();
        append_requests(
          producer_state.inflight_idempotent_requests,
          producer.inflight_requests);
        append_requests(
          producer_state.finished_idempotent_requests,
          producer.finished_requests);

        // The remaining fields are optional in the reply; they are assigned
        // in the JSON object only when a value is present.
        if (producer.last_update) {
            producer_state.last_update_timestamp
              = producer.last_update.value()();
        }
        if (producer.tx_begin_offset) {
            producer_state.transaction_begin_offset
              = producer.tx_begin_offset.value();
        }
        if (producer.tx_end_offset) {
            producer_state.transaction_last_offset
              = producer.tx_end_offset.value();
        }
        if (producer.tx_seq) {
            producer_state.transaction_sequence = producer.tx_seq.value();
        }
        if (producer.tx_timeout) {
            producer_state.transaction_timeout_ms
              = producer.tx_timeout.value().count();
        }
        if (producer.coordinator_partition) {
            producer_state.transaction_coordinator_partition
              = producer.coordinator_partition.value()();
        }
        if (producer.group_id) {
            producer_state.transaction_group_id = producer.group_id.value();
        }
        producers.producers.push(std::move(producer_state));
        // Give other tasks a chance to run between producers.
        co_await ss::coroutine::maybe_yield();
    }
    co_return ss::json::json_return_type(std::move(producers));
}

ss::future<ss::json::json_return_type> admin_server::get_node_uuid_handler() {
ss::httpd::debug_json::broker_uuid uuid;
uuid.node_uuid = ssx::sformat(
Expand Down
2 changes: 2 additions & 0 deletions src/v/redpanda/admin/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,8 @@ class admin_server {

ss::future<ss::json::json_return_type>
get_partition_state_handler(std::unique_ptr<ss::http::request>);
/// Handler registered for the debug_json::get_partition_producers route.
/// Resolves to JSON describing the producers of the partition identified by
/// the request parameters; the definition throws a server_error_exception
/// when fetching the producers reports an error.
ss::future<ss::json::json_return_type>
get_producers_state_handler(std::unique_ptr<ss::http::request>);
ss::future<ss::json::json_return_type>
get_local_storage_usage_handler(std::unique_ptr<ss::http::request>);
ss::future<ss::json::json_return_type>
Expand Down
4 changes: 4 additions & 0 deletions tests/rptest/services/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1563,6 +1563,10 @@ def get_partition_state(self, namespace, topic, partition, node=None):
path = f"debug/partition/{namespace}/{topic}/{partition}"
return self._request("GET", path, node=node).json()

def get_producers_state(self, namespace, topic, partition, node=None):
    """
    Fetch the producers state of a single partition.

    Issues GET debug/producers/{namespace}/{topic}/{partition} through
    self._request, forwarding `node`, and returns the decoded JSON body.
    """
    response = self._request(
        "GET",
        f"debug/producers/{namespace}/{topic}/{partition}",
        node=node)
    return response.json()

def get_local_storage_usage(self, node=None):
"""
Get the local storage usage report.
Expand Down
70 changes: 70 additions & 0 deletions tests/rptest/transactions/producers_api_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright 2024 Redpanda Data, Inc.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.md
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0
import threading
from time import sleep
from rptest.transactions.verifiers.consumer_offsets_verifier import ConsumerOffsetsVerifier
from rptest.services.admin import Admin
from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.cluster import cluster


class ProducersAdminAPITest(RedpandaTest):
    """
    Exercises the admin debug/producers endpoint while a verifier workload
    is running, then sanity-checks the reported producers.
    """
    def __init__(self, test_context):
        super().__init__(test_context=test_context,
                         num_brokers=3,
                         extra_rp_conf={
                             "group_topic_partitions": 1,
                             "group_new_member_join_timeout": 3000,
                             "enable_leader_balancer": False
                         })
        # Signals the background scraper thread to exit its loop.
        self._stop_scraping = threading.Event()

    def get_producer_state(self, topic: str):
        """Return the producers state of partition 0 of `topic` in the kafka namespace."""
        admin = self.redpanda._admin
        return admin.get_producers_state(namespace="kafka",
                                         topic=topic,
                                         partition=0)

    @cluster(num_nodes=3)
    def test_producers_state_api_during_load(self):
        verifier = ConsumerOffsetsVerifier(self.redpanda, self._client)
        self.redpanda._admin.await_stable_leader(topic="__consumer_offsets")
        self.redpanda._admin.await_stable_leader(topic=verifier._topic)

        # Run a scraper in background as the verifier is running to ensure it doesn't
        # interact with the workloads incorrectly
        def scraper():
            while not self._stop_scraping.is_set():
                self.get_producer_state(verifier._topic)
                self.get_producer_state("__consumer_offsets")
                sleep(1)

        bg_scraper = threading.Thread(target=scraper, daemon=True)
        bg_scraper.start()
        try:
            verifier.verify()
        finally:
            # Stop the scraper even when verification fails so the thread
            # does not keep hitting the admin API after the test body exits.
            self._stop_scraping.set()
            bg_scraper.join()

        # Basic sanity checks
        # NOTE(review): the count of 10 presumably matches what the verifier
        # creates; confirm against ConsumerOffsetsVerifier.
        co_producers = self.get_producer_state("__consumer_offsets")
        assert len(
            co_producers["producers"]
        ) == 10, "Not all producers states found in consumer_offsets partition"
        expected_groups = {f"group-{i}" for i in range(10)}
        state_groups = {
            producer["transaction_group_id"]
            for producer in co_producers["producers"]
        }
        assert expected_groups == state_groups, f"Not all groups reported. expected: {expected_groups}, reported: {state_groups}"

        topic_producers = self.get_producer_state(verifier._topic)
        assert len(
            topic_producers["producers"]
        ) == 10, "Not all producers states found in data topic partition"

0 comments on commit 2c28789

Please sign in to comment.