Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.3.x] datalake/metrics: miscellaneous improvements to lag metrics #24575

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions src/v/cluster/partition_probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ namespace cluster {
static const ss::sstring cluster_metrics_name
= prometheus_sanitize::metrics_name("cluster:partition");

static constexpr int64_t follower_iceberg_lag_metric = 0;

replicated_partition_probe::replicated_partition_probe(
const partition& p) noexcept
: _partition(p) {
Expand All @@ -46,6 +48,16 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
setup_public_metrics(ntp);
}

/// Value exported for the pending-translation lag gauge.
///
/// Only the leader replica exposes the tracked lag
/// (_iceberg_translation_offset_lag); every other replica reports the
/// fixed follower_iceberg_lag_metric placeholder instead.
int64_t replicated_partition_probe::iceberg_translation_offset_lag() const {
    if (!_partition.is_leader()) {
        // Followers do not publish a lag of their own.
        return follower_iceberg_lag_metric;
    }
    return _iceberg_translation_offset_lag;
}

/// Value exported for the pending-commit lag gauge.
///
/// Only the leader replica exposes the tracked lag
/// (_iceberg_commit_offset_lag); every other replica reports the fixed
/// follower_iceberg_lag_metric placeholder instead.
int64_t replicated_partition_probe::iceberg_commit_offset_lag() const {
    if (!_partition.is_leader()) {
        // Followers do not publish a lag of their own.
        return follower_iceberg_lag_metric;
    }
    return _iceberg_commit_offset_lag;
}

void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) {
namespace sm = ss::metrics;

Expand Down Expand Up @@ -168,28 +180,40 @@ void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) {
{sm::shard_label, partition_label});

