From 4f956d8cd9334dbe455b4fac49e332fe811cf106 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Tue, 18 Apr 2023 16:48:07 -0400 Subject: [PATCH 01/16] kafka/server: add a per-handler probe (cherry picked from commit 31b985c84517b3fec58ded11c01634c8964f9dcb) --- src/v/kafka/CMakeLists.txt | 1 + src/v/kafka/server/handlers/handler_probe.cc | 111 +++++++++++++++++++ src/v/kafka/server/handlers/handler_probe.h | 71 ++++++++++++ 3 files changed, 183 insertions(+) create mode 100644 src/v/kafka/server/handlers/handler_probe.cc create mode 100644 src/v/kafka/server/handlers/handler_probe.h diff --git a/src/v/kafka/CMakeLists.txt b/src/v/kafka/CMakeLists.txt index 46e87f0318637..5e7c2e82bc2d9 100644 --- a/src/v/kafka/CMakeLists.txt +++ b/src/v/kafka/CMakeLists.txt @@ -25,6 +25,7 @@ set(handlers_srcs server/handlers/topics/topic_utils.cc server/handlers/describe_producers.cc server/handlers/describe_transactions.cc + server/handlers/handler_probe.cc server/handlers/list_transactions.cc ) diff --git a/src/v/kafka/server/handlers/handler_probe.cc b/src/v/kafka/server/handlers/handler_probe.cc new file mode 100644 index 0000000000000..b8dfb28d7504e --- /dev/null +++ b/src/v/kafka/server/handlers/handler_probe.cc @@ -0,0 +1,111 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "kafka/server/handlers/handler_probe.h" + +#include "config/configuration.h" +#include "kafka/server/handlers/handler_interface.h" +#include "kafka/server/logger.h" +#include "prometheus/prometheus_sanitize.h" + +#include +#include +#include + +#include +#include + +namespace kafka { + +handler_probe_manager::handler_probe_manager() + : _metrics() + , _probes(max_api_key() + 2) { + const auto unknown_handler_key = max_api_key() + 1; + for (size_t i = 0; i < _probes.size(); i++) { + auto key = api_key{i}; + + if (handler_for_key(key) || i == unknown_handler_key) { + _probes[i].setup_metrics(_metrics, key); + } + } +} + +handler_probe& handler_probe_manager::get_probe(api_key key) { + if (!handler_for_key(key)) { + return _probes.back(); + } + + return _probes[key]; +} + +handler_probe::handler_probe() + : _last_recorded_in_progress(ss::lowres_clock::now()) {} + +void handler_probe::setup_metrics( + ss::metrics::metric_groups& metrics, api_key key) { + namespace sm = ss::metrics; + + if (config::shard_local_cfg().disable_metrics()) { + return; + } + + const char* handler_name; + if (auto handler = handler_for_key(key)) { + handler_name = handler.value()->name(); + } else { + handler_name = "unknown_handler"; + } + + std::vector labels{sm::label("handler")(handler_name)}; + auto aggregate_labels = config::shard_local_cfg().aggregate_metrics() + ? std::vector{sm::shard_label} + : std::vector{}; + + metrics.add_group( + prometheus_sanitize::metrics_name("kafka_handler"), + { + sm::make_counter( + "requests_completed_total", + [this] { return _requests_completed; }, + sm::description("Number of kafka requests completed"), + labels) + .aggregate(aggregate_labels), + sm::make_counter( + "requests_errored_total", + [this] { return _requests_errored; }, + sm::description("Number of kafka requests errored"), + labels) + .aggregate(aggregate_labels), + sm::make_counter( + "requests_in_progress_total", + [this] { return _requests_in_progress_every_ns / 1'000'000'000; }, + sm::description("A running total of kafka requests in progress"), + labels) + .aggregate(aggregate_labels), + }); +} + +/* + * This roughly approximates an integral of `_requests_in_progress`. + * By providing Prometheus with a counter of the integral rather than + * a gauge for `_requests_in_progress` we avoid any bias in the value + * that Prometheus's sampling rate could introduce. + */ +void handler_probe::sample_in_progress() { + auto now = ss::lowres_clock::now(); + auto s_diff = (now - _last_recorded_in_progress) + / std::chrono::nanoseconds(1); + + _requests_in_progress_every_ns += _requests_in_progress * s_diff; + _last_recorded_in_progress = now; +} + +} // namespace kafka diff --git a/src/v/kafka/server/handlers/handler_probe.h b/src/v/kafka/server/handlers/handler_probe.h new file mode 100644 index 0000000000000..50147c56bada6 --- /dev/null +++ b/src/v/kafka/server/handlers/handler_probe.h @@ -0,0 +1,71 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "kafka/protocol/types.h" +#include "ssx/metrics.h" + +namespace kafka { + +/** + * Stores per-handler metrics for kafka requests. + * And exposes them to the internal metrics endpoint. + */ +class handler_probe { +public: + explicit handler_probe(); + void setup_metrics(ss::metrics::metric_groups&, api_key); + + void sample_in_progress(); + void request_completed() { + sample_in_progress(); + + _requests_completed++; + _requests_in_progress--; + } + void request_errored() { _requests_errored++; } + void request_started() { + sample_in_progress(); + + _requests_in_progress++; + } + +private: + uint64_t _requests_completed{0}; + uint64_t _requests_errored{0}; + + uint64_t _requests_in_progress{0}; + uint64_t _requests_in_progress_every_ns{0}; + + ss::lowres_clock::time_point _last_recorded_in_progress; +}; + +/** + * Maps Kafka api keys to the `handler_probe` instance + * specific to that key. + */ +class handler_probe_manager { +public: + handler_probe_manager(); + /** + * Maps an `api_key` to the metrics probe it's associate with. + * If the `api_key` isn't valid a probe to an `unknown_handler` + * is returned instead. + */ + handler_probe& get_probe(api_key key); + +private: + ss::metrics::metric_groups _metrics; + std::vector _probes; +}; + +} // namespace kafka From 2c33fb9c13b56b8eb49612d0470db405a7179558 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Tue, 18 Apr 2023 16:49:40 -0400 Subject: [PATCH 02/16] kafka/sever: use a per-handler probe for requests (cherry picked from commit 56659cb0ed022684e6ac9f45a776326017f53ee7) --- src/v/kafka/server/connection_context.cc | 20 ++++++++++++-------- src/v/kafka/server/connection_context.h | 21 ++++++++++++++++++--- src/v/kafka/server/server.h | 7 +++++++ 3 files changed, 37 insertions(+), 11 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 28949f06c2028..37ea64a699521 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -286,7 +286,8 @@ ss::future connection_context::throttle_request( request_data r_data = request_data{ .request_key = hdr.key, .client_id = ss::sstring{hdr.client_id.value_or("")}}; - auto tracker = std::make_unique(_server.probe()); + auto& h_probe = _server.handler_probe(r_data.request_key); + auto tracker = std::make_unique(_server.probe(), h_probe); auto fut = ss::now(); if (delay.enforce > delay_t::clock::duration::zero()) { fut = ss::sleep_abortable(delay.enforce, _server.abort_source()); @@ -422,8 +423,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { seq, correlation, self, - sres = std::move(sres)]( - ss::future<> d) mutable { + sres](ss::future<> d) mutable { /* * if the dispatch/first stage failed, then we need to * need to consume the second stage since it might be @@ -440,7 +440,8 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { "Discarding second stage failure {}", e); }) - .finally([self, d = std::move(d)]() mutable { + .finally([self, d = std::move(d), sres]() mutable { + sres->tracker->mark_errored(); self->_server.probe().service_error(); self->_server.probe().request_completed(); return std::move(d); @@ -454,7 +455,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { _server.conn_gate(), [this, f = std::move(f), - sres = std::move(sres), + sres, seq, correlation]() mutable { return f.then([this, @@ -469,7 +470,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { return maybe_process_responses(); }); }) - .handle_exception([self](std::exception_ptr e) { + .handle_exception([self, sres](std::exception_ptr e) { // ssx::spawn_with_gate already caught // shutdown-like exceptions, so we should only // be taking this path for real errors. That @@ -494,15 +495,17 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { e); } + sres->tracker->mark_errored(); self->_server.probe().service_error(); self->conn->shutdown_input(); }); return d; }) - .handle_exception([self](std::exception_ptr e) { + .handle_exception([self, sres](std::exception_ptr e) { vlog( klog.info, "Detected error dispatching request: {}", e); self->conn->shutdown_input(); + sres->tracker->mark_errored(); }); }); }) @@ -574,8 +577,9 @@ ss::future<> connection_context::maybe_process_responses() { }) // release the resources only once it has been written to the // connection. - .finally([resources = std::move(resp_and_res.resources)] {}); + .finally([resources = resp_and_res.resources] {}); } catch (...) { + resp_and_res.resources->tracker->mark_errored(); vlog( klog.debug, "Failed to process request: {}", diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 778c5779f2d23..424743ca3077c 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -10,6 +10,7 @@ */ #pragma once #include "config/property.h" +#include "kafka/server/handlers/handler_probe.h" #include "kafka/server/response.h" #include "kafka/server/server.h" #include "kafka/types.h" @@ -52,19 +53,33 @@ class request_context; // used to track number of pending requests class request_tracker { public: - explicit request_tracker(net::server_probe& probe) noexcept - : _probe(probe) { + explicit request_tracker( + net::server_probe& probe, handler_probe& h_probe) noexcept + : _probe(probe) + , _h_probe(h_probe) { _probe.request_received(); + _h_probe.request_started(); } request_tracker(const request_tracker&) = delete; request_tracker(request_tracker&&) = delete; request_tracker& operator=(const request_tracker&) = delete; request_tracker& operator=(request_tracker&&) = delete; - ~request_tracker() noexcept { _probe.request_completed(); } + void mark_errored() { _errored = true; } + + ~request_tracker() noexcept { + _probe.request_completed(); + if (_errored) { + _h_probe.request_errored(); + } else { + _h_probe.request_completed(); + } + } private: net::server_probe& _probe; + handler_probe& _h_probe; + bool _errored{false}; }; struct request_data { diff --git a/src/v/kafka/server/server.h b/src/v/kafka/server/server.h index 652470b31d5c6..fc8c89396763f 100644 --- a/src/v/kafka/server/server.h +++ b/src/v/kafka/server/server.h @@ -16,8 +16,10 @@ #include "coproc/fwd.h" #include "features/feature_table.h" #include "kafka/latency_probe.h" +#include "kafka/protocol/types.h" #include "kafka/server/fetch_metadata_cache.hh" #include "kafka/server/fwd.h" +#include "kafka/server/handlers/handler_probe.h" #include "kafka/server/queue_depth_monitor.h" #include "net/server.h" #include "security/fwd.h" @@ -171,6 +173,10 @@ class server final ssx::semaphore& memory_fetch_sem() noexcept { return _memory_fetch_sem; } + handler_probe& handler_probe(api_key key) { + return _handler_probes.get_probe(key); + } + private: void setup_metrics(); @@ -202,6 +208,7 @@ class server final security::gssapi_principal_mapper _gssapi_principal_mapper; security::krb5::configurator _krb_configurator; ssx::semaphore _memory_fetch_sem; + handler_probe_manager _handler_probes; class latency_probe _probe; ss::metrics::metric_groups _metrics; From d00f167e8acc3c6ec115cbb765677bf6b745ca66 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Mon, 8 May 2023 21:04:30 -0400 Subject: [PATCH 03/16] chore: format connection_context (cherry picked from commit afdcdc2e46c218e52d240ecb8136218f7a609384) --- src/v/kafka/server/connection_context.cc | 58 ++++++++++++------------ 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 37ea64a699521..f2f2363ba6cd6 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -470,35 +470,35 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { return maybe_process_responses(); }); }) - .handle_exception([self, sres](std::exception_ptr e) { - // ssx::spawn_with_gate already caught - // shutdown-like exceptions, so we should only - // be taking this path for real errors. That - // also means that on shutdown we don't bother - // to call shutdown_input on the connection, so - // rely on any future reader to check the abort - // source before considering reading the - // connection. - - auto disconnected - = net::is_disconnect_exception(e); - if (disconnected) { - vlog( - klog.info, - "Disconnected {} ({})", - self->conn->addr, - disconnected.value()); - } else { - vlog( - klog.warn, - "Error processing request: {}", - e); - } - - sres->tracker->mark_errored(); - self->_server.probe().service_error(); - self->conn->shutdown_input(); - }); + .handle_exception( + [self, sres](std::exception_ptr e) { + // ssx::spawn_with_gate already caught + // shutdown-like exceptions, so we should only + // be taking this path for real errors. That + // also means that on shutdown we don't bother + // to call shutdown_input on the connection, + // so rely on any future reader to check the + // abort source before considering reading the + // connection. + auto disconnected + = net::is_disconnect_exception(e); + if (disconnected) { + vlog( + klog.info, + "Disconnected {} ({})", + self->conn->addr, + disconnected.value()); + } else { + vlog( + klog.warn, + "Error processing request: {}", + e); + } + + sres->tracker->mark_errored(); + self->_server.probe().service_error(); + self->conn->shutdown_input(); + }); return d; }) .handle_exception([self, sres](std::exception_ptr e) { From 32b115af34336f65653ab6d07d8fc5e9a50eea59 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Fri, 28 Apr 2023 13:11:49 -0400 Subject: [PATCH 04/16] kafka/handlers: add bytes sent/recv metrics (cherry picked from commit d78ed20321a668131085aa4e38face937169bb65) --- src/v/kafka/server/handlers/handler_probe.cc | 12 ++++++++++++ src/v/kafka/server/handlers/handler_probe.h | 7 +++++++ 2 files changed, 19 insertions(+) diff --git a/src/v/kafka/server/handlers/handler_probe.cc b/src/v/kafka/server/handlers/handler_probe.cc index b8dfb28d7504e..6992aa06595a5 100644 --- a/src/v/kafka/server/handlers/handler_probe.cc +++ b/src/v/kafka/server/handlers/handler_probe.cc @@ -90,6 +90,18 @@ void handler_probe::setup_metrics( sm::description("A running total of kafka requests in progress"), labels) .aggregate(aggregate_labels), + sm::make_counter( + "received_bytes_total", + [this] { return _bytes_received; }, + sm::description("Number of bytes received from kafka requests"), + labels) + .aggregate(aggregate_labels), + sm::make_counter( + "sent_bytes_total", + [this] { return _bytes_sent; }, + sm::description("Number of bytes sent in kafka replies"), + labels) + .aggregate(aggregate_labels), }); } diff --git a/src/v/kafka/server/handlers/handler_probe.h b/src/v/kafka/server/handlers/handler_probe.h index 50147c56bada6..6d7ac196cab3a 100644 --- a/src/v/kafka/server/handlers/handler_probe.h +++ b/src/v/kafka/server/handlers/handler_probe.h @@ -39,6 +39,10 @@ class handler_probe { _requests_in_progress++; } + void add_bytes_received(size_t bytes) { _bytes_received += bytes; } + + void add_bytes_sent(size_t bytes) { _bytes_sent += bytes; } + private: uint64_t _requests_completed{0}; uint64_t _requests_errored{0}; @@ -47,6 +51,9 @@ class handler_probe { uint64_t _requests_in_progress_every_ns{0}; ss::lowres_clock::time_point _last_recorded_in_progress; + + uint64_t _bytes_received{0}; + uint64_t _bytes_sent{0}; }; /** From 6673152339e9b53630d4487b8be23c1c994b4839 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Fri, 28 Apr 2023 13:49:34 -0400 Subject: [PATCH 05/16] kafka/server: integrate metrics for bytes sent/recv into connection_context (cherry picked from commit 9e42f80d430df3d964e1eb72777189679534f49e) --- src/v/kafka/server/connection_context.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index f2f2363ba6cd6..3d16197d237b4 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -118,6 +118,7 @@ ss::future<> connection_context::process_one_request() { _server.probe().header_corrupted(); co_return; } + _server.handler_probe(h->key).add_bytes_received(sz.value()); try { co_return co_await dispatch_method_once( @@ -569,6 +570,7 @@ ss::future<> connection_context::maybe_process_responses() { *_snc_quota_context, response_size); } } + _server.handler_probe(request_key).add_bytes_sent(response_size); try { return conn->write(std::move(msg)) .then([] { From 7baddb497d45a31b2d97f0a6205e2ed9a9ba7ce3 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Thu, 25 May 2023 17:44:56 -0400 Subject: [PATCH 06/16] kafka/server: remove the unneeded completion count in finally block Since `request_tracker` will count the completion of the request when it is destroyed this removed line will double count failed completed requests. (cherry picked from commit 1c222caa788e05e488523665b648e5edfff85ccb) --- src/v/kafka/server/connection_context.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 3d16197d237b4..84baf59a1fd6a 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -444,7 +444,6 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { .finally([self, d = std::move(d), sres]() mutable { sres->tracker->mark_errored(); self->_server.probe().service_error(); - self->_server.probe().request_completed(); return std::move(d); }); } From bd84e41b87e1ba2eaa9b4ac22d93597f3ec35639 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Thu, 25 May 2023 17:50:40 -0400 Subject: [PATCH 07/16] kafka/server: move counting of service errors to request_tracker (cherry picked from commit 1cabd8a4f7cfc21fa3e4c5bb9b56e4da4207f78b) --- src/v/kafka/server/connection_context.cc | 2 -- src/v/kafka/server/connection_context.h | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 84baf59a1fd6a..78dcdfb4c5b76 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -443,7 +443,6 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { }) .finally([self, d = std::move(d), sres]() mutable { sres->tracker->mark_errored(); - self->_server.probe().service_error(); return std::move(d); }); } @@ -496,7 +495,6 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { } sres->tracker->mark_errored(); - self->_server.probe().service_error(); self->conn->shutdown_input(); }); return d; diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 424743ca3077c..208fe89612390 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -71,6 +71,7 @@ class request_tracker { _probe.request_completed(); if (_errored) { _h_probe.request_errored(); + _probe.service_error(); } else { _h_probe.request_completed(); } From 33c5d60f71833f8f126d434725d18ee1cba4de38 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Tue, 11 Jul 2023 15:11:22 -0400 Subject: [PATCH 08/16] kafka/server: add latency histogram to handler probe (cherry picked from commit ca7e46fb637cd57b4218bcee1da4a872073506c1) --- src/v/kafka/server/handlers/handler_probe.cc | 9 +++++++++ src/v/kafka/server/handlers/handler_probe.h | 11 +++++++++++ 2 files changed, 20 insertions(+) diff --git a/src/v/kafka/server/handlers/handler_probe.cc b/src/v/kafka/server/handlers/handler_probe.cc index 6992aa06595a5..a28b903841c83 100644 --- a/src/v/kafka/server/handlers/handler_probe.cc +++ b/src/v/kafka/server/handlers/handler_probe.cc @@ -33,6 +33,7 @@ handler_probe_manager::handler_probe_manager() auto key = api_key{i}; if (handler_for_key(key) || i == unknown_handler_key) { + _probes[i].initialize(); _probes[i].setup_metrics(_metrics, key); } } @@ -102,6 +103,14 @@ void handler_probe::setup_metrics( sm::description("Number of bytes sent in kafka replies"), labels) .aggregate(aggregate_labels), + sm::make_histogram( + "latency_seconds", + sm::description("Latency histogram of kafka requests"), + labels, + [this] { + return ssx::metrics::report_default_histogram(_latency.value()); + }) + .aggregate(aggregate_labels), }); } diff --git a/src/v/kafka/server/handlers/handler_probe.h b/src/v/kafka/server/handlers/handler_probe.h index 6d7ac196cab3a..5b7c9301fa652 100644 --- a/src/v/kafka/server/handlers/handler_probe.h +++ b/src/v/kafka/server/handlers/handler_probe.h @@ -13,6 +13,9 @@ #include "kafka/protocol/types.h" #include "ssx/metrics.h" +#include "utils/hdr_hist.h" + +#include namespace kafka { @@ -43,6 +46,12 @@ class handler_probe { void add_bytes_sent(size_t bytes) { _bytes_sent += bytes; } + void initialize() { _latency = hdr_hist(); } + + std::unique_ptr auto_latency_measurement() { + return _latency->auto_measure(); + } + private: uint64_t _requests_completed{0}; uint64_t _requests_errored{0}; @@ -54,6 +63,8 @@ class handler_probe { uint64_t _bytes_received{0}; uint64_t _bytes_sent{0}; + + std::optional _latency{std::nullopt}; }; /** From ec39b0d4cc56b466055650837ad597499c9e4df0 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Thu, 13 Jul 2023 22:01:57 -0400 Subject: [PATCH 09/16] utils: add a powers of two histogram implementation This histogram type avoids using `hdr_hist` to generate seastar histograms in metrics. It instead implements a simple powers of two histogram that reduces memory utilization from kilobytes to `number_of_buckets` * sizeof(uint64_t). (cherry picked from commit b1ff1b712450fd3958ee6709ce3844062728aebb) --- src/v/utils/log_hist.h | 188 ++++++++++++++++++++ src/v/utils/tests/seastar_histogram_test.cc | 97 ++++++++++ 2 files changed, 285 insertions(+) create mode 100644 src/v/utils/log_hist.h diff --git a/src/v/utils/log_hist.h b/src/v/utils/log_hist.h new file mode 100644 index 0000000000000..0de3ecaede452 --- /dev/null +++ b/src/v/utils/log_hist.h @@ -0,0 +1,188 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include +#include + +#include + +#include +#include +#include +#include + +/* + * A histogram implementation + * The buckets upper bounds are powers of 2 minus 1. + * `first_bucket_upper_bound` therefore must be a power of 2. + * The number of values represented by each bucket increases by powers 2 as + * well. + * + * Assume `number_of_buckets` is 4 and `first_bucket_upper_bound` is 16 the + * bucket value ranges are; + * + * [1, 16), [16, 32), [32, 64), [64, 128) + * + * And if 1, 16, 32, and 33 are recorded the buckets will have the following + * counts; + * + * [1, 16) = 1 + * [16, 32) = 1 + * [32, 64) = 2 + * [64, 128) = 0 + */ +template< + typename duration_t, + int number_of_buckets, + uint64_t first_bucket_upper_bound> +class log_hist { + static_assert( + first_bucket_upper_bound >= 1 + && (first_bucket_upper_bound & (first_bucket_upper_bound - 1)) == 0, + "first bucket bound must be power of 2"); + + using measurement_canary_t = seastar::lw_shared_ptr; + +public: + static constexpr int first_bucket_clz = std::countl_zero( + first_bucket_upper_bound - 1); + static constexpr int first_bucket_exp = 64 - first_bucket_clz; + + using clock_type = std::chrono::high_resolution_clock; + + /// \brief move-only type to tracking durations + /// if the log_hist ptr goes out of scope, it will detach itself + /// and the recording will simply be ignored. + class measurement { + public: + explicit measurement(log_hist& h) + : _canary(h._canary) + , _h(std::ref(h)) + , _begin_t(log_hist::clock_type::now()) {} + measurement(const measurement&) = delete; + measurement& operator=(const measurement&) = delete; + measurement(measurement&& o) noexcept + : _canary(o._canary) + , _h(o._h) + , _begin_t(o._begin_t) { + o.cancel(); + } + measurement& operator=(measurement&& o) noexcept { + if (this != &o) { + this->~measurement(); + new (this) measurement(std::move(o)); + } + return *this; + } + ~measurement() noexcept { + if (_canary && *_canary) { + _h.get().record(compute_duration()); + } + } + + // Cancels this measurements and prevents any values from + // being recorded to the underlying histogram. + void cancel() { _canary = nullptr; } + + private: + int64_t compute_duration() const { + return std::chrono::duration_cast( + log_hist::clock_type::now() - _begin_t) + .count(); + } + + measurement_canary_t _canary; + std::reference_wrapper _h; + log_hist::clock_type::time_point _begin_t; + }; + + std::unique_ptr auto_measure() { + return std::make_unique(*this); + } + + log_hist() + : _canary(seastar::make_lw_shared(true)) + , _counts(number_of_buckets) {} + + ~log_hist() { + // Notify any active measurements that this object no longer exists. + *_canary = false; + } + + /* + * record expects values of that are equivalent to `duration_t::count()` + * so make sure the input is scaled correctly. + */ + void record(uint64_t val) { + _sample_sum += val; + const int i = std::clamp( + first_bucket_clz - std::countl_zero(val), + 0, + static_cast(_counts.size() - 1)); + _counts[i]++; + } + + void record(std::unique_ptr m) { + record(m->compute_duration()); + } + + seastar::metrics::histogram seastar_histogram_logform(int64_t scale) const { + seastar::metrics::histogram hist; + hist.buckets.resize(_counts.size()); + hist.sample_sum = static_cast(_sample_sum) + / static_cast(scale); + + uint64_t cumulative_count = 0; + for (uint64_t i = 0; i < _counts.size(); i++) { + auto& bucket = hist.buckets[i]; + + cumulative_count += _counts[i]; + bucket.count = cumulative_count; + uint64_t unscaled_upper_bound = ((uint64_t)1 + << (first_bucket_exp + i)) + - 1; + bucket.upper_bound = static_cast(unscaled_upper_bound) + / static_cast(scale); + } + + hist.sample_count = cumulative_count; + return hist; + } + +private: + friend measurement; + + // Used to inform measurements whether `log_hist` has been destroyed + measurement_canary_t _canary; + + std::vector _counts; + uint64_t _sample_sum{0}; +}; + +/* + * This histogram produces indentical results as the public metric's `hdr_hist`. + * So if this histogram and `hdr_hist` are create and have the same values + * recorded to them then `log_hist_public::seastar_histogram_logform(1000000)` + * will produce the same seastar histogram as + * `ssx::metrics::report_default_histogram(hdr_hist)`. + */ +using log_hist_public = log_hist; +static constexpr int64_t log_hist_public_scale = 1'000'000; + +/* + * This histogram produces results that are similar, but not indentical to the + * internal metric's `hdr_hist`. Some of the first buckets will have the + * following bounds; [log_hist_internal upper bounds, internal hdr_hist upper + * bounds] [8, 10], [16, 20], [32, 41], [64, 83], [128, 167], [256, 335] + */ +using log_hist_internal = log_hist; diff --git a/src/v/utils/tests/seastar_histogram_test.cc b/src/v/utils/tests/seastar_histogram_test.cc index 3c92f0a387ed2..2ab372f45e612 100644 --- a/src/v/utils/tests/seastar_histogram_test.cc +++ b/src/v/utils/tests/seastar_histogram_test.cc @@ -1,7 +1,14 @@ #include "utils/hdr_hist.h" +#include "utils/log_hist.h" +#include #include +#include + +#include +#include + SEASTAR_THREAD_TEST_CASE(test_seastar_histograms_match) { using namespace std::chrono_literals; @@ -20,3 +27,93 @@ SEASTAR_THREAD_TEST_CASE(test_seastar_histograms_match) { logform_b.buckets[idx].upper_bound); } } + +namespace { +bool approximately_equal(double a, double b) { + constexpr double precision_error = 0.0001; + return std::abs(a - b) <= precision_error; +} + +struct hist_config { + int64_t scale; + bool use_approximately_equal; +}; + +constexpr std::array hist_configs = { + hist_config{log_hist_public_scale, true}, hist_config{1, false}}; + +template +void validate_histograms_equal(const hdr_hist& a, const l_hist& b) { + for (auto cfg : hist_configs) { + const auto logform_a = a.seastar_histogram_logform( + 18, 250, 2.0, cfg.scale); + const auto logform_b = b.seastar_histogram_logform(cfg.scale); + + BOOST_CHECK_EQUAL(logform_a.sample_count, logform_b.sample_count); + if (cfg.use_approximately_equal) { + BOOST_CHECK( + approximately_equal(logform_a.sample_sum, logform_b.sample_sum)); + } else { + BOOST_CHECK_EQUAL(logform_a.sample_sum, logform_b.sample_sum); + } + + for (size_t idx = 0; idx < logform_a.buckets.size(); ++idx) { + if (cfg.use_approximately_equal) { + BOOST_CHECK(approximately_equal( + logform_a.buckets[idx].upper_bound, + logform_b.buckets[idx].upper_bound)); + } else { + BOOST_CHECK_EQUAL( + logform_a.buckets[idx].upper_bound, + logform_b.buckets[idx].upper_bound); + } + BOOST_CHECK_EQUAL( + logform_a.buckets[idx].count, logform_b.buckets[idx].count); + } + } +} +} // namespace + +// ensures both the log_hist_public and the public hdr_hist return identical +// seastar histograms for values recorded around bucket bounds. +SEASTAR_THREAD_TEST_CASE(test_public_log_hist_and_hdr_hist_equal_bounds) { + using namespace std::chrono_literals; + + hdr_hist a; + log_hist_public b; + + a.record(1); + b.record(1); + + for (unsigned i = 0; i < 17; i++) { + auto upper_bound + = (((unsigned)1 << (log_hist_public::first_bucket_exp + i)) - 1); + a.record(upper_bound); + a.record(upper_bound + 1); + b.record(upper_bound); + b.record(upper_bound + 1); + } + + validate_histograms_equal(a, b); +} + +// ensures both the log_hist_public and the public hdr_hist return identical +// seastar histograms for randomly selected values. +SEASTAR_THREAD_TEST_CASE(test_public_log_hist_and_hdr_hist_equal_rand) { + using namespace std::chrono_literals; + + hdr_hist a; + log_hist_public b; + + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution d(1, (1 << (8 + 17)) - 1); + + for (unsigned i = 0; i < 1'000'000; i++) { + auto sample = d(gen); + a.record(sample); + b.record(sample); + } + + validate_histograms_equal(a, b); +} From 686f5f9ff03193c75664611dc445b0bef754b5b0 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Tue, 11 Jul 2023 15:20:11 -0400 Subject: [PATCH 10/16] kafka/server: auto-measure handler latency in request_tracker (cherry picked from commit b774f7f722a590478711b5642e2a3d18b2ab8f7f) --- src/v/kafka/server/connection_context.cc | 8 +++++--- src/v/kafka/server/connection_context.h | 2 ++ src/v/kafka/server/handlers/handler_probe.cc | 6 ++---- src/v/kafka/server/handlers/handler_probe.h | 10 ++++++---- 4 files changed, 15 insertions(+), 11 deletions(-) diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 78dcdfb4c5b76..1b9b8224cc1e4 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -302,15 +302,16 @@ ss::future connection_context::throttle_request( r_data = std::move(r_data), delay = delay.request, track, - tracker = std::move(tracker)](ssx::semaphore_units units) mutable { + tracker = std::move(tracker), + &h_probe](ssx::semaphore_units units) mutable { return server().get_request_unit().then( [this, r_data = std::move(r_data), delay, mem_units = std::move(units), track, - tracker = std::move(tracker)]( - ssx::semaphore_units qd_units) mutable { + tracker = std::move(tracker), + &h_probe](ssx::semaphore_units qd_units) mutable { session_resources r{ .backpressure_delay = delay, .memlocks = std::move(mem_units), @@ -320,6 +321,7 @@ ss::future connection_context::throttle_request( if (track) { r.method_latency = _server.hist().auto_measure(); } + r.handler_latency = h_probe.auto_latency_measurement(); return r; }); }); diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 208fe89612390..3bcb1e45ca3b1 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -22,6 +22,7 @@ #include "security/sasl_authentication.h" #include "ssx/semaphore.h" #include "utils/hdr_hist.h" +#include "utils/log_hist.h" #include "utils/named_type.h" #include @@ -102,6 +103,7 @@ struct session_resources { ssx::semaphore_units memlocks; ssx::semaphore_units queue_units; std::unique_ptr method_latency; + std::unique_ptr handler_latency; std::unique_ptr tracker; request_data request_data; }; diff --git a/src/v/kafka/server/handlers/handler_probe.cc b/src/v/kafka/server/handlers/handler_probe.cc index a28b903841c83..9667851abd353 100644 --- a/src/v/kafka/server/handlers/handler_probe.cc +++ b/src/v/kafka/server/handlers/handler_probe.cc @@ -104,12 +104,10 @@ void handler_probe::setup_metrics( labels) .aggregate(aggregate_labels), sm::make_histogram( - "latency_seconds", + "latency_microseconds", sm::description("Latency histogram of kafka requests"), labels, - [this] { - return ssx::metrics::report_default_histogram(_latency.value()); - }) + [this] { return _latency->seastar_histogram_logform(1); }) .aggregate(aggregate_labels), }); } diff --git a/src/v/kafka/server/handlers/handler_probe.h b/src/v/kafka/server/handlers/handler_probe.h index 5b7c9301fa652..dbea14104b339 100644 --- a/src/v/kafka/server/handlers/handler_probe.h +++ b/src/v/kafka/server/handlers/handler_probe.h @@ -13,7 +13,7 @@ #include "kafka/protocol/types.h" #include "ssx/metrics.h" -#include "utils/hdr_hist.h" +#include "utils/log_hist.h" #include @@ -25,6 +25,8 @@ namespace kafka { */ class handler_probe { public: + using hist_t = log_hist_internal; + explicit handler_probe(); void setup_metrics(ss::metrics::metric_groups&, api_key); @@ -46,9 +48,9 @@ class handler_probe { void add_bytes_sent(size_t bytes) { _bytes_sent += bytes; } - void initialize() { _latency = hdr_hist(); } + void initialize() { _latency = hist_t(); } - std::unique_ptr auto_latency_measurement() { + std::unique_ptr auto_latency_measurement() { return _latency->auto_measure(); } @@ -64,7 +66,7 @@ class handler_probe { uint64_t _bytes_received{0}; uint64_t _bytes_sent{0}; - std::optional _latency{std::nullopt}; + std::optional _latency{std::nullopt}; }; /** From 0b233d780e1fc278c7bccd0aa471f72f5e797ce3 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Tue, 18 Jul 2023 17:37:50 -0400 Subject: [PATCH 11/16] utils: delete move and copy ctors in log_hist (cherry picked from commit 6d314a1375c89ffc873a000841aa92dacc594396) --- src/v/kafka/server/handlers/handler_probe.cc | 3 +-- src/v/kafka/server/handlers/handler_probe.h | 4 +--- src/v/utils/log_hist.h | 5 ++++- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/v/kafka/server/handlers/handler_probe.cc b/src/v/kafka/server/handlers/handler_probe.cc index 9667851abd353..35c75bb94b08a 100644 --- a/src/v/kafka/server/handlers/handler_probe.cc +++ b/src/v/kafka/server/handlers/handler_probe.cc @@ -33,7 +33,6 @@ handler_probe_manager::handler_probe_manager() auto key = api_key{i}; if (handler_for_key(key) || i == unknown_handler_key) { - _probes[i].initialize(); _probes[i].setup_metrics(_metrics, key); } } @@ -107,7 +106,7 @@ void handler_probe::setup_metrics( "latency_microseconds", sm::description("Latency histogram of kafka requests"), labels, - [this] { return _latency->seastar_histogram_logform(1); }) + [this] { return _latency.seastar_histogram_logform(1); }) .aggregate(aggregate_labels), }); } diff --git a/src/v/kafka/server/handlers/handler_probe.h b/src/v/kafka/server/handlers/handler_probe.h index dbea14104b339..a615bdbda54bf 100644 --- a/src/v/kafka/server/handlers/handler_probe.h +++ b/src/v/kafka/server/handlers/handler_probe.h @@ -48,10 +48,8 @@ class handler_probe { void add_bytes_sent(size_t bytes) { _bytes_sent += bytes; } - void initialize() { _latency = hist_t(); } - std::unique_ptr auto_latency_measurement() { - return _latency->auto_measure(); + return _latency.auto_measure(); } private: diff --git a/src/v/utils/log_hist.h b/src/v/utils/log_hist.h index 0de3ecaede452..4b912dc1c8f8c 100644 --- a/src/v/utils/log_hist.h +++ b/src/v/utils/log_hist.h @@ -113,7 +113,10 @@ class log_hist { log_hist() : _canary(seastar::make_lw_shared(true)) , _counts(number_of_buckets) {} - + log_hist(const log_hist& o) = delete; + log_hist& operator=(const log_hist&) = delete; + log_hist(log_hist&& o) = delete; + log_hist& operator=(log_hist&& o) = delete; ~log_hist() { // Notify any active measurements that this object no longer exists. *_canary = false; From 63f341e081596e7e80cd13631da966b198c10b5c Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Tue, 18 Jul 2023 17:38:29 -0400 Subject: [PATCH 12/16] kafka/server: reduce in-progress request count when handler errors (cherry picked from commit 69d98d9f1dc21d87bbe54214d2231eabf3e634f6) --- src/v/kafka/server/handlers/handler_probe.h | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/v/kafka/server/handlers/handler_probe.h b/src/v/kafka/server/handlers/handler_probe.h index a615bdbda54bf..e85e681f75155 100644 --- a/src/v/kafka/server/handlers/handler_probe.h +++ b/src/v/kafka/server/handlers/handler_probe.h @@ -37,7 +37,12 @@ class handler_probe { _requests_completed++; _requests_in_progress--; } - void request_errored() { _requests_errored++; } + void request_errored() { + sample_in_progress(); + + _requests_errored++; + _requests_in_progress--; + } void request_started() { sample_in_progress(); @@ -64,7 +69,7 @@ class handler_probe { uint64_t _bytes_received{0}; uint64_t _bytes_sent{0}; - std::optional _latency{std::nullopt}; + hist_t _latency{}; }; /** From 9a79715cd85779699d3e0460dfaa1e709b07b9f5 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Wed, 26 Jul 2023 17:38:51 -0400 Subject: [PATCH 13/16] utils: remove unneeded movslq from record function (cherry picked from commit 961178be090c53a4998a30e0ebd670b781c21d6b) --- src/v/utils/log_hist.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v/utils/log_hist.h b/src/v/utils/log_hist.h index 4b912dc1c8f8c..33fe85f541e94 100644 --- a/src/v/utils/log_hist.h +++ b/src/v/utils/log_hist.h @@ -128,7 +128,7 @@ class log_hist { */ void record(uint64_t val) { _sample_sum += val; - const int i = std::clamp( + const unsigned i = std::clamp( first_bucket_clz - std::countl_zero(val), 0, static_cast(_counts.size() - 1)); From 0b9faaf61a0c744ddc587a19c3e937e0b7cea0b2 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Tue, 25 Jul 2023 22:04:58 -0400 Subject: [PATCH 14/16] utils: refactor seastar_histogram_logform to allow for conversions from log_hist_internal to a public seastar hist Co-authored-by: Ben Pope (cherry picked from commit 6b578bd978c1ea15b488aa31e751f4f7435beabb) --- src/v/kafka/server/handlers/handler_probe.cc | 2 +- src/v/utils/CMakeLists.txt | 1 + src/v/utils/log_hist.cc | 98 +++++++++++++++++ src/v/utils/log_hist.h | 69 ++++++------ src/v/utils/tests/seastar_histogram_test.cc | 109 ++++++++++++------- 5 files changed, 205 insertions(+), 74 deletions(-) create mode 100644 src/v/utils/log_hist.cc diff --git a/src/v/kafka/server/handlers/handler_probe.cc b/src/v/kafka/server/handlers/handler_probe.cc index 35c75bb94b08a..b682b9f49b9fe 100644 --- a/src/v/kafka/server/handlers/handler_probe.cc +++ b/src/v/kafka/server/handlers/handler_probe.cc @@ -106,7 +106,7 @@ void handler_probe::setup_metrics( "latency_microseconds", sm::description("Latency histogram of kafka requests"), labels, - [this] { return _latency.seastar_histogram_logform(1); }) + [this] { return _latency.internal_histogram_logform(); }) .aggregate(aggregate_labels), }); } diff --git a/src/v/utils/CMakeLists.txt b/src/v/utils/CMakeLists.txt index a6167c0cdf419..47e8352f3b4f7 100644 --- a/src/v/utils/CMakeLists.txt +++ b/src/v/utils/CMakeLists.txt @@ -14,6 +14,7 @@ v_cc_library( uuid.cc request_auth.cc bottomless_token_bucket.cc + log_hist.cc DEPS Seastar::seastar Hdrhistogram::hdr_histogram diff --git a/src/v/utils/log_hist.cc b/src/v/utils/log_hist.cc new file mode 100644 index 0000000000000..3cdf0bdadc9f6 --- /dev/null +++ b/src/v/utils/log_hist.cc @@ -0,0 +1,98 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "utils/log_hist.h" + +template< + typename duration_t, + int number_of_buckets, + uint64_t first_bucket_upper_bound> +template +seastar::metrics::histogram +log_hist:: + seastar_histogram_logform() const { + seastar::metrics::histogram hist; + hist.buckets.resize(cfg::bucket_count); + hist.sample_sum = static_cast(_sample_sum) + / static_cast(cfg::scale); + + const unsigned first_bucket_exp + = 64 - std::countl_zero(first_bucket_upper_bound - 1); + const unsigned cfg_first_bucket_exp + = 64 - std::countl_zero(cfg::first_bucket_bound - 1); + + // Write bounds to seastar histogram + for (int i = 0; i < cfg::bucket_count; i++) { + auto& bucket = hist.buckets[i]; + bucket.count = 0; + + uint64_t unscaled_upper_bound = ((uint64_t)1 + << (cfg_first_bucket_exp + i)) + - 1; + bucket.upper_bound = static_cast(unscaled_upper_bound) + / static_cast(cfg::scale); + } + + uint64_t cumulative_count = 0; + size_t current_hist_idx = 0; + double current_hist_upper_bound = hist.buckets[0].upper_bound; + + // Write _counts to seastar histogram + for (size_t i = 0; i < _counts.size(); i++) { + uint64_t unscaled_upper_bound = ((uint64_t)1 << (first_bucket_exp + i)) + - 1; + double scaled_upper_bound = static_cast(unscaled_upper_bound) + / static_cast(cfg::scale); + + cumulative_count += _counts[i]; + + while (scaled_upper_bound > current_hist_upper_bound + && current_hist_idx != (hist.buckets.size() - 1)) { + current_hist_idx++; + current_hist_upper_bound + = hist.buckets[current_hist_idx].upper_bound; + } + + hist.buckets[current_hist_idx].count = cumulative_count; + } + + hist.sample_count = cumulative_count; + return hist; +} + +template< + typename duration_t, + int number_of_buckets, + uint64_t first_bucket_upper_bound> +seastar::metrics::histogram +log_hist:: + public_histogram_logform() const { + using public_hist_config = logform_config<1'000'000l, 256ul, 18>; + + return seastar_histogram_logform(); +} + +template< + typename duration_t, + int number_of_buckets, + uint64_t first_bucket_upper_bound> +seastar::metrics::histogram +log_hist:: + internal_histogram_logform() const { + using internal_hist_config = logform_config<1l, 8ul, 26>; + + return seastar_histogram_logform(); +} + +// Explicit instantiation for log_hist_public +template class log_hist; +// Explicit instantiation for log_hist_internal +template class log_hist; diff --git a/src/v/utils/log_hist.h b/src/v/utils/log_hist.h index 33fe85f541e94..9f53ab75758de 100644 --- a/src/v/utils/log_hist.h +++ b/src/v/utils/log_hist.h @@ -14,12 +14,10 @@ #include #include -#include - +#include #include #include #include -#include /* * A histogram implementation @@ -112,7 +110,7 @@ class log_hist { log_hist() : _canary(seastar::make_lw_shared(true)) - , _counts(number_of_buckets) {} + , _counts() {} log_hist(const log_hist& o) = delete; log_hist& operator=(const log_hist&) = delete; log_hist(log_hist&& o) = delete; @@ -135,32 +133,38 @@ class log_hist { _counts[i]++; } - void record(std::unique_ptr m) { - record(m->compute_duration()); - } - - seastar::metrics::histogram seastar_histogram_logform(int64_t scale) const { - seastar::metrics::histogram hist; - hist.buckets.resize(_counts.size()); - hist.sample_sum = static_cast(_sample_sum) - / static_cast(scale); - - uint64_t cumulative_count = 0; - for (uint64_t i = 0; i < _counts.size(); i++) { - auto& bucket = hist.buckets[i]; - - cumulative_count += _counts[i]; - bucket.count = cumulative_count; - uint64_t unscaled_upper_bound = ((uint64_t)1 - << (first_bucket_exp + i)) - - 1; - bucket.upper_bound = static_cast(unscaled_upper_bound) - / static_cast(scale); - } + template + struct logform_config { + static constexpr auto bound_is_pow_2 = _first_bucket_bound >= 1 + && (_first_bucket_bound + & (_first_bucket_bound - 1)) + == 0; + static_assert( + bound_is_pow_2, "_first_bucket_bound must be a power of 2"); + + static constexpr auto scale = _scale; + static constexpr auto first_bucket_bound = _first_bucket_bound; + static constexpr auto bucket_count = _bucket_count; + }; - hist.sample_count = cumulative_count; - return hist; - } + template + seastar::metrics::histogram seastar_histogram_logform() const; + /* + * Generates a Prometheus histogram with 18 buckets. The first bucket has an + * upper bound of 256 - 1 and subsequent buckets have an upper bound of 2 + * times the upper bound of the previous bucket. + * + * This is the histogram type used in the `/public_metrics` endpoint + */ + seastar::metrics::histogram public_histogram_logform() const; + /* + * Generates a Prometheus histogram with 26 buckets. The first bucket has an + * upper bound of 8 - 1 and subsequent buckets have an upper bound of 2 + * times the upper bound of the previous bucket. + * + * This is the histogram type used in the `/metrics` endpoint + */ + seastar::metrics::histogram internal_histogram_logform() const; private: friend measurement; @@ -168,7 +172,7 @@ class log_hist { // Used to inform measurements whether `log_hist` has been destroyed measurement_canary_t _canary; - std::vector _counts; + std::array _counts; uint64_t _sample_sum{0}; }; @@ -179,8 +183,7 @@ class log_hist { * will produce the same seastar histogram as * `ssx::metrics::report_default_histogram(hdr_hist)`. */ -using log_hist_public = log_hist; -static constexpr int64_t log_hist_public_scale = 1'000'000; +using log_hist_public = log_hist; /* * This histogram produces results that are similar, but not indentical to the @@ -188,4 +191,4 @@ static constexpr int64_t log_hist_public_scale = 1'000'000; * following bounds; [log_hist_internal upper bounds, internal hdr_hist upper * bounds] [8, 10], [16, 20], [32, 41], [64, 83], [128, 167], [256, 335] */ -using log_hist_internal = log_hist; +using log_hist_internal = log_hist; diff --git a/src/v/utils/tests/seastar_histogram_test.cc b/src/v/utils/tests/seastar_histogram_test.cc index 2ab372f45e612..b610ea597b40c 100644 --- a/src/v/utils/tests/seastar_histogram_test.cc +++ b/src/v/utils/tests/seastar_histogram_test.cc @@ -1,3 +1,4 @@ +#include "ssx/metrics.h" #include "utils/hdr_hist.h" #include "utils/log_hist.h" @@ -34,42 +35,21 @@ bool approximately_equal(double a, double b) { return std::abs(a - b) <= precision_error; } -struct hist_config { - int64_t scale; - bool use_approximately_equal; -}; +template +void validate_public_histograms_equal(const hdr_hist& a, const l_hist& b) { + const auto logform_a = ssx::metrics::report_default_histogram(a); + const auto logform_b = b.public_histogram_logform(); -constexpr std::array hist_configs = { - hist_config{log_hist_public_scale, true}, hist_config{1, false}}; + BOOST_CHECK_EQUAL(logform_a.sample_count, logform_b.sample_count); + BOOST_CHECK( + approximately_equal(logform_a.sample_sum, logform_b.sample_sum)); -template -void validate_histograms_equal(const hdr_hist& a, const l_hist& b) { - for (auto cfg : hist_configs) { - const auto logform_a = a.seastar_histogram_logform( - 18, 250, 2.0, cfg.scale); - const auto logform_b = b.seastar_histogram_logform(cfg.scale); - - BOOST_CHECK_EQUAL(logform_a.sample_count, logform_b.sample_count); - if (cfg.use_approximately_equal) { - BOOST_CHECK( - approximately_equal(logform_a.sample_sum, logform_b.sample_sum)); - } else { - BOOST_CHECK_EQUAL(logform_a.sample_sum, logform_b.sample_sum); - } - - for (size_t idx = 0; idx < logform_a.buckets.size(); ++idx) { - if (cfg.use_approximately_equal) { - BOOST_CHECK(approximately_equal( - logform_a.buckets[idx].upper_bound, - logform_b.buckets[idx].upper_bound)); - } else { - BOOST_CHECK_EQUAL( - logform_a.buckets[idx].upper_bound, - logform_b.buckets[idx].upper_bound); - } - BOOST_CHECK_EQUAL( - logform_a.buckets[idx].count, logform_b.buckets[idx].count); - } + for (size_t idx = 0; idx < logform_a.buckets.size(); ++idx) { + BOOST_CHECK(approximately_equal( + logform_a.buckets[idx].upper_bound, + logform_b.buckets[idx].upper_bound)); + BOOST_CHECK_EQUAL( + logform_a.buckets[idx].count, logform_b.buckets[idx].count); } } } // namespace @@ -77,8 +57,6 @@ void validate_histograms_equal(const hdr_hist& a, const l_hist& b) { // ensures both the log_hist_public and the public hdr_hist return identical // seastar histograms for values recorded around bucket bounds. SEASTAR_THREAD_TEST_CASE(test_public_log_hist_and_hdr_hist_equal_bounds) { - using namespace std::chrono_literals; - hdr_hist a; log_hist_public b; @@ -94,14 +72,12 @@ SEASTAR_THREAD_TEST_CASE(test_public_log_hist_and_hdr_hist_equal_bounds) { b.record(upper_bound + 1); } - validate_histograms_equal(a, b); + validate_public_histograms_equal(a, b); } // ensures both the log_hist_public and the public hdr_hist return identical // seastar histograms for randomly selected values. SEASTAR_THREAD_TEST_CASE(test_public_log_hist_and_hdr_hist_equal_rand) { - using namespace std::chrono_literals; - hdr_hist a; log_hist_public b; @@ -115,5 +91,58 @@ SEASTAR_THREAD_TEST_CASE(test_public_log_hist_and_hdr_hist_equal_rand) { b.record(sample); } - validate_histograms_equal(a, b); + validate_public_histograms_equal(a, b); +} + +// Ensures that an internal histogram is properly converted to a public metrics +// histogram. +SEASTAR_THREAD_TEST_CASE(test_internal_hist_to_public_hist_bounds) { + hdr_hist a; + log_hist_internal b; + + a.record(1); + b.record(1); + + for (unsigned i = 0; i < 17; i++) { + auto upper_bound + = (((unsigned)1 << (log_hist_internal::first_bucket_exp + i)) - 1); + a.record(upper_bound); + a.record(upper_bound + 1); + b.record(upper_bound); + b.record(upper_bound + 1); + } + + validate_public_histograms_equal(a, b); +} + +// Ensures that generating a internal seastar histogram from log_hist_public +// results in the additional buckets for the extended lower bounds having counts +// of zero. +SEASTAR_THREAD_TEST_CASE(test_public_hist_to_internal_hist) { + log_hist_public a; + log_hist_internal b; + + a.record(1); + b.record(1); + + for (unsigned i = 0; i < 17; i++) { + auto upper_bound + = (((unsigned)1 << (log_hist_internal::first_bucket_exp + i)) - 1); + a.record(upper_bound); + a.record(upper_bound + 1); + b.record(upper_bound); + b.record(upper_bound + 1); + } + + auto pub_to_int_hist = a.internal_histogram_logform(); + auto int_to_int_hist = b.internal_histogram_logform(); + + const auto public_ub_exp = 8; + const auto internal_ub_exp = 3; + + // The buckets in the extended lower bounds should be empty + for (int i = 0; i < public_ub_exp - internal_ub_exp; i++) { + BOOST_CHECK_EQUAL(pub_to_int_hist.buckets[i].count, 0); + BOOST_CHECK_NE(int_to_int_hist.buckets[i].count, 0); + } } From 9f3abd2d93fcb6edac11614bdfdda13f89014e6b Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Tue, 25 Jul 2023 23:10:06 -0400 Subject: [PATCH 15/16] utils: allow for pausing auto latency measurements in log_hist (cherry picked from commit caa4e92e6135098ed21a4cb7f4664969b345b1c1) --- src/v/utils/log_hist.h | 39 +++++++++++++---- src/v/utils/tests/seastar_histogram_test.cc | 48 +++++++++++++++++++++ 2 files changed, 78 insertions(+), 9 deletions(-) diff --git a/src/v/utils/log_hist.h b/src/v/utils/log_hist.h index 9f53ab75758de..6fb3272de524b 100644 --- a/src/v/utils/log_hist.h +++ b/src/v/utils/log_hist.h @@ -66,13 +66,15 @@ class log_hist { explicit measurement(log_hist& h) : _canary(h._canary) , _h(std::ref(h)) - , _begin_t(log_hist::clock_type::now()) {} + , _begin_t(log_hist::clock_type::now()) + , _total_latency(duration_t(0)) {} measurement(const measurement&) = delete; measurement& operator=(const measurement&) = delete; measurement(measurement&& o) noexcept : _canary(o._canary) , _h(o._h) - , _begin_t(o._begin_t) { + , _begin_t(o._begin_t) + , _total_latency(o._total_latency) { o.cancel(); } measurement& operator=(measurement&& o) noexcept { @@ -84,7 +86,7 @@ class log_hist { } ~measurement() noexcept { if (_canary && *_canary) { - _h.get().record(compute_duration()); + _h.get().record(compute_total_latency().count()); } } @@ -92,16 +94,35 @@ class log_hist { // being recorded to the underlying histogram. void cancel() { _canary = nullptr; } - private: - int64_t compute_duration() const { - return std::chrono::duration_cast( - log_hist::clock_type::now() - _begin_t) - .count(); + // Temporarily stops measuring latency. + void stop() { + _total_latency = compute_total_latency(); + _begin_t = std::nullopt; + } + + // Resumes measuring latency. + void start() { + if (!_begin_t.has_value()) { + _begin_t = log_hist::clock_type::now(); + } } + // Returns the total latency that has been measured so far. + duration_t compute_total_latency() const { + if (_begin_t) { + return _total_latency + + std::chrono::duration_cast( + log_hist::clock_type::now() - *_begin_t); + } else { + return _total_latency; + } + } + + private: measurement_canary_t _canary; std::reference_wrapper _h; - log_hist::clock_type::time_point _begin_t; + std::optional _begin_t; + duration_t _total_latency; }; std::unique_ptr auto_measure() { diff --git a/src/v/utils/tests/seastar_histogram_test.cc b/src/v/utils/tests/seastar_histogram_test.cc index b610ea597b40c..937c1ce88ab88 100644 --- a/src/v/utils/tests/seastar_histogram_test.cc +++ b/src/v/utils/tests/seastar_histogram_test.cc @@ -7,6 +7,7 @@ #include +#include #include #include @@ -146,3 +147,50 @@ SEASTAR_THREAD_TEST_CASE(test_public_hist_to_internal_hist) { BOOST_CHECK_NE(int_to_int_hist.buckets[i].count, 0); } } + +SEASTAR_THREAD_TEST_CASE(test_log_hist_measure) { + log_hist_internal a; + + { + auto m1 = a.auto_measure(); + ss::sleep(std::chrono::microseconds(1)).get(); + auto m2 = a.auto_measure(); + ss::sleep(std::chrono::microseconds(1)).get(); + } + + auto hist = a.internal_histogram_logform(); + BOOST_CHECK_EQUAL(hist.buckets.back().count, 2); +} + +SEASTAR_THREAD_TEST_CASE(test_log_hist_measure_pause) { + using namespace std::chrono_literals; + + log_hist_internal a; + + { + auto m1 = a.auto_measure(); + auto m2 = a.auto_measure(); + + m1->stop(); + m2->stop(); + + auto m1_dur = m1->compute_total_latency(); + auto m2_dur = m2->compute_total_latency(); + + ss::sleep(std::chrono::microseconds(1)).get(); + + BOOST_CHECK_EQUAL((m1->compute_total_latency() - m1_dur).count(), 0); + BOOST_CHECK_EQUAL((m2->compute_total_latency() - m2_dur).count(), 0); + + m1->start(); + m2->start(); + + ss::sleep(std::chrono::microseconds(1)).get(); + + BOOST_CHECK_GT((m1->compute_total_latency() - m1_dur).count(), 1); + BOOST_CHECK_GT((m2->compute_total_latency() - m2_dur).count(), 1); + } + + auto hist = a.internal_histogram_logform(); + BOOST_CHECK_EQUAL(hist.buckets.back().count, 2); +} From 0329bae6a5abb1768ec002c49846faf8fd8b2175 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Wed, 26 Jul 2023 20:19:09 -0400 Subject: [PATCH 16/16] treewide: replace hdr_hist with log_hist This reduces the memory required to store a histogram from kilobytes to 208 bytes. It also speeds up recording to the histogram by 2x. (cherry picked from commit c2959f541bedc51ba10364b1c0b3be96b96ad3d9) --- src/v/cloud_storage/partition_probe.h | 3 ++ src/v/cloud_storage/probe.h | 3 ++ src/v/cloud_storage_clients/client_pool.h | 1 + src/v/cloud_storage_clients/client_probe.h | 3 ++ .../test_client/abs_test_client_main.cc | 1 - .../test_client/s3_test_client_main.cc | 1 - src/v/http/demo/client.cc | 1 - src/v/kafka/latency_probe.h | 33 +++++++++++-------- src/v/kafka/server/connection_context.h | 3 +- src/v/kafka/server/handlers/fetch.cc | 4 +-- src/v/kafka/server/handlers/fetch.h | 7 ++-- src/v/kafka/server/handlers/produce.cc | 2 +- src/v/kafka/server/server.h | 2 +- src/v/net/server.cc | 4 +-- src/v/net/server.h | 13 +++++--- src/v/pandaproxy/probe.cc | 9 +++-- src/v/pandaproxy/probe.h | 14 ++++---- src/v/rpc/rpc_compiler.py | 2 +- src/v/rpc/rpc_server.cc | 3 +- src/v/rpc/rpc_server.h | 2 +- src/v/rpc/types.h | 12 ++++--- src/v/utils/log_hist.h | 1 + 22 files changed, 73 insertions(+), 51 deletions(-) diff --git a/src/v/cloud_storage/partition_probe.h b/src/v/cloud_storage/partition_probe.h index 00d0cc180e793..fbf2eb14d6c9e 100644 --- a/src/v/cloud_storage/partition_probe.h +++ b/src/v/cloud_storage/partition_probe.h @@ -11,6 +11,7 @@ #pragma once #include "model/fundamental.h" +#include "utils/log_hist.h" #include @@ -18,6 +19,8 @@ namespace cloud_storage { class partition_probe { public: + using hist_t = log_hist_internal; + explicit partition_probe(const model::ntp& ntp); void add_bytes_read(uint64_t read) { _bytes_read += read; } diff --git a/src/v/cloud_storage/probe.h b/src/v/cloud_storage/probe.h index bea26bce376b7..65c8c7f4a5a80 100644 --- a/src/v/cloud_storage/probe.h +++ b/src/v/cloud_storage/probe.h @@ -13,6 +13,7 @@ #include "cloud_storage/types.h" #include "model/fundamental.h" #include "seastarx.h" +#include "utils/log_hist.h" #include @@ -25,6 +26,8 @@ class materialized_segments; /// Cloud storage endpoint level probe class remote_probe { public: + using hist_t = log_hist_internal; + explicit remote_probe( remote_metrics_disabled disabled, remote_metrics_disabled public_disabled, diff --git a/src/v/cloud_storage_clients/client_pool.h b/src/v/cloud_storage_clients/client_pool.h index 1caba7808d2a2..44b37b5e67d43 100644 --- a/src/v/cloud_storage_clients/client_pool.h +++ b/src/v/cloud_storage_clients/client_pool.h @@ -12,6 +12,7 @@ #include "cloud_roles/apply_credentials.h" #include "cloud_storage_clients/client.h" +#include "cloud_storage_clients/client_probe.h" #include "utils/gate_guard.h" #include "utils/intrusive_list_helpers.h" diff --git a/src/v/cloud_storage_clients/client_probe.h b/src/v/cloud_storage_clients/client_probe.h index 26fbf73b438a2..8eb4d290437e8 100644 --- a/src/v/cloud_storage_clients/client_probe.h +++ b/src/v/cloud_storage_clients/client_probe.h @@ -18,6 +18,7 @@ #include "model/fundamental.h" #include "net/types.h" #include "ssx/metrics.h" +#include "utils/log_hist.h" #include @@ -44,6 +45,8 @@ enum class op_type_tag { upload, download }; /// time-series. class client_probe : public http::client_probe { public: + using hist_t = log_hist_internal; + /// \brief Probe c-tor for S3 client /// /// \param disable is used to switch the internal monitoring off diff --git a/src/v/cloud_storage_clients/test_client/abs_test_client_main.cc b/src/v/cloud_storage_clients/test_client/abs_test_client_main.cc index 50c167f853a56..763079729bd57 100644 --- a/src/v/cloud_storage_clients/test_client/abs_test_client_main.cc +++ b/src/v/cloud_storage_clients/test_client/abs_test_client_main.cc @@ -14,7 +14,6 @@ #include "http/client.h" #include "seastarx.h" #include "syschecks/syschecks.h" -#include "utils/hdr_hist.h" #include "vlog.h" #include diff --git a/src/v/cloud_storage_clients/test_client/s3_test_client_main.cc b/src/v/cloud_storage_clients/test_client/s3_test_client_main.cc index 36ffeb0fe152e..c0f85e58bc76a 100644 --- a/src/v/cloud_storage_clients/test_client/s3_test_client_main.cc +++ b/src/v/cloud_storage_clients/test_client/s3_test_client_main.cc @@ -15,7 +15,6 @@ #include "http/client.h" #include "seastarx.h" #include "syschecks/syschecks.h" -#include "utils/hdr_hist.h" #include "vlog.h" #include diff --git a/src/v/http/demo/client.cc b/src/v/http/demo/client.cc index 3c8c6198e5746..1b6915ad3d793 100644 --- a/src/v/http/demo/client.cc +++ b/src/v/http/demo/client.cc @@ -13,7 +13,6 @@ #include "rpc/types.h" #include "seastarx.h" #include "syschecks/syschecks.h" -#include "utils/hdr_hist.h" #include "vlog.h" #include diff --git a/src/v/kafka/latency_probe.h b/src/v/kafka/latency_probe.h index 3946475e6cfbc..7c325074d0672 100644 --- a/src/v/kafka/latency_probe.h +++ b/src/v/kafka/latency_probe.h @@ -14,13 +14,22 @@ #include "config/configuration.h" #include "prometheus/prometheus_sanitize.h" #include "ssx/metrics.h" -#include "utils/hdr_hist.h" +#include "utils/log_hist.h" #include namespace kafka { class latency_probe { public: + using hist_t = log_hist_internal; + + latency_probe() = default; + latency_probe(const latency_probe&) = delete; + latency_probe& operator=(const latency_probe&) = delete; + latency_probe(latency_probe&&) = delete; + latency_probe& operator=(latency_probe&&) = delete; + ~latency_probe() = default; + void setup_metrics() { namespace sm = ss::metrics; @@ -38,13 +47,13 @@ class latency_probe { "fetch_latency_us", sm::description("Fetch Latency"), labels, - [this] { return _fetch_latency.seastar_histogram_logform(); }) + [this] { return _fetch_latency.internal_histogram_logform(); }) .aggregate(aggregate_labels), sm::make_histogram( "produce_latency_us", sm::description("Produce Latency"), labels, - [this] { return _produce_latency.seastar_histogram_logform(); }) + [this] { return _produce_latency.internal_histogram_logform(); }) .aggregate(aggregate_labels)}); } @@ -61,32 +70,28 @@ class latency_probe { "request_latency_seconds", sm::description("Internal latency of kafka produce requests"), {ssx::metrics::make_namespaced_label("request")("produce")}, - [this] { - return ssx::metrics::report_default_histogram( - _produce_latency); - }) + [this] { return _produce_latency.public_histogram_logform(); }) .aggregate({sm::shard_label}), sm::make_histogram( "request_latency_seconds", sm::description("Internal latency of kafka consume requests"), {ssx::metrics::make_namespaced_label("request")("consume")}, - [this] { - return ssx::metrics::report_default_histogram(_fetch_latency); - }) + [this] { return _fetch_latency.public_histogram_logform(); }) .aggregate({sm::shard_label}), }); } - std::unique_ptr auto_produce_measurement() { + std::unique_ptr auto_produce_measurement() { return _produce_latency.auto_measure(); } - std::unique_ptr auto_fetch_measurement() { + + std::unique_ptr auto_fetch_measurement() { return _fetch_latency.auto_measure(); } private: - hdr_hist _produce_latency; - hdr_hist _fetch_latency; + hist_t _produce_latency; + hist_t _fetch_latency; ss::metrics::metric_groups _metrics; ss::metrics::metric_groups _public_metrics{ ssx::metrics::public_metrics_handle}; diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 3bcb1e45ca3b1..3b19c6a2a1a6b 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -21,7 +21,6 @@ #include "security/mtls.h" #include "security/sasl_authentication.h" #include "ssx/semaphore.h" -#include "utils/hdr_hist.h" #include "utils/log_hist.h" #include "utils/named_type.h" @@ -102,7 +101,7 @@ struct session_resources { ss::lowres_clock::duration backpressure_delay; ssx::semaphore_units memlocks; ssx::semaphore_units queue_units; - std::unique_ptr method_latency; + std::unique_ptr method_latency; std::unique_ptr handler_latency; std::unique_ptr tracker; request_data request_data; diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 8b1c0c7d0c478..b87dba663462b 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -363,7 +363,7 @@ static void fill_fetch_responses( op_context& octx, std::vector results, std::vector responses, - std::vector> metrics) { + std::vector> metrics) { auto range = boost::irange(0, results.size()); if (unlikely( results.size() != responses.size() @@ -389,7 +389,7 @@ static void fill_fetch_responses( if (unlikely(res.error != error_code::none)) { resp_it->set( make_partition_response_error(res.partition, res.error)); - metric->set_trace(false); + metric->cancel(); continue; } diff --git a/src/v/kafka/server/handlers/fetch.h b/src/v/kafka/server/handlers/fetch.h index 72ce88af8f05c..671559776ef4e 100644 --- a/src/v/kafka/server/handlers/fetch.h +++ b/src/v/kafka/server/handlers/fetch.h @@ -14,6 +14,7 @@ #include "kafka/server/handlers/handler.h" #include "kafka/types.h" #include "utils/intrusive_list_helpers.h" +#include "utils/log_hist.h" namespace kafka { @@ -293,10 +294,12 @@ struct read_result { // struct aggregating fetch requests and corresponding response iterators for // the same shard struct shard_fetch { + using hist_t = log_hist_internal; + void push_back( ntp_fetch_config config, op_context::response_placeholder_ptr r_ph, - std::unique_ptr m) { + std::unique_ptr m) { requests.push_back(std::move(config)); responses.push_back(r_ph); metrics.push_back(std::move(m)); @@ -306,7 +309,7 @@ struct shard_fetch { ss::shard_id shard; std::vector requests; std::vector responses; - std::vector> metrics; + std::vector> metrics; friend std::ostream& operator<<(std::ostream& o, const shard_fetch& sf) { fmt::print(o, "{}", sf.requests); diff --git a/src/v/kafka/server/handlers/produce.cc b/src/v/kafka/server/handlers/produce.cc index 0c9f1e8b9e22f..772d8a1a9a9d2 100644 --- a/src/v/kafka/server/handlers/produce.cc +++ b/src/v/kafka/server/handlers/produce.cc @@ -376,7 +376,7 @@ static partition_produce_stages produce_topic_partition( auto dur = std::chrono::steady_clock::now() - start; octx.rctx.connection()->server().update_produce_latency(dur); } else { - m->set_trace(false); + m->cancel(); } return p; }); diff --git a/src/v/kafka/server/server.h b/src/v/kafka/server/server.h index fc8c89396763f..64117f5eee75c 100644 --- a/src/v/kafka/server/server.h +++ b/src/v/kafka/server/server.h @@ -68,7 +68,7 @@ class server final ~server() noexcept override = default; server(const server&) = delete; server& operator=(const server&) = delete; - server(server&&) noexcept = default; + server(server&&) noexcept = delete; server& operator=(server&&) noexcept = delete; std::string_view name() const final { return "kafka rpc protocol"; } diff --git a/src/v/net/server.cc b/src/v/net/server.cc index 815d906de99ed..5aeb6aea9cba8 100644 --- a/src/v/net/server.cc +++ b/src/v/net/server.cc @@ -331,7 +331,7 @@ void server::setup_metrics() { "{}: Memory consumed by request processing", cfg.name))), sm::make_histogram( "dispatch_handler_latency", - [this] { return _hist.seastar_histogram_logform(); }, + [this] { return _hist.internal_histogram_logform(); }, sm::description(ssx::sformat("{}: Latency ", cfg.name)))}); } @@ -352,7 +352,7 @@ void server::setup_public_metrics() { "latency_seconds", sm::description("RPC latency"), {server_label(server_name)}, - [this] { return ssx::metrics::report_default_histogram(_hist); }) + [this] { return _hist.public_histogram_logform(); }) .aggregate({sm::shard_label})}); } diff --git a/src/v/net/server.h b/src/v/net/server.h index 854de3dc27452..952bc03e508df 100644 --- a/src/v/net/server.h +++ b/src/v/net/server.h @@ -16,8 +16,9 @@ #include "net/connection.h" #include "net/connection_rate.h" #include "net/types.h" +#include "ssx/metrics.h" #include "ssx/semaphore.h" -#include "utils/hdr_hist.h" +#include "utils/log_hist.h" #include #include @@ -104,9 +105,11 @@ struct server_configuration { class server { public: + using hist_t = log_hist_internal; + explicit server(server_configuration, ss::logger&); explicit server(ss::sharded* s, ss::logger&); - server(server&&) noexcept = default; + server(server&&) noexcept = delete; server& operator=(server&&) noexcept = delete; server(const server&) = delete; server& operator=(const server&) = delete; @@ -131,7 +134,7 @@ class server { ss::future<> stop(); const server_configuration cfg; // NOLINT - const hdr_hist& histogram() const { return _hist; } + const hist_t& histogram() const { return _hist; } virtual std::string_view name() const = 0; virtual ss::future<> apply(ss::lw_shared_ptr) = 0; @@ -139,7 +142,7 @@ class server { server_probe& probe() { return _probe; } ssx::semaphore& memory() { return _memory; } ss::gate& conn_gate() { return _conn_gate; } - hdr_hist& hist() { return _hist; } + hist_t& hist() { return _hist; } ss::abort_source& abort_source() { return _as; } bool abort_requested() const { return _as.abort_requested(); } @@ -169,7 +172,7 @@ class server { boost::intrusive::list _connections; ss::abort_source _as; ss::gate _conn_gate; - hdr_hist _hist; + hist_t _hist; server_probe _probe; ss::metrics::metric_groups _metrics; ss::metrics::metric_groups _public_metrics; diff --git a/src/v/pandaproxy/probe.cc b/src/v/pandaproxy/probe.cc index 7b9fdd5c74ee5..b5098d5582e4b 100644 --- a/src/v/pandaproxy/probe.cc +++ b/src/v/pandaproxy/probe.cc @@ -55,7 +55,9 @@ void probe::setup_metrics() { "request_latency", sm::description("Request latency"), labels, - [this] { return _request_metrics.hist().seastar_histogram_logform(); }) + [this] { + return _request_metrics.hist().internal_histogram_logform(); + }) .aggregate(internal_aggregate_labels)}); } @@ -82,10 +84,7 @@ void probe::setup_public_metrics() { sm::description( ssx::sformat("Internal latency of request for {}", _group_name)), labels, - [this] { - return ssx::metrics::report_default_histogram( - _request_metrics.hist()); - }) + [this] { return _request_metrics.hist().public_histogram_logform(); }) .aggregate(aggregate_labels), sm::make_counter( diff --git a/src/v/pandaproxy/probe.h b/src/v/pandaproxy/probe.h index fb2e7300de0ef..09505ef8ff88b 100644 --- a/src/v/pandaproxy/probe.h +++ b/src/v/pandaproxy/probe.h @@ -11,7 +11,8 @@ #pragma once -#include "utils/hdr_hist.h" +#include "seastarx.h" +#include "utils/log_hist.h" #include #include @@ -22,10 +23,11 @@ namespace pandaproxy { /// If the request is good, measure latency, otherwise record the error. class http_status_metric { public: + using hist_t = log_hist_internal; class measurement { public: measurement( - http_status_metric* p, std::unique_ptr m) + http_status_metric* p, std::unique_ptr m) : _p(p) , _m(std::move(m)) {} @@ -41,17 +43,17 @@ class http_status_metric { } else { ++_p->_5xx_count; } - _m->set_trace(false); + _m->cancel(); } private: http_status_metric* _p; - std::unique_ptr _m; + std::unique_ptr _m; }; - hdr_hist& hist() { return _hist; } + hist_t& hist() { return _hist; } auto auto_measure() { return measurement{this, _hist.auto_measure()}; } - hdr_hist _hist; + hist_t _hist; int64_t _5xx_count{0}; int64_t _4xx_count{0}; int64_t _3xx_count{0}; diff --git a/src/v/rpc/rpc_compiler.py b/src/v/rpc/rpc_compiler.py index 9191a0ca147dc..8b971df2d97dc 100755 --- a/src/v/rpc/rpc_compiler.py +++ b/src/v/rpc/rpc_compiler.py @@ -96,7 +96,7 @@ class failure_probes; prometheus_sanitize::metrics_name("internal_rpc"), {sm::make_histogram( "latency", - [this] { return _methods[{{loop.index-1}}].probes.latency_hist().seastar_histogram_logform(); }, + [this] { return _methods[{{loop.index-1}}].probes.latency_hist().internal_histogram_logform(); }, sm::description("Internal RPC service latency"), labels) .aggregate(aggregate_labels)}); diff --git a/src/v/rpc/rpc_server.cc b/src/v/rpc/rpc_server.cc index 3493625b13142..4c875ea164f95 100644 --- a/src/v/rpc/rpc_server.cc +++ b/src/v/rpc/rpc_server.cc @@ -260,7 +260,8 @@ ss::future<> rpc_server::dispatch_method_once( } return send_reply(ctx, std::move(reply_buf)) .finally([m, l = std::move(l)]() mutable { - m->probes.latency_hist().record(std::move(l)); + m->probes.latency_hist().record( + l->compute_total_latency().count()); }); }); }) diff --git a/src/v/rpc/rpc_server.h b/src/v/rpc/rpc_server.h index 14d5b39a95a77..a34fcfcf9c89d 100644 --- a/src/v/rpc/rpc_server.h +++ b/src/v/rpc/rpc_server.h @@ -28,7 +28,7 @@ class rpc_server : public net::server { explicit rpc_server(ss::sharded* s) : net::server(s, rpclog) {} - rpc_server(rpc_server&&) noexcept = default; + rpc_server(rpc_server&&) noexcept = delete; ~rpc_server() = default; rpc_server& operator=(rpc_server&&) noexcept = delete; diff --git a/src/v/rpc/types.h b/src/v/rpc/types.h index ea28994fdf3fb..c35875af54368 100644 --- a/src/v/rpc/types.h +++ b/src/v/rpc/types.h @@ -18,7 +18,7 @@ #include "outcome.h" #include "seastarx.h" #include "ssx/semaphore.h" -#include "utils/hdr_hist.h" +#include "utils/log_hist.h" #include #include @@ -406,12 +406,14 @@ inline void netbuf::set_min_compression_bytes(size_t min) { class method_probes { public: - hdr_hist& latency_hist() { return _latency_hist; } - const hdr_hist& latency_hist() const { return _latency_hist; } + using hist_t = log_hist_internal; + + hist_t& latency_hist() { return _latency_hist; } + const hist_t& latency_hist() const { return _latency_hist; } private: - // roughly 2024 bytes - hdr_hist _latency_hist{120s, 1ms}; + // roughly 208 bytes + hist_t _latency_hist; }; /// \brief most method implementations will be codegenerated diff --git a/src/v/utils/log_hist.h b/src/v/utils/log_hist.h index 6fb3272de524b..abc436f4392f4 100644 --- a/src/v/utils/log_hist.h +++ b/src/v/utils/log_hist.h @@ -18,6 +18,7 @@ #include #include #include +#include /* * A histogram implementation