Skip to content

Commit

Permalink
pandaproxy: add semamphore usage metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
IoannisRP committed Dec 16, 2024
1 parent efe9624 commit 55c3907
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 12 deletions.
74 changes: 74 additions & 0 deletions src/v/pandaproxy/probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,78 @@ void probe::setup_public_metrics() {
.aggregate(aggregate_labels)});
}

server_probe::server_probe(
server::context_t& ctx, const ss::sstring& group_name)
: _ctx(ctx)
, _group_name(group_name)
, _metrics()
, _public_metrics() {
setup_metrics();
}

void server_probe::setup_metrics() {
namespace sm = ss::metrics;

auto setup_common = [this]<typename MetricDef>() {
const auto usage = [](const size_t current, const size_t max) {
constexpr double divide_by_zero = -1.;
constexpr double invalid_values = -2.;
if (max == 0) {
return divide_by_zero;
}
if (current > max) {
return invalid_values;
}
const auto max_d = static_cast<double>(max);
const auto current_d = static_cast<double>(current);
return (max_d - current_d) / max_d;
};

std::vector<MetricDef> defs;
defs.reserve(3);
defs.emplace_back(
sm::make_gauge(
"inflight_requests_usage_ratio",
[this, usage] {
return usage(_ctx.inflight_sem.current(), _ctx.max_inflight);
},
sm::description(ssx::sformat(
"Usage ratio of in-flight requests for {}", _group_name)))
.aggregate({}));
defs.emplace_back(
sm::make_gauge(
"memory_usage_ratio",
[this, usage] {
return usage(_ctx.mem_sem.current(), _ctx.max_memory);
},
sm::description(
ssx::sformat("Usage ratio of memory for {}", _group_name)))
.aggregate({}));
defs.emplace_back(
sm::make_gauge(
"queued_requests_memory_blocked",
[this] { return _ctx.mem_sem.waiters(); },
sm::description(ssx::sformat(
"Number of requests queued in {}, due to memory limitations",
_group_name)))
.aggregate({}));
return defs;
};

if (!config::shard_local_cfg().disable_metrics()) {
_metrics.add_group(
_group_name,
setup_common
.template operator()<ss::metrics::impl::metric_definition_impl>(),
{},
{});
}

if (!config::shard_local_cfg().disable_public_metrics()) {
_public_metrics.add_group(
_group_name,
setup_common.template operator()<ss::metrics::metric_definition>());
}
}

} // namespace pandaproxy
19 changes: 19 additions & 0 deletions src/v/pandaproxy/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#pragma once

#include "metrics/metrics.h"
#include "pandaproxy/server.h"
#include "utils/log_hist.h"

#include <seastar/core/metrics_registration.hh>
Expand Down Expand Up @@ -82,4 +83,22 @@ class probe {
metrics::public_metric_groups _public_metrics;
};

