Skip to content

Commit

Permalink
dl/coordinator: add initial coordinator
Browse files Browse the repository at this point in the history
Adds some initial structure for the coordinator. This commit just adds
wrapper around a couple of calls to the underlying STM, so that it can
be plugged into the frontend.

Later commits will have the coordinator reconcile the STM with the
Iceberg catalog, perform local/Raft snapshots, etc.
  • Loading branch information
andrwng committed Oct 10, 2024
1 parent fa8c137 commit 46340ac
Show file tree
Hide file tree
Showing 7 changed files with 608 additions and 0 deletions.
23 changes: 23 additions & 0 deletions src/v/datalake/coordinator/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,29 @@ redpanda_cc_rpc_library(
include_prefix = "datalake/coordinator",
)

redpanda_cc_library(
name = "coordinator",
srcs = [
"coordinator.cc",
],
hdrs = [
"coordinator.h",
],
implementation_deps = [
"//src/v/base",
"//src/v/datalake:logger",
"//src/v/storage:record_batch_builder",
],
include_prefix = "datalake/coordinator",
deps = [
":state_update",
":stm",
"//src/v/container:fragmented_vector",
"//src/v/model",
"@seastar",
],
)

redpanda_cc_library(
name = "data_file",
srcs = [
Expand Down
1 change: 1 addition & 0 deletions src/v/datalake/coordinator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ rpcgen(TARGET generated_datalake_coordinator_rpc
v_cc_library(
NAME datalake_coordinator
SRCS
coordinator.cc
data_file.cc
frontend.cc
service.cc
Expand Down
149 changes: 149 additions & 0 deletions src/v/datalake/coordinator/coordinator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#include "datalake/coordinator/coordinator.h"

#include "base/vlog.h"
#include "container/fragmented_vector.h"
#include "datalake/coordinator/state_update.h"
#include "datalake/logger.h"
#include "model/fundamental.h"
#include "model/record_batch_reader.h"
#include "storage/record_batch_builder.h"

namespace datalake::coordinator {

namespace {
coordinator::errc convert_stm_errc(coordinator_stm::errc e) {
switch (e) {
case coordinator_stm::errc::not_leader:
return coordinator::errc::not_leader;
case coordinator_stm::errc::shutting_down:
return coordinator::errc::shutting_down;
case coordinator_stm::errc::apply_error:
return coordinator::errc::stm_apply_error;
case coordinator_stm::errc::raft_error:
return coordinator::errc::timedout;
}
}
} // namespace

std::ostream& operator<<(std::ostream& o, coordinator::errc e) {
switch (e) {
case coordinator::errc::not_leader:
return o << "coordinator::errc::not_leader";
case coordinator::errc::shutting_down:
return o << "coordinator::errc::shutting_down";
case coordinator::errc::stm_apply_error:
return o << "coordinator::errc::stm_apply_error";
case coordinator::errc::timedout:
return o << "coordinator::errc::timedout";
}
}

ss::future<> coordinator::stop_and_wait() {
as_.request_abort();
return gate_.close();
}

checked<ss::gate::holder, coordinator::errc> coordinator::maybe_gate() {
ss::gate::holder h;
if (as_.abort_requested() || gate_.is_closed()) {
return errc::shutting_down;
}
return gate_.hold();
}

ss::future<checked<std::nullopt_t, coordinator::errc>>
coordinator::sync_add_files(
model::topic_partition tp, chunked_vector<translated_offset_range> entries) {
if (entries.empty()) {
vlog(datalake_log.debug, "Empty entry requested {}", tp);
co_return std::nullopt;
}
auto gate = maybe_gate();
if (gate.has_error()) {
co_return gate.error();
}
vlog(
datalake_log.debug,
"Sync add files requested {}: [{}, {}], {} files",
tp,
entries.begin()->start_offset,
entries.back().last_offset,
entries.size());
auto sync_res = co_await stm_.sync(10s);
if (sync_res.has_error()) {
co_return convert_stm_errc(sync_res.error());
}
auto added_last_offset = entries.back().last_offset;
auto update_res = add_files_update::build(
stm_.state(), tp, std::move(entries));
if (update_res.has_error()) {
// NOTE: rejection here is just an optimization -- the operation would
// fail to be applied to the STM anyway.
vlog(
datalake_log.debug,
"Rejecting request to add files for {}: {}",
tp,
update_res.error());
co_return errc::stm_apply_error;
}
storage::record_batch_builder builder(
model::record_batch_type::datalake_coordinator, model::offset{0});
builder.add_raw_kv(
serde::to_iobuf(add_files_update::key),
serde::to_iobuf(std::move(update_res.value())));
auto repl_res = co_await stm_.replicate_and_wait(
sync_res.value(), std::move(builder).build(), as_);
if (repl_res.has_error()) {
co_return convert_stm_errc(repl_res.error());
}

// Check that the resulting state matches that expected by the caller.
// NOTE: a mismatch here just means there was a race to update the STM, and
// this should be handled by callers.
// TODO: would be nice to encapsulate this in some update validator.
auto prt_opt = stm_.state().partition_state(tp);
if (
!prt_opt.has_value() || prt_opt->get().pending_entries.empty()
|| prt_opt->get().pending_entries.back().last_offset
!= added_last_offset) {
vlog(
datalake_log.debug,
"Resulting last offset for {} does not match expected {}",
tp,
added_last_offset);
co_return errc::stm_apply_error;
}
co_return std::nullopt;
}

ss::future<checked<std::optional<kafka::offset>, coordinator::errc>>
coordinator::sync_get_last_added_offset(model::topic_partition tp) {
auto gate = maybe_gate();
if (gate.has_error()) {
co_return gate.error();
}
auto sync_res = co_await stm_.sync(10s);
if (sync_res.has_error()) {
co_return convert_stm_errc(sync_res.error());
}
auto prt_state_opt = stm_.state().partition_state(tp);
if (!prt_state_opt.has_value()) {
co_return std::nullopt;
}
const auto& prt_state = prt_state_opt->get();
if (prt_state.pending_entries.empty()) {
co_return prt_state.last_committed;
}
co_return prt_state.pending_entries.back().last_offset;
}

} // namespace datalake::coordinator
50 changes: 50 additions & 0 deletions src/v/datalake/coordinator/coordinator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#pragma once

#include "container/fragmented_vector.h"
#include "datalake/coordinator/state_machine.h"
#include "datalake/coordinator/state_update.h"
#include "model/fundamental.h"

namespace datalake::coordinator {

// Public interface that provides access to the coordinator STM. Conceptually,
// the STM focuses solely on persisting deterministic updates, while this:
// 1. wrangles additional aspects of these updates like concurrency, and
// 2. reconciles the STM state with external catalogs.
class coordinator {
public:
enum class errc {
not_leader,
stm_apply_error,
timedout,
shutting_down,
};
explicit coordinator(coordinator_stm& stm)
: stm_(stm) {}

ss::future<> stop_and_wait();
ss::future<checked<std::nullopt_t, errc>> sync_add_files(
model::topic_partition tp, chunked_vector<translated_offset_range>);
ss::future<checked<std::optional<kafka::offset>, errc>>
sync_get_last_added_offset(model::topic_partition tp);

private:
checked<ss::gate::holder, errc> maybe_gate();

coordinator_stm& stm_;

ss::gate gate_;
ss::abort_source as_;
};
std::ostream& operator<<(std::ostream&, coordinator::errc);

} // namespace datalake::coordinator
22 changes: 22 additions & 0 deletions src/v/datalake/coordinator/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,28 @@ redpanda_test_cc_library(
],
)

redpanda_cc_gtest(
name = "coordinator_test",
timeout = "short",
srcs = [
"coordinator_test.cc",
],
cpu = 1,
deps = [
":state_test_utils",
"//src/v/datalake:logger",
"//src/v/datalake/coordinator",
"//src/v/datalake/coordinator:state",
"//src/v/datalake/coordinator:stm",
"//src/v/datalake/coordinator:translated_offset_range",
"//src/v/raft/tests:raft_fixture",
"//src/v/random:generators",
"//src/v/test_utils:gtest",
"@googletest//:gtest",
"@seastar",
],
)

redpanda_cc_gtest(
name = "state_update_test",
timeout = "short",
Expand Down
2 changes: 2 additions & 0 deletions src/v/datalake/coordinator/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ rp_test(
GTEST
BINARY_NAME coordinator
SOURCES
coordinator_test.cc
state_update_test.cc
LIBRARIES
v::datalake_coordinator
v::datalake_coordinator_test_utils
v::gtest_main
v::raft_fixture
LABELS datalake
ARGS "-- -c 1"
)
Expand Down
Loading

0 comments on commit 46340ac

Please sign in to comment.