Skip to content

Commit 52f9abc

Browse files
committed
prometheus: coroutinize metrics writing
Coroutinize and de-boilerplate up the prometheus metrics writing path. We do not couroutinze the innermost loop, as this is using ss::async and currently relies on that. This is a precursor to performance improvements on this path. This change is more or less performance neutral: there is a slight "fixed cost" win for the body writing, but this doesn't matter much in the larger context since the per-series overhead dominates and there is no change in that case (for this change).
1 parent cb3482e commit 52f9abc

File tree

3 files changed

+72
-41
lines changed

3 files changed

+72
-41
lines changed

include/seastar/core/prometheus.hh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ namespace details {
6060
using filter_t = std::function<bool(const metrics::impl::labels_type&)>;
6161

6262
class test_access {
63-
future<> write_body(config cfg, sstring metric_family_name, bool prefix, bool show_help, bool enable_aggregation, filter_t filter, output_stream<char>&& s);
63+
future<> write_body(config cfg, bool use_protobuf_format, sstring metric_family_name, bool prefix, bool show_help, bool enable_aggregation, filter_t filter, output_stream<char>&& s);
6464

6565
friend struct metrics_perf_fixture;
6666
};

src/core/prometheus.cc

Lines changed: 70 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
#include <sstream>
2929

3030
#include <seastar/core/metrics_api.hh>
31+
#include <seastar/core/future.hh>
32+
#include <seastar/core/iostream.hh>
3133
#include <seastar/core/scollectd.hh>
3234
#include <seastar/http/function_handlers.hh>
3335
#include <boost/algorithm/string/replace.hpp>
@@ -401,18 +403,19 @@ class metrics_families_per_shard {
401403
/** @} */
402404
};
403405

404-
static future<> get_map_value(metrics_families_per_shard& vec) {
406+
static future<metrics_families_per_shard> get_map_value() {
407+
metrics_families_per_shard vec;
405408
vec.resize(smp::count);
406-
return parallel_for_each(std::views::iota(0u, smp::count), [&vec] (auto cpu) {
409+
co_await parallel_for_each(std::views::iota(0u, smp::count), [&vec] (auto cpu) {
407410
return smp::submit_to(cpu, [] {
408411
return mi::get_values();
409412
}).then([&vec, cpu] (auto res) {
410413
vec[cpu] = std::move(res);
411414
});
412415
});
416+
co_return vec;
413417
}
414418

415-
416419
/*!
417420
* \brief a facade class for metric family
418421
*/
@@ -762,23 +765,41 @@ void write_value_as_string(std::stringstream& s, const mi::metric_value& value)
762765
}
763766
}
764767

765-
future<> write_text_representation(output_stream<char>& out, const config& ctx, const metric_family_range& m, bool show_help, bool enable_aggregation, std::function<bool(const mi::labels_type&)> filter) {
766-
return seastar::async([&ctx, &out, &m, show_help, enable_aggregation, filter] () mutable {
767-
bool found = false;
768+
struct write_body_args {
769+
details::filter_t filter;
770+
sstring metric_family_name;
771+
bool use_protobuf_format;
772+
bool use_prefix;
773+
bool show_help;
774+
bool enable_aggregation;
775+
};
776+
777+
struct write_context {
778+
output_stream<char>& out;
779+
const config& ctx;
780+
const metric_family_range m;
781+
const write_body_args args;
782+
783+
future<> write_text_representation();
784+
future<> write_protobuf_representation();
785+
};
786+
787+
future<> write_context::write_text_representation() {
788+
return seastar::async([this] {
768789
std::stringstream s;
769790
for (metric_family& metric_family : m) {
770791
auto name = ctx.prefix + "_" + metric_family.name();
771-
found = false;
792+
bool found = false;
772793
metric_aggregate_by_labels aggregated_values(metric_family.metadata().aggregate_labels);
773-
bool should_aggregate = enable_aggregation && !metric_family.metadata().aggregate_labels.empty();
774-
metric_family.foreach_metric([&s, &out, &ctx, &found, &name, &metric_family, &aggregated_values, should_aggregate, show_help, &filter](const mi::metric_value& value, const mi::metric_series_metadata& value_info) mutable {
794+
bool should_aggregate = args.enable_aggregation && !metric_family.metadata().aggregate_labels.empty();
795+
metric_family.foreach_metric([this, &s, &found, &name, &metric_family, &aggregated_values, should_aggregate](const mi::metric_value& value, const mi::metric_series_metadata& value_info) mutable {
775796
s.clear();
776797
s.str("");
777-
if ((value_info.should_skip_when_empty() && value.is_empty()) || !filter(value_info.labels())) {
798+
if ((value_info.should_skip_when_empty() && value.is_empty()) || !args.filter(value_info.labels())) {
778799
return;
779800
}
780801
if (!found) {
781-
if (show_help && metric_family.metadata().d.str() != "") {
802+
if (args.show_help && metric_family.metadata().d.str() != "") {
782803
s << "# HELP " << name << " " << metric_family.metadata().d.str() << '\n';
783804
}
784805
s << "# TYPE " << name << " " << metric_family.metadata().type << '\n';
@@ -817,19 +838,19 @@ future<> write_text_representation(output_stream<char>& out, const config& ctx,
817838
});
818839
}
819840

820-
future<> write_protobuf_representation(output_stream<char>& out, const config& ctx, metric_family_range& m, bool enable_aggregation, std::function<bool(const mi::labels_type&)> filter) {
821-
return do_for_each(m, [&ctx, &out, enable_aggregation, filter=std::move(filter)](metric_family& metric_family) mutable {
841+
future<> write_context::write_protobuf_representation() {
842+
return do_for_each(m, [this](metric_family& metric_family) mutable {
822843
std::string s;
823844
google::protobuf::io::StringOutputStream os(&s);
824845
metric_aggregate_by_labels aggregated_values(metric_family.metadata().aggregate_labels);
825-
bool should_aggregate = enable_aggregation && !metric_family.metadata().aggregate_labels.empty();
846+
bool should_aggregate = args.enable_aggregation && !metric_family.metadata().aggregate_labels.empty();
826847
auto& name = metric_family.name();
827848
pm::MetricFamily mtf;
828849
bool empty_metric = true;
829850
mtf.set_name(fmt::format("{}_{}", ctx.prefix, name));
830851
mtf.mutable_metric()->Reserve(metric_family.size());
831-
metric_family.foreach_metric([&mtf, &ctx, &filter, &aggregated_values, &empty_metric, should_aggregate](const auto& value, const auto& value_info) {
832-
if ((value_info.should_skip_when_empty() && value.is_empty()) || !filter(value_info.labels())) {
852+
metric_family.foreach_metric([this, &mtf, &aggregated_values, &empty_metric, should_aggregate](const auto& value, const auto& value_info) {
853+
if ((value_info.should_skip_when_empty() && value.is_empty()) || !args.filter(value_info.labels())) {
833854
return;
834855
}
835856
if (should_aggregate) {
@@ -921,41 +942,51 @@ class metrics_handler : public httpd::handler_base {
921942

922943
future<std::unique_ptr<http::reply>> handle(const sstring& path,
923944
std::unique_ptr<http::request> req, std::unique_ptr<http::reply> rep) override {
924-
auto is_protobuf_format = _ctx.allow_protobuf && is_accept_protobuf(req->get_header("Accept"));
925-
sstring metric_family_name = req->get_query_param("__name__");
926-
bool prefix = trim_asterisk(metric_family_name);
927-
bool show_help = req->get_query_param("__help__") != "false";
928-
bool enable_aggregation = req->get_query_param("__aggregate__") != "false";
929-
std::function<bool(const mi::labels_type&)> filter = make_filter(*req);
930-
rep->write_body(is_protobuf_format ? "proto" : "txt", [this, is_protobuf_format, metric_family_name, prefix, show_help, enable_aggregation, filter](output_stream<char>&& s) {
931-
return write_body(is_protobuf_format, metric_family_name, prefix, show_help, enable_aggregation, filter, std::move(s));
945+
write_body_args args{
946+
.filter = make_filter(*req),
947+
.metric_family_name = req->get_query_param("__name__"),
948+
.use_protobuf_format = _ctx.allow_protobuf && is_accept_protobuf(req->get_header("Accept")),
949+
.use_prefix = trim_asterisk(args.metric_family_name),
950+
.show_help = req->get_query_param("__help__") != "false",
951+
.enable_aggregation = req->get_query_param("__aggregate__") != "false"
952+
};
953+
rep->write_body(args.use_protobuf_format ? "proto" : "txt", [this, args = std::move(args)](output_stream<char>&& s) {
954+
return write_body(std::move(args), std::move(s));
932955
});
933956
return make_ready_future<std::unique_ptr<http::reply>>(std::move(rep));
934957
}
935958

936959
private:
937-
future<> write_body(bool is_protobuf_format, sstring metric_family_name, bool prefix, bool show_help, bool enable_aggregation, details::filter_t filter, output_stream<char>&& s) {
938-
return do_with(metrics_families_per_shard(), output_stream<char>(std::move(s)),
939-
[this, is_protobuf_format, prefix, &metric_family_name, show_help, enable_aggregation, filter] (metrics_families_per_shard& families, output_stream<char>& s) mutable {
940-
return get_map_value(families).then([&s, &families, this, is_protobuf_format, prefix, metric_family_name, show_help, enable_aggregation, filter]() mutable {
941-
return do_with(get_range(families, metric_family_name, prefix),
942-
[&s, this, is_protobuf_format, show_help, enable_aggregation, filter](metric_family_range& m) {
943-
return (is_protobuf_format) ? write_protobuf_representation(s, _ctx, m, enable_aggregation, filter) :
944-
write_text_representation(s, _ctx, m, show_help, enable_aggregation, filter);
945-
});
946-
}).finally([&s] () mutable {
947-
return s.close();
948-
});
949-
});
960+
961+
future<> write_body(write_body_args args, output_stream<char>&& out_stream) {
962+
auto s = std::move(out_stream);
963+
auto families = co_await get_map_value();
964+
965+
write_context context{
966+
.out = s,
967+
.ctx = _ctx,
968+
.m = get_range(families, args.metric_family_name, args.use_prefix),
969+
.args = std::move(args)
970+
};
971+
972+
co_return co_await (args.use_protobuf_format ? context.write_protobuf_representation() : context.write_text_representation())
973+
.finally([&s] { return s.close(); });
950974
}
951975

952976
friend details::test_access;
953977
};
954978

955-
future<> details::test_access::write_body(config cfg, sstring metric_family_name, bool prefix, bool show_help, bool enable_aggregation, filter_t filter, output_stream<char>&& s) {
979+
future<> details::test_access::write_body(config cfg, bool use_protobuf_format, sstring metric_family_name, bool prefix, bool show_help, bool enable_aggregation, filter_t filter, output_stream<char>&& s) {
956980
metrics_handler handler(std::move(cfg));
957981

958-
co_return co_await handler.write_body(cfg.allow_protobuf, metric_family_name, prefix, show_help, enable_aggregation, filter, std::move(s));
982+
co_return co_await handler.write_body({
983+
.filter = filter,
984+
.metric_family_name = metric_family_name,
985+
.use_protobuf_format = use_protobuf_format,
986+
.use_prefix = prefix,
987+
.show_help = show_help,
988+
.enable_aggregation = enable_aggregation
989+
}, std::move(s));
959990
}
960991

961992
std::function<bool(const mi::labels_type&)> metrics_handler::_true_function = [](const mi::labels_type&) {

tests/perf/metrics_perf.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ struct metrics_perf_fixture {
128128
perf_tests::start_measuring_time();
129129
for (auto _ : boost::irange(iterations)) {
130130
output_stream<char> out{counting_data_sink{}};
131-
co_await access{}.write_body(config, "", false, true, false, always_true, std::move(out));
131+
co_await access{}.write_body(config, false, "", false, true, false, always_true, std::move(out));
132132
}
133133
perf_tests::stop_measuring_time();
134134

0 commit comments

Comments
 (0)