From ae9a0a6607b96e338f2c82e6f92332c6befe12b8 Mon Sep 17 00:00:00 2001 From: Bharath Vissapragada Date: Thu, 12 Dec 2024 15:36:59 -0800 Subject: [PATCH] datalake/metrics: miscellaneous improvements to lag metrics This is based on our experience debugging Brandon's perf setup. - Changes the metric reporting to report lag only on leaders. This makes it easy to monitor the metric using an aggregate across all replicas without having to worry about the current leader. - Fixed a bug where lag entry was not added to serde fields, adjusted the test coverage to catch this scenario, refactored the test slightly while I'm there. (cherry picked from commit bef32546a3ece0764c51e2eb881411c799b1427d) --- src/v/cluster/partition_probe.cc | 36 ++++++-- src/v/cluster/partition_probe.h | 2 + src/v/datalake/coordinator/types.h | 4 +- .../tests/datalake/datalake_e2e_test.py | 89 ++++++++++++------- 4 files changed, 94 insertions(+), 37 deletions(-) diff --git a/src/v/cluster/partition_probe.cc b/src/v/cluster/partition_probe.cc index 2607ccba0675..6c33f6522104 100644 --- a/src/v/cluster/partition_probe.cc +++ b/src/v/cluster/partition_probe.cc @@ -24,6 +24,8 @@ namespace cluster { static const ss::sstring cluster_metrics_name = prometheus_sanitize::metrics_name("cluster:partition"); +static constexpr int64_t follower_iceberg_lag_metric = 0; + replicated_partition_probe::replicated_partition_probe( const partition& p) noexcept : _partition(p) { @@ -46,6 +48,16 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) { setup_public_metrics(ntp); } +int64_t replicated_partition_probe::iceberg_translation_offset_lag() const { + return _partition.is_leader() ? _iceberg_translation_offset_lag + : follower_iceberg_lag_metric; +} + +int64_t replicated_partition_probe::iceberg_commit_offset_lag() const { + return _partition.is_leader() ? _iceberg_commit_offset_lag + : follower_iceberg_lag_metric; +} + void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) { namespace sm = ss::metrics; @@ -168,6 +180,11 @@ void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) { {sm::shard_label, partition_label}); if (model::is_user_topic(_partition.ntp())) { + // Metrics are reported as follows + // -2 (default initialized state) + // -1 (iceberg disabled state) + // 0 (iceberg enabled but follower replicas) + // leader replicas _metrics.add_group( cluster_metrics_name, { @@ -175,21 +192,28 @@ void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) { "iceberg_offsets_pending_translation", [this] { return _partition.log()->config().iceberg_enabled() - ? _iceberg_translation_offset_lag + ? iceberg_translation_offset_lag() : metric_feature_disabled_state; }, - sm::description("Total number of offsets that are pending " - "translation to iceberg."), + sm::description( + "Total number of offsets that are pending " + "translation to iceberg. Lag is reported only on leader " + "replicas while followers report 0. -1 is reported if iceberg " + "is disabled while -2 indicates the lag is " + "not yet computed."), labels), sm::make_gauge( "iceberg_offsets_pending_commit", [this] { return _partition.log()->config().iceberg_enabled() - ? _iceberg_commit_offset_lag + ? iceberg_commit_offset_lag() : metric_feature_disabled_state; }, - sm::description("Total number of offsets that are pending " - "commit to iceberg catalog."), + sm::description( + "Total number of offsets that are pending " + "commit to iceberg catalog. Lag is reported only on leader " + "while followers report 0. -1 is reported if iceberg is " + "disabled while -2 indicates the lag is not yet computed."), labels), }, {}, diff --git a/src/v/cluster/partition_probe.h b/src/v/cluster/partition_probe.h index 7d8c10aaaf6a..32acc0da9ea9 100644 --- a/src/v/cluster/partition_probe.h +++ b/src/v/cluster/partition_probe.h @@ -109,6 +109,8 @@ class replicated_partition_probe : public partition_probe::impl { void clear_metrics() final; private: + int64_t iceberg_translation_offset_lag() const; + int64_t iceberg_commit_offset_lag() const; void reconfigure_metrics(); void setup_public_metrics(const model::ntp&); void setup_internal_metrics(const model::ntp&); diff --git a/src/v/datalake/coordinator/types.h b/src/v/datalake/coordinator/types.h index ba37346bfa4d..d25c9ea70c31 100644 --- a/src/v/datalake/coordinator/types.h +++ b/src/v/datalake/coordinator/types.h @@ -186,7 +186,9 @@ struct fetch_latest_translated_offset_reply friend std::ostream& operator<<(std::ostream&, const fetch_latest_translated_offset_reply&); - auto serde_fields() { return std::tie(last_added_offset, errc); } + auto serde_fields() { + return std::tie(last_added_offset, errc, last_iceberg_committed_offset); + } }; // For a given topic/partition fetches the latest translated offset from diff --git a/tests/rptest/tests/datalake/datalake_e2e_test.py b/tests/rptest/tests/datalake/datalake_e2e_test.py index fc6c73386e70..baf47abfaf5e 100644 --- a/tests/rptest/tests/datalake/datalake_e2e_test.py +++ b/tests/rptest/tests/datalake/datalake_e2e_test.py @@ -24,11 +24,6 @@ from ducktape.utils.util import wait_until from rptest.services.metrics_check import MetricCheck -NO_SCHEMA_ERRORS = [ - r'Must have parsed schema when using structured data mode', - r'Error translating data to binary record' -] - class DatalakeE2ETests(RedpandaTest): def __init__(self, test_ctx, *args, **kwargs): @@ -192,45 +187,79 @@ def table_deleted(): dl.produce_to_topic(self.topic_name, 1024, count) dl.wait_for_translation(self.topic_name, msg_count=count) - @cluster(num_nodes=3, log_allow_list=NO_SCHEMA_ERRORS) - @matrix(cloud_storage_type=supported_storage_types()) - def test_metrics(self, cloud_storage_type): - commit_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_commit' - translation_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_translation' +class DatalakeMetricsTest(RedpandaTest): + + commit_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_commit' + translation_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_translation' + + def __init__(self, test_ctx, *args, **kwargs): + super(DatalakeMetricsTest, + self).__init__(test_ctx, + num_brokers=3, + si_settings=SISettings(test_context=test_ctx), + extra_rp_conf={ + "iceberg_enabled": "true", + "iceberg_catalog_commit_interval_ms": "5000", + "enable_leader_balancer": False + }, + schema_registry_config=SchemaRegistryConfig(), + pandaproxy_config=PandaproxyConfig(), + *args, + **kwargs) + self.test_ctx = test_ctx + self.topic_name = "test" + + def setUp(self): + pass + + def wait_for_lag(self, metric_check: MetricCheck, metric_name: str, + count: int): + wait_until( + lambda: metric_check.evaluate([(metric_name, lambda _, val: val == + count)]), + timeout_sec=30, + backoff_sec=5, + err_msg=f"Timed out waiting for {metric_name} to reach: {count}") + + @cluster(num_nodes=5) + @matrix(cloud_storage_type=supported_storage_types()) + def test_lag_metrics(self, cloud_storage_type): with DatalakeServices(self.test_ctx, redpanda=self.redpanda, filesystem_catalog_mode=False, include_query_engines=[]) as dl: - dl.create_iceberg_enabled_topic( - self.topic_name, - partitions=1, - replicas=1, - iceberg_mode="value_schema_id_prefix") + # Stop the catalog to halt the translation flow + dl.catalog_service.stop() + + dl.create_iceberg_enabled_topic(self.topic_name, + partitions=1, + replicas=3) + topic_leader = self.redpanda.partitions(self.topic_name)[0].leader count = randint(12, 21) - # Populate schemaless messages in schema-ed mode, this should - # hold up translation and commits - dl.produce_to_topic(self.topic_name, 1024, msg_count=count) + dl.produce_to_topic(self.topic_name, 1, msg_count=count) m = MetricCheck(self.redpanda.logger, self.redpanda, - self.redpanda.nodes[0], - [commit_lag, translation_lag], + topic_leader, [ + DatalakeMetricsTest.commit_lag, + DatalakeMetricsTest.translation_lag + ], labels={ 'namespace': 'kafka', 'topic': self.topic_name, 'partition': '0' }, reduce=sum) - expectations = [] - for metric in [commit_lag, translation_lag]: - expectations.append([metric, lambda _, val: val == count]) - - # Ensure lag metric builds up as expected. - wait_until( - lambda: m.evaluate(expectations), - timeout_sec=30, - backoff_sec=5, - err_msg=f"Timed out waiting for metrics to reach: {count}") + + # Wait for lag build up + self.wait_for_lag(m, DatalakeMetricsTest.translation_lag, count) + self.wait_for_lag(m, DatalakeMetricsTest.commit_lag, count) + + # Resume iceberg translation + dl.catalog_service.start() + + self.wait_for_lag(m, DatalakeMetricsTest.translation_lag, 0) + self.wait_for_lag(m, DatalakeMetricsTest.commit_lag, 0)