class server_probe {
public:
server_probe(server::context_t& ctx, const ss::sstring& group_name);
server_probe(const server_probe&) = delete;
server_probe& operator=(const server_probe&) = delete;
server_probe(server_probe&&) = delete;
server_probe& operator=(server_probe&&) = delete;
~server_probe() = default;

private:
void setup_metrics();

server::context_t& _ctx;
const ss::sstring& _group_name;
metrics::internal_metric_groups _metrics;
metrics::public_metric_groups _public_metrics;
};

} // namespace pandaproxy
9 changes: 6 additions & 3 deletions src/v/pandaproxy/rest/proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ proxy::proxy(
, _inflight_config_binding(config::shard_local_cfg().max_in_flight_pandaproxy_requests_per_shard.bind())
, _client(client)
, _client_cache(client_cache)
, _ctx{{{{}, max_memory, _mem_sem, _inflight_sem, {}, smp_sg}, *this},
, _ctx{{{{}, max_memory, _mem_sem, _inflight_config_binding(), _inflight_sem, {}, smp_sg}, *this},
{config::always_true(), config::shard_local_cfg().superusers.bind(), controller},
_config.pandaproxy_api.value()}
, _server(
Expand All @@ -126,8 +126,11 @@ proxy::proxy(
json::serialization_format::application_json)
, _ensure_started{[this]() { return do_start(); }}
, _controller(controller) {
_inflight_config_binding.watch(
[this]() { _inflight_sem.set_capacity(_inflight_config_binding()); });
_inflight_config_binding.watch([this]() {
const size_t capacity = _inflight_config_binding();
_inflight_sem.set_capacity(capacity);
_ctx.max_inflight = capacity;
});
}

ss::future<> proxy::start() {
Expand Down
9 changes: 6 additions & 3 deletions src/v/pandaproxy/schema_registry/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ service::service(
config::shard_local_cfg()
.max_in_flight_schema_registry_requests_per_shard.bind())
, _client(client)
, _ctx{{{}, max_memory, _mem_sem, _inflight_sem, {}, smp_sg}, *this}
, _ctx{{{}, max_memory, _mem_sem, _inflight_config_binding(), _inflight_sem, {}, smp_sg}, *this}
, _server(
"schema_registry", // server_name
"schema_registry", // public_metric_group_name
Expand All @@ -632,8 +632,11 @@ service::service(
config::always_true(),
config::shard_local_cfg().superusers.bind(),
controller.get()} {
_inflight_config_binding.watch(
[this]() { _inflight_sem.set_capacity(_inflight_config_binding()); });
_inflight_config_binding.watch([this]() {
const size_t capacity = _inflight_config_binding();
_inflight_sem.set_capacity(capacity);
_ctx.max_inflight = capacity;
});
}

ss::future<> service::start() {
Expand Down
21 changes: 17 additions & 4 deletions src/v/pandaproxy/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

#include "pandaproxy/server.h"

#include "config/configuration.h"
#include "metrics/metrics.h"
#include "model/metadata.h"
#include "net/dns.h"
#include "net/tls_certificate_probe.h"
Expand All @@ -17,6 +19,7 @@
#include "pandaproxy/probe.h"
#include "pandaproxy/reply.h"
#include "rpc/rpc_utils.h"
#include "ssx/sformat.h"

#include <seastar/core/coroutine.hh>
#include <seastar/http/function_handlers.hh>
Expand All @@ -28,6 +31,7 @@

#include <charconv>
#include <exception>
#include <memory>

namespace pandaproxy {

Expand Down Expand Up @@ -156,7 +160,8 @@ server::server(
, _api20(std::move(api20))
, _has_routes(false)
, _ctx(ctx)
, _exceptional_mime_type(exceptional_mime_type) {
, _exceptional_mime_type(exceptional_mime_type)
, _probe{} {
_api20.set_api_doc(_server._routes);
_api20.register_api_file(_server._routes, header);
_api20.add_definitions_file(_server._routes, definitions);
Expand Down Expand Up @@ -201,6 +206,9 @@ ss::future<> server::start(
const std::vector<model::broker_endpoint>& advertised) {
_server._routes.register_exeption_handler(
exception_replier{ss::sstring{name(_exceptional_mime_type)}});

_probe = std::make_unique<server_probe>(_ctx, _public_metrics_group_name);

_ctx.advertised_listeners.reserve(endpoints.size());
for (auto& server_endpoint : endpoints) {
auto addr = co_await net::resolve_dns(server_endpoint.address);
Expand Down Expand Up @@ -240,13 +248,18 @@ ss::future<> server::start(
}
co_await _server.listen(addr, cred);
}

co_return;
}

ss::future<> server::stop() {
return _pending_reqs.close()
.finally([this]() { return _ctx.as.request_abort(); })
.finally([this]() mutable { return _server.stop(); });
return _pending_reqs.close().finally([this]() {
_ctx.as.request_abort();
_probe.reset(nullptr);
return _server.stop();
});
}

server::~server() noexcept = default;

} // namespace pandaproxy
9 changes: 7 additions & 2 deletions src/v/pandaproxy/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sstring.hh>
#include <seastar/http/api_docs.hh>
#include <seastar/http/handlers.hh>
#include <seastar/http/httpd.hh>
Expand All @@ -40,6 +41,8 @@

namespace pandaproxy {

class server_probe;

inline ss::shard_id user_shard(const ss::sstring& name) {
auto hash = xxhash_64(name.data(), name.length());
return jump_consistent_hash(hash, ss::smp::count);
Expand Down Expand Up @@ -71,6 +74,7 @@ class server {
std::vector<net::unresolved_address> advertised_listeners;
size_t max_memory;
ssx::semaphore& mem_sem;
size_t max_inflight;
adjustable_semaphore& inflight_sem;
ss::abort_source as;
ss::smp_service_group smp_sg;
Expand Down Expand Up @@ -104,9 +108,9 @@ class server {
};

server() = delete;
~server() = default;
~server() noexcept;
server(const server&) = delete;
server(server&&) noexcept = default;
server(server&&) noexcept = delete;
server& operator=(const server&) = delete;
server& operator=(server&&) = delete;

Expand Down Expand Up @@ -136,6 +140,7 @@ class server {
bool _has_routes;
context_t& _ctx;
json::serialization_format _exceptional_mime_type;
std::unique_ptr<server_probe> _probe;
};

template<typename service_t>
Expand Down

0 comments on commit 55c3907

Please sign in to comment.