Skip to content

Commit 625c020

Browse files
Merge pull request #13398 from vbotbuildovich/backport-pr-13296-v23.2.x-815
[v23.2.x] Show true partition count in cluster health API
2 parents 5f7c06a + 9ff4249 commit 625c020

10 files changed

+495
-35
lines changed

src/v/cluster/health_monitor_backend.cc

+68-34
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#include <seastar/core/with_timeout.hh>
4343
#include <seastar/util/log.hh>
4444

45+
#include <absl/container/node_hash_map.h>
4546
#include <absl/container/node_hash_set.h>
4647
#include <fmt/format.h>
4748
#include <fmt/ranges.h>
@@ -785,6 +786,62 @@ health_monitor_backend::get_node_drain_status(
785786
co_return it->second.drain_status;
786787
}
787788

789+
health_monitor_backend::aggregated_report
790+
health_monitor_backend::aggregate_reports(report_cache_t& reports) {
791+
struct collector {
792+
absl::node_hash_set<model::ntp> to_ntp_set() const {
793+
absl::node_hash_set<model::ntp> ret;
794+
for (const auto& [topic, parts] : t_to_p) {
795+
for (auto part : parts) {
796+
ret.emplace(topic.ns, topic.tp, part);
797+
if (
798+
ret.size() == aggregated_report::max_partitions_report) {
799+
return ret;
800+
}
801+
}
802+
}
803+
return ret;
804+
}
805+
806+
size_t count() const {
807+
size_t sum = 0;
808+
for (const auto& [_, parts] : t_to_p) {
809+
sum += parts.size();
810+
}
811+
return sum;
812+
}
813+
814+
absl::node_hash_map<
815+
model::topic_namespace,
816+
absl::node_hash_set<model::partition_id>>
817+
t_to_p;
818+
};
819+
820+
collector leaderless, urp;
821+
822+
for (const auto& [_, report] : reports) {
823+
for (const auto& [tp_ns, partitions] : report.topics) {
824+
auto& leaderless_this_topic = leaderless.t_to_p[tp_ns];
825+
auto& urp_this_topic = urp.t_to_p[tp_ns];
826+
827+
for (const auto& partition : partitions) {
828+
if (!partition.leader_id.has_value()) {
829+
leaderless_this_topic.emplace(partition.id);
830+
}
831+
if (partition.under_replicated_replicas.value_or(0) > 0) {
832+
urp_this_topic.emplace(partition.id);
833+
}
834+
}
835+
}
836+
}
837+
838+
return {
839+
.leaderless = leaderless.to_ntp_set(),
840+
.under_replicated = urp.to_ntp_set(),
841+
.leaderless_count = leaderless.count(),
842+
.under_replicated_count = urp.count()};
843+
}
844+
788845
ss::future<cluster_health_overview>
789846
health_monitor_backend::get_cluster_health_overview(
790847
model::timeout_clock::time_point deadline) {
@@ -809,41 +866,18 @@ health_monitor_backend::get_cluster_health_overview(
809866
std::sort(ret.all_nodes.begin(), ret.all_nodes.end());
810867
std::sort(ret.nodes_down.begin(), ret.nodes_down.end());
811868

812-
// The size of the health status must be bounded: if all partitions
813-
// on a system with 50k partitions are under-replicated, it is not helpful
814-
// to try and cram all 50k NTPs into a vector here.
815-
size_t max_partitions_report = 128;
869+
auto aggr_report = aggregate_reports(_reports);
816870

817-
absl::node_hash_set<model::ntp> leaderless;
818-
absl::node_hash_set<model::ntp> under_replicated;
871+
auto move_into = [](auto& dest, auto& src) {
872+
dest.reserve(src.size());
873+
std::move(src.begin(), src.end(), std::back_inserter(dest));
874+
};
819875

820-
for (const auto& [_, report] : _reports) {
821-
for (const auto& [tp_ns, partitions] : report.topics) {
822-
for (const auto& partition : partitions) {
823-
if (
824-
!partition.leader_id.has_value()
825-
&& leaderless.size() < max_partitions_report) {
826-
leaderless.emplace(tp_ns.ns, tp_ns.tp, partition.id);
827-
}
828-
if (
829-
partition.under_replicated_replicas.value_or(0) > 0
830-
&& under_replicated.size() < max_partitions_report) {
831-
under_replicated.emplace(tp_ns.ns, tp_ns.tp, partition.id);
832-
}
833-
}
834-
}
835-
}
836-
ret.leaderless_partitions.reserve(leaderless.size());
837-
std::move(
838-
leaderless.begin(),
839-
leaderless.end(),
840-
std::back_inserter(ret.leaderless_partitions));
876+
move_into(ret.leaderless_partitions, aggr_report.leaderless);
877+
move_into(ret.under_replicated_partitions, aggr_report.under_replicated);
841878

842-
ret.under_replicated_partitions.reserve(under_replicated.size());
843-
std::move(
844-
under_replicated.begin(),
845-
under_replicated.end(),
846-
std::back_inserter(ret.under_replicated_partitions));
879+
ret.leaderless_count = aggr_report.leaderless_count;
880+
ret.under_replicated_count = aggr_report.under_replicated_count;
847881

848882
ret.controller_id = _raft0->get_leader_id();
849883

@@ -857,8 +891,8 @@ health_monitor_backend::get_cluster_health_overview(
857891
ret.unhealthy_reasons.emplace_back("leaderless_partitions");
858892
}
859893

860-
// cluster is not healthy if some partitions have fewer replicas than their
861-
// configured amount
894+
// cluster is not healthy if some partitions have fewer replicas than
895+
// their configured amount
862896
if (!ret.under_replicated_partitions.empty()) {
863897
ret.unhealthy_reasons.emplace_back("under_replicated_partitions");
864898
}

src/v/cluster/health_monitor_backend.h

+30
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,34 @@ class health_monitor_backend {
144144
void on_leadership_changed(
145145
raft::group_id, model::term_id, std::optional<model::node_id>);
146146

147+
/**
148+
* @brief Stucture holding the aggregated results of partition status.
149+
*/
150+
struct aggregated_report {
151+
// The size of the health status must be bounded: if all partitions
152+
// on a system with 50k partitions are under-replicated, it is not
153+
// helpful to try and cram all 50k NTPs into a vector here.
154+
static constexpr size_t max_partitions_report = 128;
155+
156+
/**
157+
* List of leaderless or under-replicated ntps reported by any node.
158+
* The size of either list is capped at max_partitions_report, and
159+
* other elements are dropped.
160+
*/
161+
absl::node_hash_set<model::ntp> leaderless, under_replicated;
162+
163+
/**
164+
* The true count of leaderless and under-replicated partitions, not
165+
* capped at max_partitions_report, and truncation of above the sets
166+
* can be detected when the size is larger than the corresponding set.
167+
*/
168+
size_t leaderless_count{}, under_replicated_count{};
169+
170+
bool operator==(const aggregated_report&) const = default;
171+
};
172+
173+
static aggregated_report aggregate_reports(report_cache_t& reports);
174+
147175
ss::lw_shared_ptr<raft::consensus> _raft0;
148176
ss::sharded<members_table>& _members;
149177
ss::sharded<rpc::connection_cache>& _connections;
@@ -173,5 +201,7 @@ class health_monitor_backend {
173201
std::vector<std::pair<cluster::notification_id_type, health_node_cb_t>>
174202
_node_callbacks;
175203
cluster::notification_id_type _next_callback_id{0};
204+
205+
friend struct health_report_accessor;
176206
};
177207
} // namespace cluster

src/v/cluster/health_monitor_types.h

+2
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,9 @@ struct cluster_health_overview {
242242
// subsystem.
243243
std::vector<model::node_id> nodes_down;
244244
std::vector<model::ntp> leaderless_partitions;
245+
size_t leaderless_count{};
245246
std::vector<model::ntp> under_replicated_partitions;
247+
size_t under_replicated_count{};
246248
std::optional<size_t> bytes_in_cloud_storage;
247249
};
248250

src/v/cluster/tests/CMakeLists.txt

+8
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,14 @@ rp_test(
66
LABELS cluster
77
)
88

9+
rp_test(
10+
BENCHMARK_TEST
11+
BINARY_NAME health_report
12+
SOURCES health_bench.cc
13+
LIBRARIES Seastar::seastar_perf_testing v::cluster
14+
LABELS cluster
15+
)
16+
917
set(srcs
1018
partition_allocator_tests.cc
1119
partition_balancer_planner_test.cc

src/v/cluster/tests/health_bench.cc

+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright 2020 Redpanda Data, Inc.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.md
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0
9+
10+
#include "cluster/health_monitor_backend.h"
11+
#include "cluster/health_monitor_types.h"
12+
#include "cluster/tests/health_monitor_test_utils.h"
13+
#include "model/namespace.h"
14+
#include "random/generators.h"
15+
#include "vassert.h"
16+
17+
#include <seastar/testing/perf_tests.hh>
18+
19+
#include <limits>
20+
#include <optional>
21+
22+
namespace cluster {
23+
24+
struct health_bench : health_report_accessor {
25+
using health_report_accessor::aggregated_report;
26+
/**
27+
* @brief The original aggregate function prior to optimization.
28+
*/
29+
template<size_t max_partitions_report>
30+
static aggregated_report original_aggregate(report_cache_t& reports) {
31+
aggregated_report ret;
32+
33+
absl::node_hash_map<
34+
model::topic_namespace,
35+
std::vector<model::partition_id>>
36+
leaderless, urp;
37+
38+
for (const auto& [_, report] : reports) {
39+
for (const auto& [tp_ns, partitions] : report.topics) {
40+
for (const auto& partition : partitions) {
41+
if (
42+
!partition.leader_id.has_value()
43+
&& ret.leaderless.size() < max_partitions_report) {
44+
ret.leaderless.emplace(
45+
tp_ns.ns, tp_ns.tp, partition.id);
46+
}
47+
if (
48+
partition.under_replicated_replicas.value_or(0) > 0
49+
&& ret.under_replicated.size() < max_partitions_report) {
50+
ret.under_replicated.emplace(
51+
tp_ns.ns, tp_ns.tp, partition.id);
52+
}
53+
}
54+
}
55+
}
56+
57+
return ret;
58+
}
59+
60+
void bench(auto aggr_fn) {
61+
using namespace cluster;
62+
63+
constexpr int topic_count = 10;
64+
constexpr int parts_per_topic = 10000;
65+
constexpr int rf = 3;
66+
constexpr int nodes = 32;
67+
68+
// genreate a random health report
69+
absl::node_hash_map<model::node_id, cluster::node_health_report>
70+
reports;
71+
72+
for (int topic = 0; topic < topic_count; topic++) {
73+
std::vector<topic_status> statuses;
74+
for (model::node_id node{0}; node < nodes; node++) {
75+
model::topic_namespace tns{
76+
model::kafka_namespace,
77+
model::topic(fmt::format("topic_{}", topic))};
78+
79+
statuses.emplace_back(topic_status{tns, {}});
80+
}
81+
82+
for (int pid = 0; pid < parts_per_topic; pid++) {
83+
for (int r = 0; r < rf; r++) {
84+
auto nid = model::node_id(
85+
random_generators::get_int(nodes - 1));
86+
partition_status status{
87+
.id{pid},
88+
.leader_id = std::nullopt,
89+
.under_replicated_replicas = 1};
90+
statuses.at(nid).partitions.emplace_back(std::move(status));
91+
}
92+
}
93+
94+
for (model::node_id node{0}; node < nodes; node++) {
95+
reports[node].topics.emplace_back(statuses.at(node));
96+
}
97+
}
98+
99+
perf_tests::start_measuring_time();
100+
auto res = aggr_fn(reports);
101+
perf_tests::stop_measuring_time();
102+
}
103+
};
104+
105+
PERF_TEST_F(health_bench, original) {
106+
bench(original_aggregate<original_limit>);
107+
}
108+
109+
PERF_TEST_F(health_bench, original_unlimited) {
110+
bench(original_aggregate<std::numeric_limits<size_t>::max()>);
111+
}
112+
113+
PERF_TEST_F(health_bench, current) { bench(aggregate); }
114+
115+
} // namespace cluster

0 commit comments

Comments
 (0)