
Commit

Merge pull request #24333 from vbotbuildovich/backport-pr-24305-v24.3.x-63
rockwotj authored Nov 27, 2024
2 parents 64e5e6b + 3ae2c7e commit afe1a3f
Showing 11 changed files with 182 additions and 29 deletions.
29 changes: 29 additions & 0 deletions src/v/cluster/partition_probe.cc
@@ -167,6 +167,35 @@ void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) {
{},
{sm::shard_label, partition_label});

+    if (model::is_user_topic(_partition.ntp())) {
+        _metrics.add_group(
+          cluster_metrics_name,
+          {
+            sm::make_gauge(
+              "iceberg_offsets_pending_translation",
+              [this] {
+                  return _partition.log()->config().iceberg_enabled()
+                           ? _iceberg_translation_offset_lag
+                           : metric_feature_disabled_state;
+              },
+              sm::description("Total number of offsets that are pending "
+                              "translation to iceberg."),
+              labels),
+            sm::make_gauge(
+              "iceberg_offsets_pending_commit",
+              [this] {
+                  return _partition.log()->config().iceberg_enabled()
+                           ? _iceberg_commit_offset_lag
+                           : metric_feature_disabled_state;
+              },
+              sm::description("Total number of offsets that are pending "
+                              "commit to iceberg catalog."),
+              labels),
+          },
+          {},
+          {sm::shard_label, partition_label});
+    }
+
if (
config::shard_local_cfg().enable_schema_id_validation()
!= pandaproxy::schema_registry::schema_id_validation_mode::none) {
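The two new gauges encode status in the sign of the sampled value: metric_default_initialized_state (-2) until the translator reports its first lag update, metric_feature_disabled_state (-1) while iceberg is disabled for the topic, and the actual offset lag otherwise (the constants are added to partition_probe.h below). A minimal sketch of how a metrics consumer might decode this; the helper is hypothetical and not part of this commit:

#include <cstdint>
#include <string>

// Hypothetical decoder for the iceberg lag gauges (illustration only):
// -2 means no lag update has been reported yet, -1 means iceberg is
// disabled for the topic, and any value >= 0 is the real offset lag.
std::string describe_iceberg_lag(int64_t gauge_value) {
    if (gauge_value == -2) {
        return "lag not yet reported";
    }
    if (gauge_value == -1) {
        return "iceberg disabled";
    }
    return std::to_string(gauge_value) + " offsets pending";
}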
22 changes: 22 additions & 0 deletions src/v/cluster/partition_probe.h
@@ -31,6 +31,8 @@ class partition_probe {
virtual void add_bytes_fetched(uint64_t) = 0;
virtual void add_bytes_fetched_from_follower(uint64_t) = 0;
virtual void add_schema_id_validation_failed() = 0;
+    virtual void update_iceberg_translation_offset_lag(int64_t) = 0;
+    virtual void update_iceberg_commit_offset_lag(int64_t) = 0;
virtual void setup_metrics(const model::ntp&) = 0;
virtual void clear_metrics() = 0;
virtual ~impl() noexcept = default;
@@ -66,6 +68,14 @@ class partition_probe {
_impl->add_schema_id_validation_failed();
}

+    void update_iceberg_translation_offset_lag(int64_t new_lag) {
+        _impl->update_iceberg_translation_offset_lag(new_lag);
+    }
+
+    void update_iceberg_commit_offset_lag(int64_t new_lag) {
+        _impl->update_iceberg_commit_offset_lag(new_lag);
+    }
+
void clear_metrics() { _impl->clear_metrics(); }

private:
@@ -88,6 +98,14 @@ class replicated_partition_probe : public partition_probe::impl {
++_schema_id_validation_records_failed;
};

+    void update_iceberg_translation_offset_lag(int64_t new_lag) final {
+        _iceberg_translation_offset_lag = new_lag;
+    }
+
+    void update_iceberg_commit_offset_lag(int64_t new_lag) final {
+        _iceberg_commit_offset_lag = new_lag;
+    }
+
void clear_metrics() final;

private:
@@ -98,13 +116,17 @@ class replicated_partition_probe : public partition_probe::impl {
void setup_public_scrubber_metric(const model::ntp&);

private:
+    static constexpr int64_t metric_default_initialized_state{-2};
+    static constexpr int64_t metric_feature_disabled_state{-1};
const partition& _partition;
uint64_t _records_produced{0};
uint64_t _records_fetched{0};
uint64_t _bytes_produced{0};
uint64_t _bytes_fetched{0};
uint64_t _bytes_fetched_from_follower{0};
uint64_t _schema_id_validation_records_failed{0};
+    int64_t _iceberg_translation_offset_lag{metric_default_initialized_state};
+    int64_t _iceberg_commit_offset_lag{metric_default_initialized_state};
metrics::internal_metric_groups _metrics;
metrics::public_metric_groups _public_metrics;
};
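The probe stores each lag in a plain int64_t member: the translator pushes updates from its fiber through update_iceberg_*_offset_lag(), and the gauge callbacks registered in setup_internal_metrics() read the latest value only when the endpoint is scraped. A simplified sketch of that push/pull split, using assumed stand-in types rather than the Seastar metrics API:

#include <cstdint>
#include <functional>
#include <iostream>

// Illustration only: a writer pushes into a plain member, and the
// metrics layer pulls it through a callback at scrape time.
struct lag_probe {
    static constexpr int64_t default_initialized = -2; // mirrors metric_default_initialized_state
    int64_t translation_lag{default_initialized};

    void update_translation_lag(int64_t v) { translation_lag = v; }

    std::function<int64_t()> gauge() {
        return [this] { return translation_lag; };
    }
};

int main() {
    lag_probe probe;
    auto sample = probe.gauge();
    std::cout << sample() << '\n'; // -2: nothing reported yet
    probe.update_translation_lag(42);
    std::cout << sample() << '\n'; // 42: the gauge sees the latest push
}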
17 changes: 10 additions & 7 deletions src/v/datalake/coordinator/coordinator.cc
@@ -387,8 +387,8 @@ coordinator::sync_add_files(
co_return std::nullopt;
}

-ss::future<checked<std::optional<kafka::offset>, coordinator::errc>>
-coordinator::sync_get_last_added_offset(
+ss::future<checked<coordinator::last_offsets, coordinator::errc>>
+coordinator::sync_get_last_added_offsets(
model::topic_partition tp, model::revision_id requested_topic_rev) {
auto gate = maybe_gate();
if (gate.has_error()) {
@@ -400,7 +400,7 @@ coordinator::sync_get_last_added_offset(
}
auto topic_it = stm_->state().topic_to_state.find(tp.topic);
if (topic_it == stm_->state().topic_to_state.end()) {
-        co_return std::nullopt;
+        co_return last_offsets{std::nullopt, std::nullopt};
}
const auto& topic = topic_it->second;
if (requested_topic_rev < topic.revision) {
@@ -415,7 +415,7 @@ coordinator::sync_get_last_added_offset(
if (topic.lifecycle_state == topic_state::lifecycle_state_t::purged) {
// Coordinator is ready to accept files for the new topic revision,
// but there is no stm record yet. Reply with "no offset".
-        co_return std::nullopt;
+        co_return last_offsets{std::nullopt, std::nullopt};
}

vlog(
@@ -438,13 +438,16 @@

auto partition_it = topic.pid_to_pending_files.find(tp.partition);
if (partition_it == topic.pid_to_pending_files.end()) {
-        co_return std::nullopt;
+        co_return last_offsets{std::nullopt, std::nullopt};
}
const auto& prt_state = partition_it->second;
if (prt_state.pending_entries.empty()) {
-        co_return prt_state.last_committed;
+        co_return last_offsets{
+          prt_state.last_committed, prt_state.last_committed};
     }
-    co_return prt_state.pending_entries.back().data.last_offset;
+    co_return last_offsets{
+      prt_state.pending_entries.back().data.last_offset,
+      prt_state.last_committed};
}

void coordinator::notify_leadership(std::optional<model::node_id> leader_id) {
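Depending on stm state, sync_get_last_added_offsets() now replies with one of three shapes: both offsets empty when the topic or partition is unknown (or the topic was purged), both equal when no entries are pending, and last-added running ahead of last-committed while files await commit. A compact model of that decision, using simplified stand-in types rather than the real stm state:

#include <cstdint>
#include <optional>
#include <vector>

// Simplified stand-ins (illustration only, not the real types).
struct last_offsets {
    std::optional<int64_t> last_added_offset;
    std::optional<int64_t> last_committed_offset;
};

struct partition_state {
    std::vector<int64_t> pending_last_offsets; // last offset per pending entry
    std::optional<int64_t> last_committed;
};

last_offsets reply_for(const partition_state* prt) {
    if (prt == nullptr) {
        // Unknown topic/partition: nothing to report.
        return {std::nullopt, std::nullopt};
    }
    if (prt->pending_last_offsets.empty()) {
        // Nothing in flight: everything added has also been committed.
        return {prt->last_committed, prt->last_committed};
    }
    // Entries pending commit: added runs ahead of committed.
    return {prt->pending_last_offsets.back(), prt->last_committed};
}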
7 changes: 5 additions & 2 deletions src/v/datalake/coordinator/coordinator.h
@@ -65,8 +65,11 @@ class coordinator {
model::revision_id topic_revision,
chunked_vector<translated_offset_range>);

-    ss::future<checked<std::optional<kafka::offset>, errc>>
-    sync_get_last_added_offset(
+    struct last_offsets {
+        std::optional<kafka::offset> last_added_offset;
+        std::optional<kafka::offset> last_committed_offset;
+    };
+    ss::future<checked<last_offsets, errc>> sync_get_last_added_offsets(
model::topic_partition tp, model::revision_id topic_rev);

void notify_leadership(std::optional<model::node_id>);
6 changes: 4 additions & 2 deletions src/v/datalake/coordinator/frontend.cc
@@ -80,12 +80,14 @@ ss::future<fetch_latest_translated_offset_reply> fetch_latest_offset(
if (!crd) {
co_return fetch_latest_translated_offset_reply{errc::not_leader};
}
-    auto ret = co_await crd->sync_get_last_added_offset(
+    auto ret = co_await crd->sync_get_last_added_offsets(
req.tp, req.topic_revision);
if (ret.has_error()) {
co_return to_rpc_errc(ret.error());
}
-    co_return fetch_latest_translated_offset_reply{ret.value()};
+    auto& val = ret.value();
+    co_return fetch_latest_translated_offset_reply{
+      val.last_added_offset, val.last_committed_offset};
}
} // namespace

16 changes: 8 additions & 8 deletions src/v/datalake/coordinator/tests/coordinator_test.cc
@@ -116,7 +116,7 @@ ss::future<> file_adder_loop(
while (!done) {
co_await random_sleep_ms(30);
vlog(datalake::datalake_log.debug, "[{}] getting last added", id);
-        auto last_res = co_await n.crd.sync_get_last_added_offset(
+        auto last_res = co_await n.crd.sync_get_last_added_offsets(
tp, topic_rev);
if (last_res.has_error()) {
continue;
@@ -126,7 +126,7 @@ ss::future<> file_adder_loop(
if (ensure_res.has_error()) {
continue;
}
-        auto cur_last_opt = last_res.value();
+        auto cur_last_opt = last_res.value().last_added_offset;
while (true) {
co_await random_sleep_ms(30);
if (cur_last_opt && cur_last_opt.value()() == last_offset) {
@@ -402,14 +402,14 @@ TEST_F(CoordinatorTest, TestLastAddedHappyPath) {
ASSERT_FALSE(add_res.has_error()) << add_res.error();
}

-    auto last_res = leader.crd.sync_get_last_added_offset(tp00, rev).get();
+    auto last_res = leader.crd.sync_get_last_added_offsets(tp00, rev).get();
ASSERT_FALSE(last_res.has_error()) << last_res.error();
-    ASSERT_TRUE(last_res.value().has_value());
-    ASSERT_EQ(400, last_res.value().value()());
+    ASSERT_TRUE(last_res.value().last_added_offset.has_value());
+    ASSERT_EQ(400, last_res.value().last_added_offset.value()());

-    last_res = leader.crd.sync_get_last_added_offset(tp01, rev).get();
+    last_res = leader.crd.sync_get_last_added_offsets(tp01, rev).get();
     ASSERT_FALSE(last_res.has_error()) << last_res.error();
-    ASSERT_FALSE(last_res.value().has_value());
+    ASSERT_FALSE(last_res.value().last_added_offset.has_value());
}

TEST_F(CoordinatorTest, TestNotLeader) {
@@ -435,7 +435,7 @@ TEST_F(CoordinatorTest, TestNotLeader) {
ASSERT_TRUE(add_res.has_error());
EXPECT_EQ(coordinator::errc::not_leader, add_res.error());

-    auto last_res = non_leader.crd.sync_get_last_added_offset(tp00, rev).get();
+    auto last_res = non_leader.crd.sync_get_last_added_offsets(tp00, rev).get();
ASSERT_TRUE(last_res.has_error()) << last_res.error();
EXPECT_EQ(coordinator::errc::not_leader, last_res.error());
}
5 changes: 3 additions & 2 deletions src/v/datalake/coordinator/tests/state_machine_test.cc
@@ -154,13 +154,14 @@ TEST_F_CORO(coordinator_stm_fixture, test_snapshots) {
auto add_files_result = co_await retry_with_leader_coordinator(
[&, this](coordinator& coordinator) mutable {
auto tp = random_tp();
-            return coordinator->sync_get_last_added_offset(tp, rev).then(
+            return coordinator->sync_get_last_added_offsets(tp, rev).then(
[&, tp](auto result) {
if (!result) {
return ss::make_ready_future<bool>(false);
}
auto last_committed_offset = kafka::offset_cast(
-                    result.value().value_or(kafka::offset{-1}));
+                    result.value().last_added_offset.value_or(
+                      kafka::offset{-1}));
std::vector<std::pair<int64_t, int64_t>> offset_pairs;
offset_pairs.reserve(5);
auto next_offset = last_committed_offset() + 1;
8 changes: 6 additions & 2 deletions src/v/datalake/coordinator/types.h
@@ -169,13 +169,17 @@ struct fetch_latest_translated_offset_reply
explicit fetch_latest_translated_offset_reply(errc err)
: errc(err) {}
explicit fetch_latest_translated_offset_reply(
-      std::optional<kafka::offset> o)
-      : last_added_offset(o)
+      std::optional<kafka::offset> last_added,
+      std::optional<kafka::offset> last_committed)
+      : last_added_offset(last_added)
+      , last_iceberg_committed_offset(last_committed)
, errc(errc::ok) {}

// The offset of the latest data file added to the coordinator.
std::optional<kafka::offset> last_added_offset;

+    std::optional<kafka::offset> last_iceberg_committed_offset;
+
// If not ok, the request processing has a problem.
errc errc;

46 changes: 40 additions & 6 deletions src/v/datalake/translation/partition_translator.cc
@@ -300,6 +300,7 @@ partition_translator::do_translate_once(retry_chain_node& parent_rcn) {
read_begin_offset,
read_end_offset,
_partition->last_stable_offset());
+        _partition->probe().update_iceberg_translation_offset_lag(0);
co_return translation_success::yes;
}
// We have some data to translate, make a reader
@@ -338,14 +339,46 @@ partition_translator::do_translate_once(retry_chain_node& parent_rcn) {
parent_rcn, std::move(kafka_reader), read_begin_offset);
+    units.return_all();
     vlog(_logger.debug, "translation result: {}", translation_result);
-    units.return_all();
+    auto result = translation_success::no;
+    auto max_translated_offset = kafka::prev_offset(read_begin_offset);
+    if (translation_result) {
+        auto last_translated_offset = translation_result->last_offset;
+        if (co_await checkpoint_translated_data(
+              parent_rcn,
+              read_begin_offset,
+              std::move(translation_result.value()))) {
+            max_translated_offset = last_translated_offset;
+            result = translation_success::yes;
+        }
+    }
+    update_translation_lag(max_translated_offset);
+    co_return result;
+}
+
+void partition_translator::update_translation_lag(
+  kafka::offset max_translated_offset) const {
+    auto max_translatable_offset = max_offset_for_translation();
     if (
-      translation_result
-      && co_await checkpoint_translated_data(
-        parent_rcn, read_begin_offset, std::move(translation_result.value()))) {
-        co_return translation_success::yes;
+      !max_translatable_offset
+      || max_translatable_offset.value() < kafka::offset{0}) {
+        return;
     }
+    auto offset_lag = max_translatable_offset.value()
+                      - std::max(max_translated_offset, kafka::offset{-1});
+    _partition->probe().update_iceberg_translation_offset_lag(offset_lag);
+}
+
+void partition_translator::update_commit_lag(
+  std::optional<kafka::offset> max_committed_offset) const {
+    auto max_translatable_offset = max_offset_for_translation();
+    if (
+      !max_translatable_offset
+      || max_translatable_offset.value() < kafka::offset{0}) {
+        return;
+    }
-    co_return translation_success::no;
+    auto offset_lag = max_translatable_offset.value()
+                      - max_committed_offset.value_or(kafka::offset{-1});
+    _partition->probe().update_iceberg_commit_offset_lag(offset_lag);
 }

ss::future<partition_translator::checkpoint_result>
@@ -409,6 +442,7 @@ partition_translator::reconcile_with_coordinator() {
vlog(_logger.warn, "reconciliation failed, response: {}", resp);
co_return std::nullopt;
}
+    update_commit_lag(resp.last_iceberg_committed_offset);
// No file entry signifies the translation was just enabled on the
// topic. In such a case we start translation from the local start
// of the log. The underlying assumption is that there is a reasonable
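Both lag helpers bail out when max_offset_for_translation() is empty or negative, then take a plain difference against a floor of kafka::offset{-1}, so a partition with nothing translated yet reports the full backlog. The same arithmetic reduced to plain integers, with illustrative values:

#include <algorithm>
#include <cstdint>
#include <iostream>

// Same arithmetic as update_translation_lag()/update_commit_lag(),
// reduced to int64_t for illustration; -1 mirrors kafka::offset{-1}.
int64_t offset_lag(int64_t max_translatable, int64_t max_done) {
    return max_translatable - std::max(max_done, int64_t{-1});
}

int main() {
    std::cout << offset_lag(99, 49) << '\n'; // 50: offsets 50..99 pending
    std::cout << offset_lag(99, -1) << '\n'; // 100: nothing done yet, 0..99 pending
    std::cout << offset_lag(99, 99) << '\n'; // 0: fully caught up
}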
5 changes: 5 additions & 0 deletions src/v/datalake/translation/partition_translator.h
@@ -94,6 +94,11 @@ class partition_translator {

ss::future<> do_translate();

+    void
+    update_translation_lag(kafka::offset max_translated_kafka_offset) const;
+    void update_commit_lag(
+      std::optional<kafka::offset> max_committed_kafka_offset) const;
+
using translation_success = ss::bool_class<struct translation_success>;
ss::future<translation_success> do_translate_once(retry_chain_node& parent);
ss::future<model::record_batch_reader> make_reader();