Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/v/datalake/coordinator/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ redpanda_cc_library(
":state_update",
"//src/v/base",
"//src/v/container:fragmented_vector",
"//src/v/iceberg:table_identifier",
"@seastar",
],
)
Expand Down Expand Up @@ -200,6 +201,7 @@ redpanda_cc_library(
"//src/v/container:fragmented_vector",
"//src/v/iceberg:catalog",
"//src/v/iceberg:manifest_io",
"//src/v/iceberg:table_identifier",
"@seastar",
],
)
Expand Down
123 changes: 112 additions & 11 deletions src/v/datalake/coordinator/coordinator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <seastar/util/defer.hh>

#include <exception>
#include <optional>

namespace datalake::coordinator {

Expand Down Expand Up @@ -266,6 +267,9 @@ coordinator::sync_ensure_table_exists(
check_res.error());
co_return errc::revision_mismatch;
}

// Will skip the STM update if the topic is already live.
// But will still ensure the DLQ table schema.
if (check_res.value()) {
// update is non-trivial
storage::record_batch_builder builder(
Expand Down Expand Up @@ -311,6 +315,79 @@ coordinator::sync_ensure_table_exists(
co_return std::nullopt;
}

ss::future<checked<std::nullopt_t, coordinator::errc>>
coordinator::sync_ensure_dlq_table_exists(
  model::topic topic, model::revision_id topic_revision) {
    // Ensures that the topic is tracked as live in the coordinator STM and
    // that the DLQ (dead letter queue) Iceberg table for this topic exists
    // with the expected key/value record schema.
    //
    // Returns revision_mismatch if the lifecycle update cannot be applied to
    // the current STM state, an STM error if sync/replication fails, or a
    // schema error translated from the schema manager.
    auto gate = maybe_gate();
    if (gate.has_error()) {
        co_return gate.error();
    }

    vlog(
      datalake_log.debug,
      "Sync ensure dlq table exists requested, topic: {} rev: {}",
      topic,
      topic_revision);

    auto sync_res = co_await stm_->sync(10s);
    if (sync_res.has_error()) {
        co_return convert_stm_errc(sync_res.error());
    }

    // TODO: add mutex to protect against the thundering herd problem

    topic_lifecycle_update update{
      .topic = topic,
      .revision = topic_revision,
      .new_state = topic_state::lifecycle_state_t::live,
    };
    auto check_res = update.can_apply(stm_->state());
    if (check_res.has_error()) {
        // NOTE: fixed log message typo ("ensure_dlq_table_exist").
        vlog(
          datalake_log.debug,
          "Rejecting ensure_dlq_table_exists for {} rev {}: {}",
          topic,
          topic_revision,
          check_res.error());
        co_return errc::revision_mismatch;
    }

    // Will skip the STM update if the topic is already live.
    // But will still ensure the DLQ table schema.
    if (check_res.value()) {
        // update is non-trivial
        storage::record_batch_builder builder(
          model::record_batch_type::datalake_coordinator, model::offset{0});
        builder.add_raw_kv(
          serde::to_iobuf(topic_lifecycle_update::key),
          serde::to_iobuf(std::move(update)));
        auto repl_res = co_await stm_->replicate_and_wait(
          sync_res.value(), std::move(builder).build(), as_);
        if (repl_res.has_error()) {
            co_return convert_stm_errc(repl_res.error());
        }
    }

    // TODO: verify stm state after replication

    // Ensure the DLQ table schema regardless of whether the lifecycle state
    // needed replicating: the schema may not exist yet for an already-live
    // topic.
    auto dlq_table_id = table_id_provider::dlq_table_id(topic);
    auto record_type = key_value_translator{}.build_type(std::nullopt);
    auto ensure_res = co_await schema_mgr_.ensure_table_schema(
      dlq_table_id, record_type.type, hour_partition_spec());
    if (ensure_res.has_error()) {
        switch (ensure_res.error()) {
        case schema_manager::errc::not_supported:
            co_return errc::incompatible_schema;
        case schema_manager::errc::failed:
            co_return errc::failed;
        case schema_manager::errc::shutting_down:
            co_return errc::shutting_down;
        }
    }

    co_return std::nullopt;
}

ss::future<checked<std::nullopt_t, coordinator::errc>>
coordinator::sync_add_files(
model::topic_partition tp,
Expand Down Expand Up @@ -525,17 +602,41 @@ coordinator::update_lifecycle_state(
if (tombstone_it != topic_table_.get_iceberg_tombstones().end()) {
auto tombstone_rev = tombstone_it->second.last_deleted_revision;
if (tombstone_rev >= topic.revision) {
auto drop_res = co_await file_committer_.drop_table(t);
if (drop_res.has_error()) {
switch (drop_res.error()) {
case file_committer::errc::shutting_down:
co_return errc::shutting_down;
case file_committer::errc::failed:
vlog(
datalake_log.warn,
"failed to drop table for topic {}",
t);
co_return ss::stop_iteration::yes;
// Drop the main table if it exists.
{
auto table_id = table_id_provider::table_id(t);
auto drop_res = co_await file_committer_.drop_table(
table_id);
if (drop_res.has_error()) {
switch (drop_res.error()) {
case file_committer::errc::shutting_down:
co_return errc::shutting_down;
case file_committer::errc::failed:
vlog(
datalake_log.warn,
"failed to drop table for topic {}",
t);
co_return ss::stop_iteration::yes;
}
}
}

// Drop the DLQ table if it exists.
{
auto dlq_table_id = table_id_provider::dlq_table_id(t);
Comment thread
nvartolomei marked this conversation as resolved.
auto drop_res = co_await file_committer_.drop_table(
dlq_table_id);
if (drop_res.has_error()) {
switch (drop_res.error()) {
case file_committer::errc::shutting_down:
co_return errc::shutting_down;
case file_committer::errc::failed:
vlog(
datalake_log.warn,
"failed to drop dlq table for topic {}",
t);
co_return ss::stop_iteration::yes;
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/v/datalake/coordinator/coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class coordinator {
model::revision_id topic_revision,
record_schema_components);

ss::future<checked<std::nullopt_t, errc>> sync_ensure_dlq_table_exists(
model::topic topic, model::revision_id topic_revision);

ss::future<checked<std::nullopt_t, errc>> sync_add_files(
model::topic_partition tp,
model::revision_id topic_revision,
Expand Down
3 changes: 2 additions & 1 deletion src/v/datalake/coordinator/file_committer.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "base/seastarx.h"
#include "container/fragmented_vector.h"
#include "datalake/coordinator/state_update.h"
#include "iceberg/table_identifier.h"

#include <seastar/core/future.hh>

Expand All @@ -28,7 +29,7 @@ class file_committer {
commit_topic_files_to_catalog(model::topic, const topics_state&) const = 0;

virtual ss::future<checked<std::nullopt_t, errc>>
drop_table(const model::topic&) const = 0;
drop_table(const iceberg::table_identifier&) const = 0;

virtual ~file_committer() = default;
};
Expand Down
42 changes: 42 additions & 0 deletions src/v/datalake/coordinator/frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@ ss::future<ensure_table_exists_reply> do_ensure_table_exists(
}
co_return ensure_table_exists_reply{errc::ok};
}
// Runs the ensure-DLQ-table operation against the coordinator hosted on this
// shard; replies not_leader when the coordinator partition is absent here.
ss::future<ensure_dlq_table_exists_reply> do_ensure_dlq_table_exists(
  coordinator_manager& mgr,
  model::ntp coordinator_ntp,
  ensure_dlq_table_exists_request req) {
    // Leadership may have moved since routing; re-resolve the coordinator.
    auto coordinator = mgr.get(coordinator_ntp);
    if (!coordinator) {
        co_return ensure_dlq_table_exists_reply{errc::not_leader};
    }
    auto res = co_await coordinator->sync_ensure_dlq_table_exists(
      req.topic, req.topic_revision);
    if (res.has_error()) {
        co_return to_rpc_errc(res.error());
    }
    co_return ensure_dlq_table_exists_reply{errc::ok};
}
ss::future<add_translated_data_files_reply> add_files(
coordinator_manager& mgr,
model::ntp coordinator_ntp,
Expand Down Expand Up @@ -302,6 +317,33 @@ ss::future<ensure_table_exists_reply> frontend::ensure_table_exists(
&client::ensure_table_exists>(std::move(request), bool(local_only_exec));
}

ss::future<ensure_dlq_table_exists_reply>
frontend::ensure_dlq_table_exists_locally(
  ensure_dlq_table_exists_request request,
  const model::ntp& coordinator_partition,
  ss::shard_id shard) {
    // Hop to the shard that owns the coordinator partition and execute the
    // request there; reply not_leader if the partition is not hosted locally.
    co_return co_await _coordinator_mgr->invoke_on(
      shard,
      [coordinator_partition,
       r = std::move(request)](coordinator_manager& mgr) mutable {
          if (auto partition = mgr.get(coordinator_partition); !partition) {
              return ssx::now(
                ensure_dlq_table_exists_reply{errc::not_leader});
          }
          return do_ensure_dlq_table_exists(
            mgr, coordinator_partition, std::move(r));
      });
}

ss::future<ensure_dlq_table_exists_reply> frontend::ensure_dlq_table_exists(
  ensure_dlq_table_exists_request request, local_only local_only_exec) {
    // Hold the gate open for the duration of the request, then route either
    // locally or via RPC through the shared process<> dispatcher.
    auto gate_holder = _gate.hold();
    const bool local = bool(local_only_exec);
    co_return co_await process<
      &frontend::ensure_dlq_table_exists_locally,
      &client::ensure_dlq_table_exists>(std::move(request), local);
}

ss::future<add_translated_data_files_reply>
frontend::add_translated_data_files_locally(
add_translated_data_files_request request,
Expand Down
8 changes: 8 additions & 0 deletions src/v/datalake/coordinator/frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ class frontend : public ss::peering_sharded_service<frontend> {
ss::future<ensure_table_exists_reply> ensure_table_exists(
ensure_table_exists_request, local_only = local_only::no);

ss::future<ensure_dlq_table_exists_reply> ensure_dlq_table_exists(
ensure_dlq_table_exists_request, local_only = local_only::no);

ss::future<add_translated_data_files_reply> add_translated_data_files(
add_translated_data_files_request, local_only = local_only::no);

Expand Down Expand Up @@ -95,6 +98,11 @@ class frontend : public ss::peering_sharded_service<frontend> {
const model::ntp& coordinator_partition,
ss::shard_id);

ss::future<ensure_dlq_table_exists_reply> ensure_dlq_table_exists_locally(
ensure_dlq_table_exists_request,
const model::ntp& coordinator_partition,
ss::shard_id);

ss::future<add_translated_data_files_reply>
add_translated_data_files_locally(
add_translated_data_files_request,
Expand Down
5 changes: 3 additions & 2 deletions src/v/datalake/coordinator/iceberg_file_committer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "iceberg/manifest_entry.h"
#include "iceberg/manifest_io.h"
#include "iceberg/partition_key.h"
#include "iceberg/table_identifier.h"
#include "iceberg/table_metadata.h"
#include "iceberg/transaction.h"
#include "iceberg/values.h"
Expand Down Expand Up @@ -332,8 +333,8 @@ iceberg_file_committer::commit_topic_files_to_catalog(
}

ss::future<checked<std::nullopt_t, file_committer::errc>>
iceberg_file_committer::drop_table(const model::topic& topic) const {
auto table_id = table_id_provider::table_id(topic);
iceberg_file_committer::drop_table(
const iceberg::table_identifier& table_id) const {
auto drop_res = co_await catalog_.drop_table(table_id, true);
if (
drop_res.has_error()
Expand Down
2 changes: 1 addition & 1 deletion src/v/datalake/coordinator/iceberg_file_committer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class iceberg_file_committer : public file_committer {
model::topic, const topics_state&) const final;

ss::future<checked<std::nullopt_t, errc>>
drop_table(const model::topic&) const final;
drop_table(const iceberg::table_identifier&) const final;

private:
ss::future<checked<iceberg::table_metadata, errc>>
Expand Down
5 changes: 5 additions & 0 deletions src/v/datalake/coordinator/rpc.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
"input_type": "ensure_table_exists_request",
"output_type": "ensure_table_exists_reply"
},
{
"name": "ensure_dlq_table_exists",
"input_type": "ensure_dlq_table_exists_request",
"output_type": "ensure_dlq_table_exists_reply"
},
{
"name": "add_translated_data_files",
"input_type": "add_translated_data_files_request",
Expand Down
1 change: 1 addition & 0 deletions src/v/datalake/coordinator/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ redpanda_test_cc_library(
"//src/v/datalake/coordinator:file_committer",
"//src/v/datalake/coordinator:state",
"//src/v/datalake/coordinator:translated_offset_range",
"//src/v/iceberg:table_identifier",
"//src/v/model",
"@googletest//:gtest",
],
Expand Down
2 changes: 1 addition & 1 deletion src/v/datalake/coordinator/tests/coordinator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class noop_file_committer : public file_committer {
}

// Test stub: reports success without touching any catalog state.
ss::future<checked<std::nullopt_t, errc>>
drop_table(const iceberg::table_identifier&) const final {
    co_return std::nullopt;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/v/datalake/coordinator/tests/state_test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class simple_file_committer : public file_committer {
}

// Test stub: reports success without performing a real table drop.
ss::future<checked<std::nullopt_t, errc>>
drop_table(const iceberg::table_identifier&) const final {
    co_return std::nullopt;
}
}

Expand Down
44 changes: 44 additions & 0 deletions src/v/datalake/coordinator/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,50 @@ struct ensure_table_exists_request
}
};

// RPC reply for ensure_dlq_table_exists: carries only the result code.
struct ensure_dlq_table_exists_reply
  : serde::envelope<
      ensure_dlq_table_exists_reply,
      serde::version<0>,
      serde::compat_version<0>> {
    // Serialized via serde only; exempt from the legacy adl codec.
    using rpc_adl_exempt = std::true_type;

    ensure_dlq_table_exists_reply() = default;
    explicit ensure_dlq_table_exists_reply(errc err)
      : errc(err) {}

    friend std::ostream&
    operator<<(std::ostream&, const ensure_dlq_table_exists_reply&);

    // Result code of the operation (member deliberately shares the type's
    // name, matching the style of the sibling reply types in this file).
    errc errc;

    auto serde_fields() { return std::tie(errc); }
};

// RPC request asking the coordinator to ensure the DLQ table exists for a
// topic at a given revision.
struct ensure_dlq_table_exists_request
  : serde::envelope<
      ensure_dlq_table_exists_request,
      serde::version<0>,
      serde::compat_version<0>> {
    // Serialized via serde only; exempt from the legacy adl codec.
    using rpc_adl_exempt = std::true_type;
    // Reply type used by the generic RPC dispatch machinery.
    using resp_t = ensure_dlq_table_exists_reply;

    ensure_dlq_table_exists_request() = default;
    ensure_dlq_table_exists_request(
      model::topic topic, model::revision_id topic_revision)
      : topic(std::move(topic))
      , topic_revision(topic_revision) {}

    // Target topic whose DLQ table should exist.
    model::topic topic;
    // Topic revision used for staleness checks on the coordinator side.
    model::revision_id topic_revision;

    friend std::ostream&
    operator<<(std::ostream&, const ensure_dlq_table_exists_request&);

    const model::topic& get_topic() const { return topic; }

    auto serde_fields() { return std::tie(topic, topic_revision); }
};

struct add_translated_data_files_reply
: serde::envelope<
add_translated_data_files_reply,
Expand Down
Loading