From 55c390764df86c9bdda2a261967908efd0043a14 Mon Sep 17 00:00:00 2001 From: Ioannis Kavvadias Date: Thu, 12 Dec 2024 18:36:43 +0000 Subject: [PATCH] pandaproxy: add semamphore usage metrics --- src/v/pandaproxy/probe.cc | 74 +++++++++++++++++++++ src/v/pandaproxy/probe.h | 19 ++++++ src/v/pandaproxy/rest/proxy.cc | 9 ++- src/v/pandaproxy/schema_registry/service.cc | 9 ++- src/v/pandaproxy/server.cc | 21 ++++-- src/v/pandaproxy/server.h | 9 ++- 6 files changed, 129 insertions(+), 12 deletions(-) diff --git a/src/v/pandaproxy/probe.cc b/src/v/pandaproxy/probe.cc index a5dd7bb237d3f..bd4820b5dea6d 100644 --- a/src/v/pandaproxy/probe.cc +++ b/src/v/pandaproxy/probe.cc @@ -104,4 +104,78 @@ void probe::setup_public_metrics() { .aggregate(aggregate_labels)}); } +server_probe::server_probe( + server::context_t& ctx, const ss::sstring& group_name) + : _ctx(ctx) + , _group_name(group_name) + , _metrics() + , _public_metrics() { + setup_metrics(); +} + +void server_probe::setup_metrics() { + namespace sm = ss::metrics; + + auto setup_common = [this]() { + const auto usage = [](const size_t current, const size_t max) { + constexpr double divide_by_zero = -1.; + constexpr double invalid_values = -2.; + if (max == 0) { + return divide_by_zero; + } + if (current > max) { + return invalid_values; + } + const auto max_d = static_cast(max); + const auto current_d = static_cast(current); + return (max_d - current_d) / max_d; + }; + + std::vector defs; + defs.reserve(3); + defs.emplace_back( + sm::make_gauge( + "inflight_requests_usage_ratio", + [this, usage] { + return usage(_ctx.inflight_sem.current(), _ctx.max_inflight); + }, + sm::description(ssx::sformat( + "Usage ratio of in-flight requests for {}", _group_name))) + .aggregate({})); + defs.emplace_back( + sm::make_gauge( + "memory_usage_ratio", + [this, usage] { + return usage(_ctx.mem_sem.current(), _ctx.max_memory); + }, + sm::description( + ssx::sformat("Usage ratio of memory for {}", _group_name))) + .aggregate({})); + defs.emplace_back( + sm::make_gauge( + "queued_requests_memory_blocked", + [this] { return _ctx.mem_sem.waiters(); }, + sm::description(ssx::sformat( + "Number of requests queued in {}, due to memory limitations", + _group_name))) + .aggregate({})); + return defs; + }; + + if (!config::shard_local_cfg().disable_metrics()) { + _metrics.add_group( + _group_name, + setup_common + .template operator()(), + {}, + {}); + } + + if (!config::shard_local_cfg().disable_public_metrics()) { + _public_metrics.add_group( + _group_name, + setup_common.template operator()()); + } +} + } // namespace pandaproxy diff --git a/src/v/pandaproxy/probe.h b/src/v/pandaproxy/probe.h index 70270b2b67089..97a251b32f1f2 100644 --- a/src/v/pandaproxy/probe.h +++ b/src/v/pandaproxy/probe.h @@ -12,6 +12,7 @@ #pragma once #include "metrics/metrics.h" +#include "pandaproxy/server.h" #include "utils/log_hist.h" #include @@ -82,4 +83,22 @@ class probe { metrics::public_metric_groups _public_metrics; }; +class server_probe { +public: + server_probe(server::context_t& ctx, const ss::sstring& group_name); + server_probe(const server_probe&) = delete; + server_probe& operator=(const server_probe&) = delete; + server_probe(server_probe&&) = delete; + server_probe& operator=(server_probe&&) = delete; + ~server_probe() = default; + +private: + void setup_metrics(); + + server::context_t& _ctx; + const ss::sstring& _group_name; + metrics::internal_metric_groups _metrics; + metrics::public_metric_groups _public_metrics; +}; + } // namespace pandaproxy diff --git a/src/v/pandaproxy/rest/proxy.cc b/src/v/pandaproxy/rest/proxy.cc index 911c4aef1e660..7eb5eca96f088 100644 --- a/src/v/pandaproxy/rest/proxy.cc +++ b/src/v/pandaproxy/rest/proxy.cc @@ -113,7 +113,7 @@ proxy::proxy( , _inflight_config_binding(config::shard_local_cfg().max_in_flight_pandaproxy_requests_per_shard.bind()) , _client(client) , _client_cache(client_cache) - , _ctx{{{{}, max_memory, _mem_sem, _inflight_sem, {}, smp_sg}, *this}, + , _ctx{{{{}, max_memory, _mem_sem, _inflight_config_binding(), _inflight_sem, {}, smp_sg}, *this}, {config::always_true(), config::shard_local_cfg().superusers.bind(), controller}, _config.pandaproxy_api.value()} , _server( @@ -126,8 +126,11 @@ proxy::proxy( json::serialization_format::application_json) , _ensure_started{[this]() { return do_start(); }} , _controller(controller) { - _inflight_config_binding.watch( - [this]() { _inflight_sem.set_capacity(_inflight_config_binding()); }); + _inflight_config_binding.watch([this]() { + const size_t capacity = _inflight_config_binding(); + _inflight_sem.set_capacity(capacity); + _ctx.max_inflight = capacity; + }); } ss::future<> proxy::start() { diff --git a/src/v/pandaproxy/schema_registry/service.cc b/src/v/pandaproxy/schema_registry/service.cc index 77502d30c5e7e..056f38a73386a 100644 --- a/src/v/pandaproxy/schema_registry/service.cc +++ b/src/v/pandaproxy/schema_registry/service.cc @@ -614,7 +614,7 @@ service::service( config::shard_local_cfg() .max_in_flight_schema_registry_requests_per_shard.bind()) , _client(client) - , _ctx{{{}, max_memory, _mem_sem, _inflight_sem, {}, smp_sg}, *this} + , _ctx{{{}, max_memory, _mem_sem, _inflight_config_binding(), _inflight_sem, {}, smp_sg}, *this} , _server( "schema_registry", // server_name "schema_registry", // public_metric_group_name @@ -632,8 +632,11 @@ service::service( config::always_true(), config::shard_local_cfg().superusers.bind(), controller.get()} { - _inflight_config_binding.watch( - [this]() { _inflight_sem.set_capacity(_inflight_config_binding()); }); + _inflight_config_binding.watch([this]() { + const size_t capacity = _inflight_config_binding(); + _inflight_sem.set_capacity(capacity); + _ctx.max_inflight = capacity; + }); } ss::future<> service::start() { diff --git a/src/v/pandaproxy/server.cc b/src/v/pandaproxy/server.cc index ea8ab73cfa637..1d96ea50a9e93 100644 --- a/src/v/pandaproxy/server.cc +++ b/src/v/pandaproxy/server.cc @@ -9,6 +9,8 @@ #include "pandaproxy/server.h" +#include "config/configuration.h" +#include "metrics/metrics.h" #include "model/metadata.h" #include "net/dns.h" #include "net/tls_certificate_probe.h" @@ -17,6 +19,7 @@ #include "pandaproxy/probe.h" #include "pandaproxy/reply.h" #include "rpc/rpc_utils.h" +#include "ssx/sformat.h" #include #include @@ -28,6 +31,7 @@ #include #include +#include namespace pandaproxy { @@ -156,7 +160,8 @@ server::server( , _api20(std::move(api20)) , _has_routes(false) , _ctx(ctx) - , _exceptional_mime_type(exceptional_mime_type) { + , _exceptional_mime_type(exceptional_mime_type) + , _probe{} { _api20.set_api_doc(_server._routes); _api20.register_api_file(_server._routes, header); _api20.add_definitions_file(_server._routes, definitions); @@ -201,6 +206,9 @@ ss::future<> server::start( const std::vector& advertised) { _server._routes.register_exeption_handler( exception_replier{ss::sstring{name(_exceptional_mime_type)}}); + + _probe = std::make_unique(_ctx, _public_metrics_group_name); + _ctx.advertised_listeners.reserve(endpoints.size()); for (auto& server_endpoint : endpoints) { auto addr = co_await net::resolve_dns(server_endpoint.address); @@ -240,13 +248,18 @@ ss::future<> server::start( } co_await _server.listen(addr, cred); } + co_return; } ss::future<> server::stop() { - return _pending_reqs.close() - .finally([this]() { return _ctx.as.request_abort(); }) - .finally([this]() mutable { return _server.stop(); }); + return _pending_reqs.close().finally([this]() { + _ctx.as.request_abort(); + _probe.reset(nullptr); + return _server.stop(); + }); } +server::~server() noexcept = default; + } // namespace pandaproxy diff --git a/src/v/pandaproxy/server.h b/src/v/pandaproxy/server.h index 7bca466fc052e..e854b0a8e86af 100644 --- a/src/v/pandaproxy/server.h +++ b/src/v/pandaproxy/server.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -40,6 +41,8 @@ namespace pandaproxy { +class server_probe; + inline ss::shard_id user_shard(const ss::sstring& name) { auto hash = xxhash_64(name.data(), name.length()); return jump_consistent_hash(hash, ss::smp::count); @@ -71,6 +74,7 @@ class server { std::vector advertised_listeners; size_t max_memory; ssx::semaphore& mem_sem; + size_t max_inflight; adjustable_semaphore& inflight_sem; ss::abort_source as; ss::smp_service_group smp_sg; @@ -104,9 +108,9 @@ class server { }; server() = delete; - ~server() = default; + ~server() noexcept; server(const server&) = delete; - server(server&&) noexcept = default; + server(server&&) noexcept = delete; server& operator=(const server&) = delete; server& operator=(server&&) = delete; @@ -136,6 +140,7 @@ class server { bool _has_routes; context_t& _ctx; json::serialization_format _exceptional_mime_type; + std::unique_ptr _probe; }; template