diff --git a/src/v/cloud_storage/partition_probe.h b/src/v/cloud_storage/partition_probe.h index 00d0cc180e793..fbf2eb14d6c9e 100644 --- a/src/v/cloud_storage/partition_probe.h +++ b/src/v/cloud_storage/partition_probe.h @@ -11,6 +11,7 @@ #pragma once #include "model/fundamental.h" +#include "utils/log_hist.h" #include @@ -18,6 +19,8 @@ namespace cloud_storage { class partition_probe { public: + using hist_t = log_hist_internal; + explicit partition_probe(const model::ntp& ntp); void add_bytes_read(uint64_t read) { _bytes_read += read; } diff --git a/src/v/cloud_storage/probe.h b/src/v/cloud_storage/probe.h index bea26bce376b7..65c8c7f4a5a80 100644 --- a/src/v/cloud_storage/probe.h +++ b/src/v/cloud_storage/probe.h @@ -13,6 +13,7 @@ #include "cloud_storage/types.h" #include "model/fundamental.h" #include "seastarx.h" +#include "utils/log_hist.h" #include @@ -25,6 +26,8 @@ class materialized_segments; /// Cloud storage endpoint level probe class remote_probe { public: + using hist_t = log_hist_internal; + explicit remote_probe( remote_metrics_disabled disabled, remote_metrics_disabled public_disabled, diff --git a/src/v/cloud_storage_clients/client_pool.h b/src/v/cloud_storage_clients/client_pool.h index 1caba7808d2a2..44b37b5e67d43 100644 --- a/src/v/cloud_storage_clients/client_pool.h +++ b/src/v/cloud_storage_clients/client_pool.h @@ -12,6 +12,7 @@ #include "cloud_roles/apply_credentials.h" #include "cloud_storage_clients/client.h" +#include "cloud_storage_clients/client_probe.h" #include "utils/gate_guard.h" #include "utils/intrusive_list_helpers.h" diff --git a/src/v/cloud_storage_clients/client_probe.h b/src/v/cloud_storage_clients/client_probe.h index 26fbf73b438a2..8eb4d290437e8 100644 --- a/src/v/cloud_storage_clients/client_probe.h +++ b/src/v/cloud_storage_clients/client_probe.h @@ -18,6 +18,7 @@ #include "model/fundamental.h" #include "net/types.h" #include "ssx/metrics.h" +#include "utils/log_hist.h" #include @@ -44,6 +45,8 @@ enum class op_type_tag { upload, download }; /// time-series. class client_probe : public http::client_probe { public: + using hist_t = log_hist_internal; + /// \brief Probe c-tor for S3 client /// /// \param disable is used to switch the internal monitoring off diff --git a/src/v/cloud_storage_clients/test_client/abs_test_client_main.cc b/src/v/cloud_storage_clients/test_client/abs_test_client_main.cc index 50c167f853a56..763079729bd57 100644 --- a/src/v/cloud_storage_clients/test_client/abs_test_client_main.cc +++ b/src/v/cloud_storage_clients/test_client/abs_test_client_main.cc @@ -14,7 +14,6 @@ #include "http/client.h" #include "seastarx.h" #include "syschecks/syschecks.h" -#include "utils/hdr_hist.h" #include "vlog.h" #include diff --git a/src/v/cloud_storage_clients/test_client/s3_test_client_main.cc b/src/v/cloud_storage_clients/test_client/s3_test_client_main.cc index 36ffeb0fe152e..c0f85e58bc76a 100644 --- a/src/v/cloud_storage_clients/test_client/s3_test_client_main.cc +++ b/src/v/cloud_storage_clients/test_client/s3_test_client_main.cc @@ -15,7 +15,6 @@ #include "http/client.h" #include "seastarx.h" #include "syschecks/syschecks.h" -#include "utils/hdr_hist.h" #include "vlog.h" #include diff --git a/src/v/http/demo/client.cc b/src/v/http/demo/client.cc index 3c8c6198e5746..1b6915ad3d793 100644 --- a/src/v/http/demo/client.cc +++ b/src/v/http/demo/client.cc @@ -13,7 +13,6 @@ #include "rpc/types.h" #include "seastarx.h" #include "syschecks/syschecks.h" -#include "utils/hdr_hist.h" #include "vlog.h" #include diff --git a/src/v/kafka/CMakeLists.txt b/src/v/kafka/CMakeLists.txt index 46e87f0318637..5e7c2e82bc2d9 100644 --- a/src/v/kafka/CMakeLists.txt +++ b/src/v/kafka/CMakeLists.txt @@ -25,6 +25,7 @@ set(handlers_srcs server/handlers/topics/topic_utils.cc server/handlers/describe_producers.cc server/handlers/describe_transactions.cc + server/handlers/handler_probe.cc server/handlers/list_transactions.cc ) diff --git a/src/v/kafka/latency_probe.h b/src/v/kafka/latency_probe.h index 3946475e6cfbc..7c325074d0672 100644 --- a/src/v/kafka/latency_probe.h +++ b/src/v/kafka/latency_probe.h @@ -14,13 +14,22 @@ #include "config/configuration.h" #include "prometheus/prometheus_sanitize.h" #include "ssx/metrics.h" -#include "utils/hdr_hist.h" +#include "utils/log_hist.h" #include namespace kafka { class latency_probe { public: + using hist_t = log_hist_internal; + + latency_probe() = default; + latency_probe(const latency_probe&) = delete; + latency_probe& operator=(const latency_probe&) = delete; + latency_probe(latency_probe&&) = delete; + latency_probe& operator=(latency_probe&&) = delete; + ~latency_probe() = default; + void setup_metrics() { namespace sm = ss::metrics; @@ -38,13 +47,13 @@ class latency_probe { "fetch_latency_us", sm::description("Fetch Latency"), labels, - [this] { return _fetch_latency.seastar_histogram_logform(); }) + [this] { return _fetch_latency.internal_histogram_logform(); }) .aggregate(aggregate_labels), sm::make_histogram( "produce_latency_us", sm::description("Produce Latency"), labels, - [this] { return _produce_latency.seastar_histogram_logform(); }) + [this] { return _produce_latency.internal_histogram_logform(); }) .aggregate(aggregate_labels)}); } @@ -61,32 +70,28 @@ class latency_probe { "request_latency_seconds", sm::description("Internal latency of kafka produce requests"), {ssx::metrics::make_namespaced_label("request")("produce")}, - [this] { - return ssx::metrics::report_default_histogram( - _produce_latency); - }) + [this] { return _produce_latency.public_histogram_logform(); }) .aggregate({sm::shard_label}), sm::make_histogram( "request_latency_seconds", sm::description("Internal latency of kafka consume requests"), {ssx::metrics::make_namespaced_label("request")("consume")}, - [this] { - return ssx::metrics::report_default_histogram(_fetch_latency); - }) + [this] { return _fetch_latency.public_histogram_logform(); }) .aggregate({sm::shard_label}), }); } - std::unique_ptr auto_produce_measurement() { + std::unique_ptr auto_produce_measurement() { return _produce_latency.auto_measure(); } - std::unique_ptr auto_fetch_measurement() { + + std::unique_ptr auto_fetch_measurement() { return _fetch_latency.auto_measure(); } private: - hdr_hist _produce_latency; - hdr_hist _fetch_latency; + hist_t _produce_latency; + hist_t _fetch_latency; ss::metrics::metric_groups _metrics; ss::metrics::metric_groups _public_metrics{ ssx::metrics::public_metrics_handle}; diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index 28949f06c2028..1b9b8224cc1e4 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -118,6 +118,7 @@ ss::future<> connection_context::process_one_request() { _server.probe().header_corrupted(); co_return; } + _server.handler_probe(h->key).add_bytes_received(sz.value()); try { co_return co_await dispatch_method_once( @@ -286,7 +287,8 @@ ss::future connection_context::throttle_request( request_data r_data = request_data{ .request_key = hdr.key, .client_id = ss::sstring{hdr.client_id.value_or("")}}; - auto tracker = std::make_unique(_server.probe()); + auto& h_probe = _server.handler_probe(r_data.request_key); + auto tracker = std::make_unique(_server.probe(), h_probe); auto fut = ss::now(); if (delay.enforce > delay_t::clock::duration::zero()) { fut = ss::sleep_abortable(delay.enforce, _server.abort_source()); @@ -300,15 +302,16 @@ ss::future connection_context::throttle_request( r_data = std::move(r_data), delay = delay.request, track, - tracker = std::move(tracker)](ssx::semaphore_units units) mutable { + tracker = std::move(tracker), + &h_probe](ssx::semaphore_units units) mutable { return server().get_request_unit().then( [this, r_data = std::move(r_data), delay, mem_units = std::move(units), track, - tracker = std::move(tracker)]( - ssx::semaphore_units qd_units) mutable { + tracker = std::move(tracker), + &h_probe](ssx::semaphore_units qd_units) mutable { session_resources r{ .backpressure_delay = delay, .memlocks = std::move(mem_units), @@ -318,6 +321,7 @@ ss::future connection_context::throttle_request( if (track) { r.method_latency = _server.hist().auto_measure(); } + r.handler_latency = h_probe.auto_latency_measurement(); return r; }); }); @@ -422,8 +426,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { seq, correlation, self, - sres = std::move(sres)]( - ss::future<> d) mutable { + sres](ss::future<> d) mutable { /* * if the dispatch/first stage failed, then we need to * need to consume the second stage since it might be @@ -440,9 +443,8 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { "Discarding second stage failure {}", e); }) - .finally([self, d = std::move(d)]() mutable { - self->_server.probe().service_error(); - self->_server.probe().request_completed(); + .finally([self, d = std::move(d), sres]() mutable { + sres->tracker->mark_errored(); return std::move(d); }); } @@ -454,7 +456,7 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { _server.conn_gate(), [this, f = std::move(f), - sres = std::move(sres), + sres, seq, correlation]() mutable { return f.then([this, @@ -469,40 +471,41 @@ connection_context::dispatch_method_once(request_header hdr, size_t size) { return maybe_process_responses(); }); }) - .handle_exception([self](std::exception_ptr e) { - // ssx::spawn_with_gate already caught - // shutdown-like exceptions, so we should only - // be taking this path for real errors. That - // also means that on shutdown we don't bother - // to call shutdown_input on the connection, so - // rely on any future reader to check the abort - // source before considering reading the - // connection. - - auto disconnected - = net::is_disconnect_exception(e); - if (disconnected) { - vlog( - klog.info, - "Disconnected {} ({})", - self->conn->addr, - disconnected.value()); - } else { - vlog( - klog.warn, - "Error processing request: {}", - e); - } - - self->_server.probe().service_error(); - self->conn->shutdown_input(); - }); + .handle_exception( + [self, sres](std::exception_ptr e) { + // ssx::spawn_with_gate already caught + // shutdown-like exceptions, so we should only + // be taking this path for real errors. That + // also means that on shutdown we don't bother + // to call shutdown_input on the connection, + // so rely on any future reader to check the + // abort source before considering reading the + // connection. + auto disconnected + = net::is_disconnect_exception(e); + if (disconnected) { + vlog( + klog.info, + "Disconnected {} ({})", + self->conn->addr, + disconnected.value()); + } else { + vlog( + klog.warn, + "Error processing request: {}", + e); + } + + sres->tracker->mark_errored(); + self->conn->shutdown_input(); + }); return d; }) - .handle_exception([self](std::exception_ptr e) { + .handle_exception([self, sres](std::exception_ptr e) { vlog( klog.info, "Detected error dispatching request: {}", e); self->conn->shutdown_input(); + sres->tracker->mark_errored(); }); }); }) @@ -566,6 +569,7 @@ ss::future<> connection_context::maybe_process_responses() { *_snc_quota_context, response_size); } } + _server.handler_probe(request_key).add_bytes_sent(response_size); try { return conn->write(std::move(msg)) .then([] { @@ -574,8 +578,9 @@ ss::future<> connection_context::maybe_process_responses() { }) // release the resources only once it has been written to the // connection. - .finally([resources = std::move(resp_and_res.resources)] {}); + .finally([resources = resp_and_res.resources] {}); } catch (...) { + resp_and_res.resources->tracker->mark_errored(); vlog( klog.debug, "Failed to process request: {}", diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 778c5779f2d23..3b19c6a2a1a6b 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -10,6 +10,7 @@ */ #pragma once #include "config/property.h" +#include "kafka/server/handlers/handler_probe.h" #include "kafka/server/response.h" #include "kafka/server/server.h" #include "kafka/types.h" @@ -20,7 +21,7 @@ #include "security/mtls.h" #include "security/sasl_authentication.h" #include "ssx/semaphore.h" -#include "utils/hdr_hist.h" +#include "utils/log_hist.h" #include "utils/named_type.h" #include @@ -52,19 +53,34 @@ class request_context; // used to track number of pending requests class request_tracker { public: - explicit request_tracker(net::server_probe& probe) noexcept - : _probe(probe) { + explicit request_tracker( + net::server_probe& probe, handler_probe& h_probe) noexcept + : _probe(probe) + , _h_probe(h_probe) { _probe.request_received(); + _h_probe.request_started(); } request_tracker(const request_tracker&) = delete; request_tracker(request_tracker&&) = delete; request_tracker& operator=(const request_tracker&) = delete; request_tracker& operator=(request_tracker&&) = delete; - ~request_tracker() noexcept { _probe.request_completed(); } + void mark_errored() { _errored = true; } + + ~request_tracker() noexcept { + _probe.request_completed(); + if (_errored) { + _h_probe.request_errored(); + _probe.service_error(); + } else { + _h_probe.request_completed(); + } + } private: net::server_probe& _probe; + handler_probe& _h_probe; + bool _errored{false}; }; struct request_data { @@ -85,7 +101,8 @@ struct session_resources { ss::lowres_clock::duration backpressure_delay; ssx::semaphore_units memlocks; ssx::semaphore_units queue_units; - std::unique_ptr method_latency; + std::unique_ptr method_latency; + std::unique_ptr handler_latency; std::unique_ptr tracker; request_data request_data; }; diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc index 8b1c0c7d0c478..b87dba663462b 100644 --- a/src/v/kafka/server/handlers/fetch.cc +++ b/src/v/kafka/server/handlers/fetch.cc @@ -363,7 +363,7 @@ static void fill_fetch_responses( op_context& octx, std::vector results, std::vector responses, - std::vector> metrics) { + std::vector> metrics) { auto range = boost::irange(0, results.size()); if (unlikely( results.size() != responses.size() @@ -389,7 +389,7 @@ static void fill_fetch_responses( if (unlikely(res.error != error_code::none)) { resp_it->set( make_partition_response_error(res.partition, res.error)); - metric->set_trace(false); + metric->cancel(); continue; } diff --git a/src/v/kafka/server/handlers/fetch.h b/src/v/kafka/server/handlers/fetch.h index 72ce88af8f05c..671559776ef4e 100644 --- a/src/v/kafka/server/handlers/fetch.h +++ b/src/v/kafka/server/handlers/fetch.h @@ -14,6 +14,7 @@ #include "kafka/server/handlers/handler.h" #include "kafka/types.h" #include "utils/intrusive_list_helpers.h" +#include "utils/log_hist.h" namespace kafka { @@ -293,10 +294,12 @@ struct read_result { // struct aggregating fetch requests and corresponding response iterators for // the same shard struct shard_fetch { + using hist_t = log_hist_internal; + void push_back( ntp_fetch_config config, op_context::response_placeholder_ptr r_ph, - std::unique_ptr m) { + std::unique_ptr m) { requests.push_back(std::move(config)); responses.push_back(r_ph); metrics.push_back(std::move(m)); @@ -306,7 +309,7 @@ struct shard_fetch { ss::shard_id shard; std::vector requests; std::vector responses; - std::vector> metrics; + std::vector> metrics; friend std::ostream& operator<<(std::ostream& o, const shard_fetch& sf) { fmt::print(o, "{}", sf.requests); diff --git a/src/v/kafka/server/handlers/handler_probe.cc b/src/v/kafka/server/handlers/handler_probe.cc new file mode 100644 index 0000000000000..b682b9f49b9fe --- /dev/null +++ b/src/v/kafka/server/handlers/handler_probe.cc @@ -0,0 +1,129 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "kafka/server/handlers/handler_probe.h" + +#include "config/configuration.h" +#include "kafka/server/handlers/handler_interface.h" +#include "kafka/server/logger.h" +#include "prometheus/prometheus_sanitize.h" + +#include +#include +#include + +#include +#include + +namespace kafka { + +handler_probe_manager::handler_probe_manager() + : _metrics() + , _probes(max_api_key() + 2) { + const auto unknown_handler_key = max_api_key() + 1; + for (size_t i = 0; i < _probes.size(); i++) { + auto key = api_key{i}; + + if (handler_for_key(key) || i == unknown_handler_key) { + _probes[i].setup_metrics(_metrics, key); + } + } +} + +handler_probe& handler_probe_manager::get_probe(api_key key) { + if (!handler_for_key(key)) { + return _probes.back(); + } + + return _probes[key]; +} + +handler_probe::handler_probe() + : _last_recorded_in_progress(ss::lowres_clock::now()) {} + +void handler_probe::setup_metrics( + ss::metrics::metric_groups& metrics, api_key key) { + namespace sm = ss::metrics; + + if (config::shard_local_cfg().disable_metrics()) { + return; + } + + const char* handler_name; + if (auto handler = handler_for_key(key)) { + handler_name = handler.value()->name(); + } else { + handler_name = "unknown_handler"; + } + + std::vector labels{sm::label("handler")(handler_name)}; + auto aggregate_labels = config::shard_local_cfg().aggregate_metrics() + ? std::vector{sm::shard_label} + : std::vector{}; + + metrics.add_group( + prometheus_sanitize::metrics_name("kafka_handler"), + { + sm::make_counter( + "requests_completed_total", + [this] { return _requests_completed; }, + sm::description("Number of kafka requests completed"), + labels) + .aggregate(aggregate_labels), + sm::make_counter( + "requests_errored_total", + [this] { return _requests_errored; }, + sm::description("Number of kafka requests errored"), + labels) + .aggregate(aggregate_labels), + sm::make_counter( + "requests_in_progress_total", + [this] { return _requests_in_progress_every_ns / 1'000'000'000; }, + sm::description("A running total of kafka requests in progress"), + labels) + .aggregate(aggregate_labels), + sm::make_counter( + "received_bytes_total", + [this] { return _bytes_received; }, + sm::description("Number of bytes received from kafka requests"), + labels) + .aggregate(aggregate_labels), + sm::make_counter( + "sent_bytes_total", + [this] { return _bytes_sent; }, + sm::description("Number of bytes sent in kafka replies"), + labels) + .aggregate(aggregate_labels), + sm::make_histogram( + "latency_microseconds", + sm::description("Latency histogram of kafka requests"), + labels, + [this] { return _latency.internal_histogram_logform(); }) + .aggregate(aggregate_labels), + }); +} + +/* + * This roughly approximates an integral of `_requests_in_progress`. + * By providing Prometheus with a counter of the integral rather than + * a gauge for `_requests_in_progress` we avoid any bias in the value + * that Prometheus's sampling rate could introduce. + */ +void handler_probe::sample_in_progress() { + auto now = ss::lowres_clock::now(); + auto s_diff = (now - _last_recorded_in_progress) + / std::chrono::nanoseconds(1); + + _requests_in_progress_every_ns += _requests_in_progress * s_diff; + _last_recorded_in_progress = now; +} + +} // namespace kafka diff --git a/src/v/kafka/server/handlers/handler_probe.h b/src/v/kafka/server/handlers/handler_probe.h new file mode 100644 index 0000000000000..e85e681f75155 --- /dev/null +++ b/src/v/kafka/server/handlers/handler_probe.h @@ -0,0 +1,94 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "kafka/protocol/types.h" +#include "ssx/metrics.h" +#include "utils/log_hist.h" + +#include + +namespace kafka { + +/** + * Stores per-handler metrics for kafka requests. + * And exposes them to the internal metrics endpoint. + */ +class handler_probe { +public: + using hist_t = log_hist_internal; + + explicit handler_probe(); + void setup_metrics(ss::metrics::metric_groups&, api_key); + + void sample_in_progress(); + void request_completed() { + sample_in_progress(); + + _requests_completed++; + _requests_in_progress--; + } + void request_errored() { + sample_in_progress(); + + _requests_errored++; + _requests_in_progress--; + } + void request_started() { + sample_in_progress(); + + _requests_in_progress++; + } + + void add_bytes_received(size_t bytes) { _bytes_received += bytes; } + + void add_bytes_sent(size_t bytes) { _bytes_sent += bytes; } + + std::unique_ptr auto_latency_measurement() { + return _latency.auto_measure(); + } + +private: + uint64_t _requests_completed{0}; + uint64_t _requests_errored{0}; + + uint64_t _requests_in_progress{0}; + uint64_t _requests_in_progress_every_ns{0}; + + ss::lowres_clock::time_point _last_recorded_in_progress; + + uint64_t _bytes_received{0}; + uint64_t _bytes_sent{0}; + + hist_t _latency{}; +}; + +/** + * Maps Kafka api keys to the `handler_probe` instance + * specific to that key. + */ +class handler_probe_manager { +public: + handler_probe_manager(); + /** + * Maps an `api_key` to the metrics probe it's associate with. + * If the `api_key` isn't valid a probe to an `unknown_handler` + * is returned instead. + */ + handler_probe& get_probe(api_key key); + +private: + ss::metrics::metric_groups _metrics; + std::vector _probes; +}; + +} // namespace kafka diff --git a/src/v/kafka/server/handlers/produce.cc b/src/v/kafka/server/handlers/produce.cc index 0c9f1e8b9e22f..772d8a1a9a9d2 100644 --- a/src/v/kafka/server/handlers/produce.cc +++ b/src/v/kafka/server/handlers/produce.cc @@ -376,7 +376,7 @@ static partition_produce_stages produce_topic_partition( auto dur = std::chrono::steady_clock::now() - start; octx.rctx.connection()->server().update_produce_latency(dur); } else { - m->set_trace(false); + m->cancel(); } return p; }); diff --git a/src/v/kafka/server/server.h b/src/v/kafka/server/server.h index 652470b31d5c6..64117f5eee75c 100644 --- a/src/v/kafka/server/server.h +++ b/src/v/kafka/server/server.h @@ -16,8 +16,10 @@ #include "coproc/fwd.h" #include "features/feature_table.h" #include "kafka/latency_probe.h" +#include "kafka/protocol/types.h" #include "kafka/server/fetch_metadata_cache.hh" #include "kafka/server/fwd.h" +#include "kafka/server/handlers/handler_probe.h" #include "kafka/server/queue_depth_monitor.h" #include "net/server.h" #include "security/fwd.h" @@ -66,7 +68,7 @@ class server final ~server() noexcept override = default; server(const server&) = delete; server& operator=(const server&) = delete; - server(server&&) noexcept = default; + server(server&&) noexcept = delete; server& operator=(server&&) noexcept = delete; std::string_view name() const final { return "kafka rpc protocol"; } @@ -171,6 +173,10 @@ class server final ssx::semaphore& memory_fetch_sem() noexcept { return _memory_fetch_sem; } + handler_probe& handler_probe(api_key key) { + return _handler_probes.get_probe(key); + } + private: void setup_metrics(); @@ -202,6 +208,7 @@ class server final security::gssapi_principal_mapper _gssapi_principal_mapper; security::krb5::configurator _krb_configurator; ssx::semaphore _memory_fetch_sem; + handler_probe_manager _handler_probes; class latency_probe _probe; ss::metrics::metric_groups _metrics; diff --git a/src/v/net/server.cc b/src/v/net/server.cc index 815d906de99ed..5aeb6aea9cba8 100644 --- a/src/v/net/server.cc +++ b/src/v/net/server.cc @@ -331,7 +331,7 @@ void server::setup_metrics() { "{}: Memory consumed by request processing", cfg.name))), sm::make_histogram( "dispatch_handler_latency", - [this] { return _hist.seastar_histogram_logform(); }, + [this] { return _hist.internal_histogram_logform(); }, sm::description(ssx::sformat("{}: Latency ", cfg.name)))}); } @@ -352,7 +352,7 @@ void server::setup_public_metrics() { "latency_seconds", sm::description("RPC latency"), {server_label(server_name)}, - [this] { return ssx::metrics::report_default_histogram(_hist); }) + [this] { return _hist.public_histogram_logform(); }) .aggregate({sm::shard_label})}); } diff --git a/src/v/net/server.h b/src/v/net/server.h index 854de3dc27452..952bc03e508df 100644 --- a/src/v/net/server.h +++ b/src/v/net/server.h @@ -16,8 +16,9 @@ #include "net/connection.h" #include "net/connection_rate.h" #include "net/types.h" +#include "ssx/metrics.h" #include "ssx/semaphore.h" -#include "utils/hdr_hist.h" +#include "utils/log_hist.h" #include #include @@ -104,9 +105,11 @@ struct server_configuration { class server { public: + using hist_t = log_hist_internal; + explicit server(server_configuration, ss::logger&); explicit server(ss::sharded* s, ss::logger&); - server(server&&) noexcept = default; + server(server&&) noexcept = delete; server& operator=(server&&) noexcept = delete; server(const server&) = delete; server& operator=(const server&) = delete; @@ -131,7 +134,7 @@ class server { ss::future<> stop(); const server_configuration cfg; // NOLINT - const hdr_hist& histogram() const { return _hist; } + const hist_t& histogram() const { return _hist; } virtual std::string_view name() const = 0; virtual ss::future<> apply(ss::lw_shared_ptr) = 0; @@ -139,7 +142,7 @@ class server { server_probe& probe() { return _probe; } ssx::semaphore& memory() { return _memory; } ss::gate& conn_gate() { return _conn_gate; } - hdr_hist& hist() { return _hist; } + hist_t& hist() { return _hist; } ss::abort_source& abort_source() { return _as; } bool abort_requested() const { return _as.abort_requested(); } @@ -169,7 +172,7 @@ class server { boost::intrusive::list _connections; ss::abort_source _as; ss::gate _conn_gate; - hdr_hist _hist; + hist_t _hist; server_probe _probe; ss::metrics::metric_groups _metrics; ss::metrics::metric_groups _public_metrics; diff --git a/src/v/pandaproxy/probe.cc b/src/v/pandaproxy/probe.cc index 7b9fdd5c74ee5..b5098d5582e4b 100644 --- a/src/v/pandaproxy/probe.cc +++ b/src/v/pandaproxy/probe.cc @@ -55,7 +55,9 @@ void probe::setup_metrics() { "request_latency", sm::description("Request latency"), labels, - [this] { return _request_metrics.hist().seastar_histogram_logform(); }) + [this] { + return _request_metrics.hist().internal_histogram_logform(); + }) .aggregate(internal_aggregate_labels)}); } @@ -82,10 +84,7 @@ void probe::setup_public_metrics() { sm::description( ssx::sformat("Internal latency of request for {}", _group_name)), labels, - [this] { - return ssx::metrics::report_default_histogram( - _request_metrics.hist()); - }) + [this] { return _request_metrics.hist().public_histogram_logform(); }) .aggregate(aggregate_labels), sm::make_counter( diff --git a/src/v/pandaproxy/probe.h b/src/v/pandaproxy/probe.h index fb2e7300de0ef..09505ef8ff88b 100644 --- a/src/v/pandaproxy/probe.h +++ b/src/v/pandaproxy/probe.h @@ -11,7 +11,8 @@ #pragma once -#include "utils/hdr_hist.h" +#include "seastarx.h" +#include "utils/log_hist.h" #include #include @@ -22,10 +23,11 @@ namespace pandaproxy { /// If the request is good, measure latency, otherwise record the error. class http_status_metric { public: + using hist_t = log_hist_internal; class measurement { public: measurement( - http_status_metric* p, std::unique_ptr m) + http_status_metric* p, std::unique_ptr m) : _p(p) , _m(std::move(m)) {} @@ -41,17 +43,17 @@ class http_status_metric { } else { ++_p->_5xx_count; } - _m->set_trace(false); + _m->cancel(); } private: http_status_metric* _p; - std::unique_ptr _m; + std::unique_ptr _m; }; - hdr_hist& hist() { return _hist; } + hist_t& hist() { return _hist; } auto auto_measure() { return measurement{this, _hist.auto_measure()}; } - hdr_hist _hist; + hist_t _hist; int64_t _5xx_count{0}; int64_t _4xx_count{0}; int64_t _3xx_count{0}; diff --git a/src/v/rpc/rpc_compiler.py b/src/v/rpc/rpc_compiler.py index 9191a0ca147dc..8b971df2d97dc 100755 --- a/src/v/rpc/rpc_compiler.py +++ b/src/v/rpc/rpc_compiler.py @@ -96,7 +96,7 @@ class failure_probes; prometheus_sanitize::metrics_name("internal_rpc"), {sm::make_histogram( "latency", - [this] { return _methods[{{loop.index-1}}].probes.latency_hist().seastar_histogram_logform(); }, + [this] { return _methods[{{loop.index-1}}].probes.latency_hist().internal_histogram_logform(); }, sm::description("Internal RPC service latency"), labels) .aggregate(aggregate_labels)}); diff --git a/src/v/rpc/rpc_server.cc b/src/v/rpc/rpc_server.cc index 3493625b13142..4c875ea164f95 100644 --- a/src/v/rpc/rpc_server.cc +++ b/src/v/rpc/rpc_server.cc @@ -260,7 +260,8 @@ ss::future<> rpc_server::dispatch_method_once( } return send_reply(ctx, std::move(reply_buf)) .finally([m, l = std::move(l)]() mutable { - m->probes.latency_hist().record(std::move(l)); + m->probes.latency_hist().record( + l->compute_total_latency().count()); }); }); }) diff --git a/src/v/rpc/rpc_server.h b/src/v/rpc/rpc_server.h index 14d5b39a95a77..a34fcfcf9c89d 100644 --- a/src/v/rpc/rpc_server.h +++ b/src/v/rpc/rpc_server.h @@ -28,7 +28,7 @@ class rpc_server : public net::server { explicit rpc_server(ss::sharded* s) : net::server(s, rpclog) {} - rpc_server(rpc_server&&) noexcept = default; + rpc_server(rpc_server&&) noexcept = delete; ~rpc_server() = default; rpc_server& operator=(rpc_server&&) noexcept = delete; diff --git a/src/v/rpc/types.h b/src/v/rpc/types.h index ea28994fdf3fb..c35875af54368 100644 --- a/src/v/rpc/types.h +++ b/src/v/rpc/types.h @@ -18,7 +18,7 @@ #include "outcome.h" #include "seastarx.h" #include "ssx/semaphore.h" -#include "utils/hdr_hist.h" +#include "utils/log_hist.h" #include #include @@ -406,12 +406,14 @@ inline void netbuf::set_min_compression_bytes(size_t min) { class method_probes { public: - hdr_hist& latency_hist() { return _latency_hist; } - const hdr_hist& latency_hist() const { return _latency_hist; } + using hist_t = log_hist_internal; + + hist_t& latency_hist() { return _latency_hist; } + const hist_t& latency_hist() const { return _latency_hist; } private: - // roughly 2024 bytes - hdr_hist _latency_hist{120s, 1ms}; + // roughly 208 bytes + hist_t _latency_hist; }; /// \brief most method implementations will be codegenerated diff --git a/src/v/utils/CMakeLists.txt b/src/v/utils/CMakeLists.txt index a6167c0cdf419..47e8352f3b4f7 100644 --- a/src/v/utils/CMakeLists.txt +++ b/src/v/utils/CMakeLists.txt @@ -14,6 +14,7 @@ v_cc_library( uuid.cc request_auth.cc bottomless_token_bucket.cc + log_hist.cc DEPS Seastar::seastar Hdrhistogram::hdr_histogram diff --git a/src/v/utils/log_hist.cc b/src/v/utils/log_hist.cc new file mode 100644 index 0000000000000..3cdf0bdadc9f6 --- /dev/null +++ b/src/v/utils/log_hist.cc @@ -0,0 +1,98 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "utils/log_hist.h" + +template< + typename duration_t, + int number_of_buckets, + uint64_t first_bucket_upper_bound> +template +seastar::metrics::histogram +log_hist:: + seastar_histogram_logform() const { + seastar::metrics::histogram hist; + hist.buckets.resize(cfg::bucket_count); + hist.sample_sum = static_cast(_sample_sum) + / static_cast(cfg::scale); + + const unsigned first_bucket_exp + = 64 - std::countl_zero(first_bucket_upper_bound - 1); + const unsigned cfg_first_bucket_exp + = 64 - std::countl_zero(cfg::first_bucket_bound - 1); + + // Write bounds to seastar histogram + for (int i = 0; i < cfg::bucket_count; i++) { + auto& bucket = hist.buckets[i]; + bucket.count = 0; + + uint64_t unscaled_upper_bound = ((uint64_t)1 + << (cfg_first_bucket_exp + i)) + - 1; + bucket.upper_bound = static_cast(unscaled_upper_bound) + / static_cast(cfg::scale); + } + + uint64_t cumulative_count = 0; + size_t current_hist_idx = 0; + double current_hist_upper_bound = hist.buckets[0].upper_bound; + + // Write _counts to seastar histogram + for (size_t i = 0; i < _counts.size(); i++) { + uint64_t unscaled_upper_bound = ((uint64_t)1 << (first_bucket_exp + i)) + - 1; + double scaled_upper_bound = static_cast(unscaled_upper_bound) + / static_cast(cfg::scale); + + cumulative_count += _counts[i]; + + while (scaled_upper_bound > current_hist_upper_bound + && current_hist_idx != (hist.buckets.size() - 1)) { + current_hist_idx++; + current_hist_upper_bound + = hist.buckets[current_hist_idx].upper_bound; + } + + hist.buckets[current_hist_idx].count = cumulative_count; + } + + hist.sample_count = cumulative_count; + return hist; +} + +template< + typename duration_t, + int number_of_buckets, + uint64_t first_bucket_upper_bound> +seastar::metrics::histogram +log_hist:: + public_histogram_logform() const { + using public_hist_config = logform_config<1'000'000l, 256ul, 18>; + + return seastar_histogram_logform(); +} + +template< + typename duration_t, + int number_of_buckets, + uint64_t first_bucket_upper_bound> +seastar::metrics::histogram +log_hist:: + internal_histogram_logform() const { + using internal_hist_config = logform_config<1l, 8ul, 26>; + + return seastar_histogram_logform(); +} + +// Explicit instantiation for log_hist_public +template class log_hist; +// Explicit instantiation for log_hist_internal +template class log_hist; diff --git a/src/v/utils/log_hist.h b/src/v/utils/log_hist.h new file mode 100644 index 0000000000000..abc436f4392f4 --- /dev/null +++ b/src/v/utils/log_hist.h @@ -0,0 +1,216 @@ +/* + * Copyright 2023 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include +#include + +#include +#include +#include +#include +#include + +/* + * A histogram implementation + * The buckets upper bounds are powers of 2 minus 1. + * `first_bucket_upper_bound` therefore must be a power of 2. + * The number of values represented by each bucket increases by powers 2 as + * well. + * + * Assume `number_of_buckets` is 4 and `first_bucket_upper_bound` is 16 the + * bucket value ranges are; + * + * [1, 16), [16, 32), [32, 64), [64, 128) + * + * And if 1, 16, 32, and 33 are recorded the buckets will have the following + * counts; + * + * [1, 16) = 1 + * [16, 32) = 1 + * [32, 64) = 2 + * [64, 128) = 0 + */ +template< + typename duration_t, + int number_of_buckets, + uint64_t first_bucket_upper_bound> +class log_hist { + static_assert( + first_bucket_upper_bound >= 1 + && (first_bucket_upper_bound & (first_bucket_upper_bound - 1)) == 0, + "first bucket bound must be power of 2"); + + using measurement_canary_t = seastar::lw_shared_ptr; + +public: + static constexpr int first_bucket_clz = std::countl_zero( + first_bucket_upper_bound - 1); + static constexpr int first_bucket_exp = 64 - first_bucket_clz; + + using clock_type = std::chrono::high_resolution_clock; + + /// \brief move-only type to tracking durations + /// if the log_hist ptr goes out of scope, it will detach itself + /// and the recording will simply be ignored. + class measurement { + public: + explicit measurement(log_hist& h) + : _canary(h._canary) + , _h(std::ref(h)) + , _begin_t(log_hist::clock_type::now()) + , _total_latency(duration_t(0)) {} + measurement(const measurement&) = delete; + measurement& operator=(const measurement&) = delete; + measurement(measurement&& o) noexcept + : _canary(o._canary) + , _h(o._h) + , _begin_t(o._begin_t) + , _total_latency(o._total_latency) { + o.cancel(); + } + measurement& operator=(measurement&& o) noexcept { + if (this != &o) { + this->~measurement(); + new (this) measurement(std::move(o)); + } + return *this; + } + ~measurement() noexcept { + if (_canary && *_canary) { + _h.get().record(compute_total_latency().count()); + } + } + + // Cancels this measurements and prevents any values from + // being recorded to the underlying histogram. + void cancel() { _canary = nullptr; } + + // Temporarily stops measuring latency. + void stop() { + _total_latency = compute_total_latency(); + _begin_t = std::nullopt; + } + + // Resumes measuring latency. + void start() { + if (!_begin_t.has_value()) { + _begin_t = log_hist::clock_type::now(); + } + } + + // Returns the total latency that has been measured so far. + duration_t compute_total_latency() const { + if (_begin_t) { + return _total_latency + + std::chrono::duration_cast( + log_hist::clock_type::now() - *_begin_t); + } else { + return _total_latency; + } + } + + private: + measurement_canary_t _canary; + std::reference_wrapper _h; + std::optional _begin_t; + duration_t _total_latency; + }; + + std::unique_ptr auto_measure() { + return std::make_unique(*this); + } + + log_hist() + : _canary(seastar::make_lw_shared(true)) + , _counts() {} + log_hist(const log_hist& o) = delete; + log_hist& operator=(const log_hist&) = delete; + log_hist(log_hist&& o) = delete; + log_hist& operator=(log_hist&& o) = delete; + ~log_hist() { + // Notify any active measurements that this object no longer exists. + *_canary = false; + } + + /* + * record expects values of that are equivalent to `duration_t::count()` + * so make sure the input is scaled correctly. + */ + void record(uint64_t val) { + _sample_sum += val; + const unsigned i = std::clamp( + first_bucket_clz - std::countl_zero(val), + 0, + static_cast(_counts.size() - 1)); + _counts[i]++; + } + + template + struct logform_config { + static constexpr auto bound_is_pow_2 = _first_bucket_bound >= 1 + && (_first_bucket_bound + & (_first_bucket_bound - 1)) + == 0; + static_assert( + bound_is_pow_2, "_first_bucket_bound must be a power of 2"); + + static constexpr auto scale = _scale; + static constexpr auto first_bucket_bound = _first_bucket_bound; + static constexpr auto bucket_count = _bucket_count; + }; + + template + seastar::metrics::histogram seastar_histogram_logform() const; + /* + * Generates a Prometheus histogram with 18 buckets. The first bucket has an + * upper bound of 256 - 1 and subsequent buckets have an upper bound of 2 + * times the upper bound of the previous bucket. + * + * This is the histogram type used in the `/public_metrics` endpoint + */ + seastar::metrics::histogram public_histogram_logform() const; + /* + * Generates a Prometheus histogram with 26 buckets. The first bucket has an + * upper bound of 8 - 1 and subsequent buckets have an upper bound of 2 + * times the upper bound of the previous bucket. + * + * This is the histogram type used in the `/metrics` endpoint + */ + seastar::metrics::histogram internal_histogram_logform() const; + +private: + friend measurement; + + // Used to inform measurements whether `log_hist` has been destroyed + measurement_canary_t _canary; + + std::array _counts; + uint64_t _sample_sum{0}; +}; + +/* + * This histogram produces indentical results as the public metric's `hdr_hist`. + * So if this histogram and `hdr_hist` are create and have the same values + * recorded to them then `log_hist_public::seastar_histogram_logform(1000000)` + * will produce the same seastar histogram as + * `ssx::metrics::report_default_histogram(hdr_hist)`. + */ +using log_hist_public = log_hist; + +/* + * This histogram produces results that are similar, but not indentical to the + * internal metric's `hdr_hist`. Some of the first buckets will have the + * following bounds; [log_hist_internal upper bounds, internal hdr_hist upper + * bounds] [8, 10], [16, 20], [32, 41], [64, 83], [128, 167], [256, 335] + */ +using log_hist_internal = log_hist; diff --git a/src/v/utils/tests/seastar_histogram_test.cc b/src/v/utils/tests/seastar_histogram_test.cc index 3c92f0a387ed2..937c1ce88ab88 100644 --- a/src/v/utils/tests/seastar_histogram_test.cc +++ b/src/v/utils/tests/seastar_histogram_test.cc @@ -1,7 +1,16 @@ +#include "ssx/metrics.h" #include "utils/hdr_hist.h" +#include "utils/log_hist.h" +#include #include +#include + +#include +#include +#include + SEASTAR_THREAD_TEST_CASE(test_seastar_histograms_match) { using namespace std::chrono_literals; @@ -20,3 +29,168 @@ SEASTAR_THREAD_TEST_CASE(test_seastar_histograms_match) { logform_b.buckets[idx].upper_bound); } } + +namespace { +bool approximately_equal(double a, double b) { + constexpr double precision_error = 0.0001; + return std::abs(a - b) <= precision_error; +} + +template +void validate_public_histograms_equal(const hdr_hist& a, const l_hist& b) { + const auto logform_a = ssx::metrics::report_default_histogram(a); + const auto logform_b = b.public_histogram_logform(); + + BOOST_CHECK_EQUAL(logform_a.sample_count, logform_b.sample_count); + BOOST_CHECK( + approximately_equal(logform_a.sample_sum, logform_b.sample_sum)); + + for (size_t idx = 0; idx < logform_a.buckets.size(); ++idx) { + BOOST_CHECK(approximately_equal( + logform_a.buckets[idx].upper_bound, + logform_b.buckets[idx].upper_bound)); + BOOST_CHECK_EQUAL( + logform_a.buckets[idx].count, logform_b.buckets[idx].count); + } +} +} // namespace + +// ensures both the log_hist_public and the public hdr_hist return identical +// seastar histograms for values recorded around bucket bounds. +SEASTAR_THREAD_TEST_CASE(test_public_log_hist_and_hdr_hist_equal_bounds) { + hdr_hist a; + log_hist_public b; + + a.record(1); + b.record(1); + + for (unsigned i = 0; i < 17; i++) { + auto upper_bound + = (((unsigned)1 << (log_hist_public::first_bucket_exp + i)) - 1); + a.record(upper_bound); + a.record(upper_bound + 1); + b.record(upper_bound); + b.record(upper_bound + 1); + } + + validate_public_histograms_equal(a, b); +} + +// ensures both the log_hist_public and the public hdr_hist return identical +// seastar histograms for randomly selected values. +SEASTAR_THREAD_TEST_CASE(test_public_log_hist_and_hdr_hist_equal_rand) { + hdr_hist a; + log_hist_public b; + + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution d(1, (1 << (8 + 17)) - 1); + + for (unsigned i = 0; i < 1'000'000; i++) { + auto sample = d(gen); + a.record(sample); + b.record(sample); + } + + validate_public_histograms_equal(a, b); +} + +// Ensures that an internal histogram is properly converted to a public metrics +// histogram. +SEASTAR_THREAD_TEST_CASE(test_internal_hist_to_public_hist_bounds) { + hdr_hist a; + log_hist_internal b; + + a.record(1); + b.record(1); + + for (unsigned i = 0; i < 17; i++) { + auto upper_bound + = (((unsigned)1 << (log_hist_internal::first_bucket_exp + i)) - 1); + a.record(upper_bound); + a.record(upper_bound + 1); + b.record(upper_bound); + b.record(upper_bound + 1); + } + + validate_public_histograms_equal(a, b); +} + +// Ensures that generating a internal seastar histogram from log_hist_public +// results in the additional buckets for the extended lower bounds having counts +// of zero. +SEASTAR_THREAD_TEST_CASE(test_public_hist_to_internal_hist) { + log_hist_public a; + log_hist_internal b; + + a.record(1); + b.record(1); + + for (unsigned i = 0; i < 17; i++) { + auto upper_bound + = (((unsigned)1 << (log_hist_internal::first_bucket_exp + i)) - 1); + a.record(upper_bound); + a.record(upper_bound + 1); + b.record(upper_bound); + b.record(upper_bound + 1); + } + + auto pub_to_int_hist = a.internal_histogram_logform(); + auto int_to_int_hist = b.internal_histogram_logform(); + + const auto public_ub_exp = 8; + const auto internal_ub_exp = 3; + + // The buckets in the extended lower bounds should be empty + for (int i = 0; i < public_ub_exp - internal_ub_exp; i++) { + BOOST_CHECK_EQUAL(pub_to_int_hist.buckets[i].count, 0); + BOOST_CHECK_NE(int_to_int_hist.buckets[i].count, 0); + } +} + +SEASTAR_THREAD_TEST_CASE(test_log_hist_measure) { + log_hist_internal a; + + { + auto m1 = a.auto_measure(); + ss::sleep(std::chrono::microseconds(1)).get(); + auto m2 = a.auto_measure(); + ss::sleep(std::chrono::microseconds(1)).get(); + } + + auto hist = a.internal_histogram_logform(); + BOOST_CHECK_EQUAL(hist.buckets.back().count, 2); +} + +SEASTAR_THREAD_TEST_CASE(test_log_hist_measure_pause) { + using namespace std::chrono_literals; + + log_hist_internal a; + + { + auto m1 = a.auto_measure(); + auto m2 = a.auto_measure(); + + m1->stop(); + m2->stop(); + + auto m1_dur = m1->compute_total_latency(); + auto m2_dur = m2->compute_total_latency(); + + ss::sleep(std::chrono::microseconds(1)).get(); + + BOOST_CHECK_EQUAL((m1->compute_total_latency() - m1_dur).count(), 0); + BOOST_CHECK_EQUAL((m2->compute_total_latency() - m2_dur).count(), 0); + + m1->start(); + m2->start(); + + ss::sleep(std::chrono::microseconds(1)).get(); + + BOOST_CHECK_GT((m1->compute_total_latency() - m1_dur).count(), 1); + BOOST_CHECK_GT((m2->compute_total_latency() - m2_dur).count(), 1); + } + + auto hist = a.internal_histogram_logform(); + BOOST_CHECK_EQUAL(hist.buckets.back().count, 2); +}