Skip to content

Commit

Permalink
Merge pull request #24575 from vbotbuildovich/backport-pr-24568-v24.3…
Browse files Browse the repository at this point in the history
….x-981

[v24.3.x] datalake/metrics: miscellaneous improvements to lag metrics
  • Loading branch information
bharathv authored Dec 15, 2024
2 parents 951f1f2 + ae9a0a6 commit ac6cfbf
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 37 deletions.
36 changes: 30 additions & 6 deletions src/v/cluster/partition_probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ namespace cluster {
static const ss::sstring cluster_metrics_name
= prometheus_sanitize::metrics_name("cluster:partition");

static constexpr int64_t follower_iceberg_lag_metric = 0;

replicated_partition_probe::replicated_partition_probe(
const partition& p) noexcept
: _partition(p) {
Expand All @@ -46,6 +48,16 @@ void replicated_partition_probe::setup_metrics(const model::ntp& ntp) {
setup_public_metrics(ntp);
}

int64_t replicated_partition_probe::iceberg_translation_offset_lag() const {
return _partition.is_leader() ? _iceberg_translation_offset_lag
: follower_iceberg_lag_metric;
}

int64_t replicated_partition_probe::iceberg_commit_offset_lag() const {
return _partition.is_leader() ? _iceberg_commit_offset_lag
: follower_iceberg_lag_metric;
}

void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) {
namespace sm = ss::metrics;

Expand Down Expand Up @@ -168,28 +180,40 @@ void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) {
{sm::shard_label, partition_label});

if (model::is_user_topic(_partition.ntp())) {
// Metrics are reported as follows
// -2 (default initialized state)
// -1 (iceberg disabled state)
// 0 (iceberg enabled but follower replicas)
// <actual lag> leader replicas
_metrics.add_group(
cluster_metrics_name,
{
sm::make_gauge(
"iceberg_offsets_pending_translation",
[this] {
return _partition.log()->config().iceberg_enabled()
? _iceberg_translation_offset_lag
? iceberg_translation_offset_lag()
: metric_feature_disabled_state;
},
sm::description("Total number of offsets that are pending "
"translation to iceberg."),
sm::description(
"Total number of offsets that are pending "
"translation to iceberg. Lag is reported only on leader "
"replicas while followers report 0. -1 is reported if iceberg "
"is disabled while -2 indicates the lag is "
"not yet computed."),
labels),
sm::make_gauge(
"iceberg_offsets_pending_commit",
[this] {
return _partition.log()->config().iceberg_enabled()
? _iceberg_commit_offset_lag
? iceberg_commit_offset_lag()
: metric_feature_disabled_state;
},
sm::description("Total number of offsets that are pending "
"commit to iceberg catalog."),
sm::description(
"Total number of offsets that are pending "
"commit to iceberg catalog. Lag is reported only on leader "
"while followers report 0. -1 is reported if iceberg is "
"disabled while -2 indicates the lag is not yet computed."),
labels),
},
{},
Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/partition_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class replicated_partition_probe : public partition_probe::impl {
void clear_metrics() final;

private:
int64_t iceberg_translation_offset_lag() const;
int64_t iceberg_commit_offset_lag() const;
void reconfigure_metrics();
void setup_public_metrics(const model::ntp&);
void setup_internal_metrics(const model::ntp&);
Expand Down
4 changes: 3 additions & 1 deletion src/v/datalake/coordinator/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ struct fetch_latest_translated_offset_reply
friend std::ostream&
operator<<(std::ostream&, const fetch_latest_translated_offset_reply&);

auto serde_fields() { return std::tie(last_added_offset, errc); }
auto serde_fields() {
return std::tie(last_added_offset, errc, last_iceberg_committed_offset);
}
};

// For a given topic/partition fetches the latest translated offset from
Expand Down
89 changes: 59 additions & 30 deletions tests/rptest/tests/datalake/datalake_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@
from ducktape.utils.util import wait_until
from rptest.services.metrics_check import MetricCheck

NO_SCHEMA_ERRORS = [
r'Must have parsed schema when using structured data mode',
r'Error translating data to binary record'
]


class DatalakeE2ETests(RedpandaTest):
def __init__(self, test_ctx, *args, **kwargs):
Expand Down Expand Up @@ -192,45 +187,79 @@ def table_deleted():
dl.produce_to_topic(self.topic_name, 1024, count)
dl.wait_for_translation(self.topic_name, msg_count=count)

@cluster(num_nodes=3, log_allow_list=NO_SCHEMA_ERRORS)
@matrix(cloud_storage_type=supported_storage_types())
def test_metrics(self, cloud_storage_type):

commit_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_commit'
translation_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_translation'
class DatalakeMetricsTest(RedpandaTest):

commit_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_commit'
translation_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_translation'

def __init__(self, test_ctx, *args, **kwargs):
super(DatalakeMetricsTest,
self).__init__(test_ctx,
num_brokers=3,
si_settings=SISettings(test_context=test_ctx),
extra_rp_conf={
"iceberg_enabled": "true",
"iceberg_catalog_commit_interval_ms": "5000",
"enable_leader_balancer": False
},
schema_registry_config=SchemaRegistryConfig(),
pandaproxy_config=PandaproxyConfig(),
*args,
**kwargs)
self.test_ctx = test_ctx
self.topic_name = "test"

def setUp(self):
pass

def wait_for_lag(self, metric_check: MetricCheck, metric_name: str,
count: int):
wait_until(
lambda: metric_check.evaluate([(metric_name, lambda _, val: val ==
count)]),
timeout_sec=30,
backoff_sec=5,
err_msg=f"Timed out waiting for {metric_name} to reach: {count}")

@cluster(num_nodes=5)
@matrix(cloud_storage_type=supported_storage_types())
def test_lag_metrics(self, cloud_storage_type):

with DatalakeServices(self.test_ctx,
redpanda=self.redpanda,
filesystem_catalog_mode=False,
include_query_engines=[]) as dl:

dl.create_iceberg_enabled_topic(
self.topic_name,
partitions=1,
replicas=1,
iceberg_mode="value_schema_id_prefix")
# Stop the catalog to halt the translation flow
dl.catalog_service.stop()

dl.create_iceberg_enabled_topic(self.topic_name,
partitions=1,
replicas=3)
topic_leader = self.redpanda.partitions(self.topic_name)[0].leader
count = randint(12, 21)
# Populate schemaless messages in schema-ed mode, this should
# hold up translation and commits
dl.produce_to_topic(self.topic_name, 1024, msg_count=count)
dl.produce_to_topic(self.topic_name, 1, msg_count=count)

m = MetricCheck(self.redpanda.logger,
self.redpanda,
self.redpanda.nodes[0],
[commit_lag, translation_lag],
topic_leader, [
DatalakeMetricsTest.commit_lag,
DatalakeMetricsTest.translation_lag
],
labels={
'namespace': 'kafka',
'topic': self.topic_name,
'partition': '0'
},
reduce=sum)
expectations = []
for metric in [commit_lag, translation_lag]:
expectations.append([metric, lambda _, val: val == count])

# Ensure lag metric builds up as expected.
wait_until(
lambda: m.evaluate(expectations),
timeout_sec=30,
backoff_sec=5,
err_msg=f"Timed out waiting for metrics to reach: {count}")

# Wait for lag build up
self.wait_for_lag(m, DatalakeMetricsTest.translation_lag, count)
self.wait_for_lag(m, DatalakeMetricsTest.commit_lag, count)

# Resume iceberg translation
dl.catalog_service.start()

self.wait_for_lag(m, DatalakeMetricsTest.translation_lag, 0)
self.wait_for_lag(m, DatalakeMetricsTest.commit_lag, 0)

0 comments on commit ac6cfbf

Please sign in to comment.