if (model::is_user_topic(_partition.ntp())) {
// Metrics are reported as follows:
// -2: default initialized state (lag not yet computed)
// -1: iceberg is disabled
// 0: iceberg is enabled but this replica is a follower
// <actual lag>: iceberg is enabled and this replica is the leader
_metrics.add_group(
cluster_metrics_name,
{
sm::make_gauge(
"iceberg_offsets_pending_translation",
[this] {
return _partition.log()->config().iceberg_enabled()
? _iceberg_translation_offset_lag
? iceberg_translation_offset_lag()
: metric_feature_disabled_state;
},
sm::description("Total number of offsets that are pending "
"translation to iceberg."),
sm::description(
"Total number of offsets that are pending "
"translation to iceberg. Lag is reported only on leader "
"replicas while followers report 0. -1 is reported if iceberg "
"is disabled while -2 indicates the lag is "
"not yet computed."),
labels),
sm::make_gauge(
"iceberg_offsets_pending_commit",
[this] {
return _partition.log()->config().iceberg_enabled()
? _iceberg_commit_offset_lag
? iceberg_commit_offset_lag()
: metric_feature_disabled_state;
},
sm::description("Total number of offsets that are pending "
"commit to iceberg catalog."),
sm::description(
"Total number of offsets that are pending "
"commit to iceberg catalog. Lag is reported only on leader "
"while followers report 0. -1 is reported if iceberg is "
"disabled while -2 indicates the lag is not yet computed."),
labels),
},
{},
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/partition_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class replicated_partition_probe : public partition_probe::impl {
void clear_metrics() final;

private:
int64_t iceberg_translation_offset_lag() const;
int64_t iceberg_commit_offset_lag() const;
void reconfigure_metrics();
void setup_public_metrics(const model::ntp&);
void setup_internal_metrics(const model::ntp&);
Expand Down
4 changes: 3 additions & 1 deletion src/v/datalake/coordinator/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ struct fetch_latest_translated_offset_reply
friend std::ostream&
operator<<(std::ostream&, const fetch_latest_translated_offset_reply&);

auto serde_fields() { return std::tie(last_added_offset, errc); }
auto serde_fields() {
return std::tie(last_added_offset, errc, last_iceberg_committed_offset);
}
};

// For a given topic/partition fetches the latest translated offset from
Expand Down
89 changes: 59 additions & 30 deletions tests/rptest/tests/datalake/datalake_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@
from ducktape.utils.util import wait_until
from rptest.services.metrics_check import MetricCheck

NO_SCHEMA_ERRORS = [
r'Must have parsed schema when using structured data mode',
r'Error translating data to binary record'
]


class DatalakeE2ETests(RedpandaTest):
def __init__(self, test_ctx, *args, **kwargs):
Expand Down Expand Up @@ -192,45 +187,79 @@ def table_deleted():
dl.produce_to_topic(self.topic_name, 1024, count)
dl.wait_for_translation(self.topic_name, msg_count=count)

@cluster(num_nodes=3, log_allow_list=NO_SCHEMA_ERRORS)
@matrix(cloud_storage_type=supported_storage_types())
def test_metrics(self, cloud_storage_type):

commit_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_commit'
translation_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_translation'
class DatalakeMetricsTest(RedpandaTest):

commit_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_commit'
translation_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_translation'

def __init__(self, test_ctx, *args, **kwargs):
    """Configure a 3-broker cluster with iceberg enabled for the lag test.

    Extra positional and keyword arguments are forwarded unchanged to the
    parent constructor.
    """
    # Cluster properties applied on top of the defaults.
    # NOTE(review): the leader balancer is presumably disabled so that the
    # partition leader stays put while its metrics are sampled -- confirm.
    rp_conf = {
        "iceberg_enabled": "true",
        "iceberg_catalog_commit_interval_ms": "5000",
        "enable_leader_balancer": False,
    }
    super(DatalakeMetricsTest, self).__init__(
        test_ctx,
        *args,
        num_brokers=3,
        si_settings=SISettings(test_context=test_ctx),
        extra_rp_conf=rp_conf,
        schema_registry_config=SchemaRegistryConfig(),
        pandaproxy_config=PandaproxyConfig(),
        **kwargs)
    self.test_ctx = test_ctx
    self.topic_name = "test"

def setUp(self):
    """Intentionally do nothing.

    NOTE(review): presumably this overrides the base-class setUp so the
    cluster is not started before the test body brings up its own
    services -- confirm against RedpandaTest.
    """

def wait_for_lag(self, metric_check: MetricCheck, metric_name: str,
                 count: int):
    """Wait until `metric_name` evaluates to exactly `count`.

    The check is retried through `wait_until` with a 30 second timeout and
    a 5 second backoff; the timeout error message names the metric and the
    expected value.

    :param metric_check: checker whose `evaluate` is called on every retry.
    :param metric_name: name of the metric to compare.
    :param count: value the metric must equal for the wait to finish.
    """
    def value_matches(_, val):
        # The first argument passed by `evaluate` is ignored.
        return val == count

    def lag_reached():
        return metric_check.evaluate([(metric_name, value_matches)])

    wait_until(
        lag_reached,
        timeout_sec=30,
        backoff_sec=5,
        err_msg=f"Timed out waiting for {metric_name} to reach: {count}")

@cluster(num_nodes=5)
@matrix(cloud_storage_type=supported_storage_types())
def test_lag_metrics(self, cloud_storage_type):

with DatalakeServices(self.test_ctx,
redpanda=self.redpanda,
filesystem_catalog_mode=False,
include_query_engines=[]) as dl:

dl.create_iceberg_enabled_topic(
self.topic_name,
partitions=1,
replicas=1,
iceberg_mode="value_schema_id_prefix")
# Stop the catalog to halt the translation flow
dl.catalog_service.stop()

dl.create_iceberg_enabled_topic(self.topic_name,
partitions=1,
replicas=3)
topic_leader = self.redpanda.partitions(self.topic_name)[0].leader
count = randint(12, 21)
# Populate schemaless messages in schema-ed mode, this should
# hold up translation and commits
dl.produce_to_topic(self.topic_name, 1024, msg_count=count)
dl.produce_to_topic(self.topic_name, 1, msg_count=count)

m = MetricCheck(self.redpanda.logger,
self.redpanda,
self.redpanda.nodes[0],
[commit_lag, translation_lag],
topic_leader, [
DatalakeMetricsTest.commit_lag,
DatalakeMetricsTest.translation_lag
],
labels={
'namespace': 'kafka',
'topic': self.topic_name,
'partition': '0'
},
reduce=sum)
expectations = []
for metric in [commit_lag, translation_lag]:
expectations.append([metric, lambda _, val: val == count])

# Ensure lag metric builds up as expected.
wait_until(
lambda: m.evaluate(expectations),
timeout_sec=30,
backoff_sec=5,
err_msg=f"Timed out waiting for metrics to reach: {count}")

# Wait for lag build up
self.wait_for_lag(m, DatalakeMetricsTest.translation_lag, count)
self.wait_for_lag(m, DatalakeMetricsTest.commit_lag, count)

# Resume iceberg translation
dl.catalog_service.start()

self.wait_for_lag(m, DatalakeMetricsTest.translation_lag, 0)
self.wait_for_lag(m, DatalakeMetricsTest.commit_lag, 0)
Loading