
Conversation

@BenPope (Member) commented Sep 24, 2025

Reimplement lag metrics from health report

The health report now contains the HWM for each partition, which means we don't have to fetch it from every node; we can ask the controller instead.

Also optimise the number of requests from one per group coordinator (broker shard) to one per broker, by gathering all required HWMs from all shards.
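
A rough standalone sketch of the batching idea (simplified stand-in types only, not the real cluster_report_filter / health monitor API):

    // Illustrative only: merge per-shard HWM requirements into one request
    // per broker. Types here are simplified stand-ins, not Redpanda's.
    #include <cstdint>
    #include <map>
    #include <set>
    #include <string>
    #include <vector>

    using node_id = int32_t;
    using partition_id = int32_t;
    // topic -> set of partitions whose HWM is needed
    using partition_filter = std::map<std::string, std::set<partition_id>>;

    struct shard_requirements {
        // Partitions needed by one group-coordinator shard, keyed by the
        // broker currently leading them.
        std::map<node_id, partition_filter> by_leader;
    };

    // Collapse all per-shard requirements into a single filter per broker,
    // so the collection loop issues one health-report query per broker
    // instead of one per shard.
    std::map<node_id, partition_filter>
    merge_requests(const std::vector<shard_requirements>& shards) {
        std::map<node_id, partition_filter> merged;
        for (const auto& shard : shards) {
            for (const auto& [leader, topics] : shard.by_leader) {
                auto& dst = merged[leader];
                for (const auto& [topic, partitions] : topics) {
                    dst[topic].insert(partitions.begin(), partitions.end());
                }
            }
        }
        return merged;
    }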

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v25.2.x
  • v25.1.x
  • v24.3.x

Release Notes

  • none

This reduces the number of requests to the controller from one
per-shard to one per broker.

Signed-off-by: Ben Pope <[email protected]>
@BenPope requested review from a team and pgellert and removed request for a team on September 24, 2025 20:19
@BenPope requested a review from a team as a code owner on September 24, 2025 20:19
@Copilot Copilot AI review requested due to automatic review settings on September 24, 2025 20:19
@BenPope self-assigned this on Sep 24, 2025
@BenPope changed the title from "Core 13298 consumer group lag health report" to "[CORE-13298] Reimplement consumer_lag from health_report" on Sep 24, 2025
Copilot AI (Contributor) left a comment

Pull Request Overview

The purpose of this PR is to reimplement the consumer group lag metrics functionality to use the health monitor system instead of a custom RPC service. This optimization reduces the number of requests from one per group coordinator (broker-shards) to one per broker by leveraging health report data that contains high watermark (HWM) information.

Key changes:

  • Remove the entire consumer group lag metrics RPC infrastructure (frontend, service, types)
  • Integrate lag collection directly into group_manager using health monitor data
  • Update test timing to account for both lag collection and health monitor intervals

Reviewed Changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 2 comments.

Summary per file:

src/v/redpanda/application.h: Remove consumer group lag metrics frontend member variable
src/v/redpanda/application.cc: Remove construction and initialization of lag metrics frontend and service
src/v/kafka/server/group_manager.h: Replace lag metrics frontend dependency with health monitor frontend
src/v/kafka/server/group_manager.cc: Reimplement lag collection using health monitor reports instead of RPC calls
src/v/kafka/server/fwd.h: Remove forward declarations for removed classes
src/v/kafka/server/consumer_group_lag_metrics_*: Complete removal of custom RPC infrastructure files
src/v/kafka/server/BUILD: Remove build rules for deleted RPC service
src/v/config/configuration.cc: Update configuration documentation to mention health monitor dependency
tests/rptest/tests/consumer_group_test.py: Update test timing and configuration for new health monitor integration

{
"enable_consumer_group_metrics": ["group", "partition", "consumer_lag"],
"consumer_group_lag_collection_interval_sec": lag_collection_interval,
"health_monitor_max_metadata_age": lag_collection_interval,
Copilot AI commented Sep 24, 2025

[nitpick] The health_monitor_max_metadata_age should be set independently of lag_collection_interval. These are different concepts - one controls metadata freshness and the other controls lag collection frequency. Consider using a separate variable or explaining why they should be equal.

@vbotbuildovich (Collaborator)

CI test results

test results on build#72887
test_class test_method test_arguments test_kind job_url test_status passed reason test_history
MasterTestSuite test_chunk_dl_with_random_http_errors unit https://buildkite.com/redpanda/redpanda/builds/72887#01997d62-5d7e-4114-8601-56a8065114a9 FAIL 0/1
EndToEndCloudTopicsTest test_delete_records null integration https://buildkite.com/redpanda/redpanda/builds/72887#01997d8c-e903-4159-a542-be0490bad7ce FLAKY 15/21 upstream reliability is '78.67647058823529'. current run reliability is '71.42857142857143'. drift is 7.2479 and the allowed drift is set to 50. The test should PASS https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=EndToEndCloudTopicsTest&test_method=test_delete_records
EndToEndCloudTopicsTxTest test_write null integration https://buildkite.com/redpanda/redpanda/builds/72887#01997d8c-e8f5-4523-b2fe-a99ed55f6647 FLAKY 11/21 upstream reliability is '82.70509977827051'. current run reliability is '52.38095238095239'. drift is 30.32415 and the allowed drift is set to 50. The test should PASS https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=EndToEndCloudTopicsTxTest&test_method=test_write
DisablingPartitionsTest test_disable null integration https://buildkite.com/redpanda/redpanda/builds/72887#01997d8c-e905-48e9-b68f-a4983d7ebb44 FLAKY 12/21 upstream reliability is '86.79060665362034'. current run reliability is '57.14285714285714'. drift is 29.64775 and the allowed drift is set to 50. The test should PASS https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=DisablingPartitionsTest&test_method=test_disable
TieredStorageTest test_tiered_storage {"cloud_storage_type_and_url_style": [1, "path"], "test_case": {"name": "(TS_Read == True, TS_Timequery == True)"}} integration https://buildkite.com/redpanda/redpanda/builds/72887#01997d8c-e8fd-4b60-8c78-c7ecfea2b0d3 FAIL 0/1 https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=TieredStorageTest&test_method=test_tiered_storage

@pgellert (Contributor) left a comment

looks great

Comment on lines 2215 to 2229
for (auto& [node_id, filter] : requests) {
auto report_r = co_await _hm_frontend.local().get_cluster_health(
std::move(filter),
cluster::force_refresh::no,
model::timeout_clock::now() + _lag_collection_interval());
if (report_r) {
responses.emplace(node_id, std::move(report_r).assume_value());
} else {
vlog(
klog.warn,
"group_manager::collect_consumer_lag_metrics: "
"failed to get cluster health report: {}",
report_r.error());
}
}
Contributor:

I was wondering if we need to issue these in parallel / with some parallelism, but I think we don't need that, because this does not issue requests directly but rather queries cached state and, if stale, issues a full refresh. Is my understanding correct there?
@BenPope (Member, Author):

Yeah, it does seem to do a full refresh from all nodes, so subsequent requests in the loop should be served from cached state.

I guess we could change the api of the filters to be more expressive. Or I could request the full ntp set from all nodes and then filter by leader.

Comment on lines -3398 to -3402
runtime_services.push_back(
std::make_unique<kafka::consumer_group_lag_metrics_service>(
sched_groups.cluster_sg(),
smp_service_groups.cluster_smp_sg(),
std::ref(_consumer_group_lag_metrics_frontend)));
Contributor:

Do we need to keep this code around for one major version to avoid the metrics disappearing during an upgrade? I think if we remove this now, then during an upgrade old nodes would still try to hit this endpoint but would get back rpc::errc::method_not_found, and the metrics would be broken for the duration of the upgrade.

@BenPope (Member, Author):

I guess it's not too onerous.

Comment on lines 2215 to 2217
for (auto shard_id : std::views::iota(ss::shard_id(0), ss::smp::count)) {
co_await container().invoke_on(shard_id, collect_requests);
}
Contributor:

nit: co_await container().invoke_on_all(collect_requests);

@BenPope (Member, Author) commented Sep 25, 2025

nit: co_await container().invoke_on_all(collect_requests);

invoke_on_all is parallel and collect_requests mutates shared state, so running it sequentially was intentional. I initially did it with a map_reduce, but the reduce is a bit ugly. I can take another stab at it.

@BenPope (Member, Author) commented Sep 25, 2025

Not pretty

    auto r = co_await container().map_reduce0(
      collect_requests, requests_t{}, [](requests_t acc, requests_t val) {
          for (auto& [leader_id, filter] : val) {
              auto& acc_report = acc
                                   .try_emplace(
                                     leader_id,
                                     cluster::cluster_report_filter{
                                       .nodes = {leader_id}})
                                   .first->second;
              for (auto& [ns, topics] :
                   filter.node_report_filter.ntp_filters.namespaces) {
                  auto& acc_ns
                    = acc_report.node_report_filter.ntp_filters.namespaces[ns];
                  for (auto& [topic, parts] : topics) {
                      auto& acc_topic = acc_ns[topic];
                      acc_topic.insert(parts.begin(), parts.end());
                  }
              }
          }
          return acc;
      });

Comment on lines +625 to +626
"Updates will not be more frequent than "
"`health_monitor_max_metadata_age`.",
Contributor:

health_monitor_max_metadata_age currently only limits how fresh the hwm is, but not how fresh the group offsets are, if I'm not mistaken. I'm wondering if we should update the docs to call that out or use std::max(consumer_group_lag_collection_interval_sec, health_monitor_max_metadata_age) as the collection interval.

@BenPope (Member, Author) commented Sep 25, 2025

The offsets are read live, whilst the HWM may be a little stale, so it's possible to have a negative lag, which is unfortunate.

Since I'm pushing consumer_group_lag_collection_interval into the health report, the data can be stale by min(health_monitor_max_metadata_age, consumer_group_lag_collection_interval).
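
A toy illustration of that effect, assuming lag is computed as HWM minus committed offset; the clamp shown is only one possible mitigation, not necessarily what this PR does:

    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    int main() {
        const int64_t cached_hwm = 100;       // HWM from the (possibly stale) health report
        const int64_t committed_offset = 103; // committed offset read live from the group
        const int64_t raw_lag = cached_hwm - committed_offset; // -3: negative lag
        // One possible mitigation (an assumption, not confirmed by the PR): clamp at zero.
        const int64_t reported_lag = std::max<int64_t>(raw_lag, 0);
        std::cout << "raw=" << raw_lag << " reported=" << reported_lag << '\n';
    }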

Comment on lines 623 to +626
"How often to run the collection loop when enable_consumer_group_metrics "
"contains consumer_lag",
"contains consumer_lag."
"Updates will not be more frequent than "
"`health_monitor_max_metadata_age`.",


Suggested change
"How often to run the collection loop when enable_consumer_group_metrics "
"contains consumer_lag",
"contains consumer_lag."
"Updates will not be more frequent than "
"`health_monitor_max_metadata_age`.",
"How often Redpanda runs the collection loop when `enable_consumer_group_metrics` is set to `consumer_lag`. Updates will not be more frequent than `health_monitor_max_metadata_age`."


i hate that this does not preserve spaces sometimes :(
