Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

datalake/coordinator: initial coordinator base #23712

Merged
merged 7 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/v/datalake/coordinator/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,29 @@ redpanda_cc_rpc_library(
include_prefix = "datalake/coordinator",
)

# Public coordinator interface wrapping the coordinator STM (":stm"); see
# coordinator.h for the class contract.
redpanda_cc_library(
    name = "coordinator",
    srcs = [
        "coordinator.cc",
    ],
    hdrs = [
        "coordinator.h",
    ],
    # Needed only by coordinator.cc, not by users of coordinator.h.
    implementation_deps = [
        "//src/v/base",
        "//src/v/datalake:logger",
        "//src/v/storage:record_batch_builder",
    ],
    include_prefix = "datalake/coordinator",
    deps = [
        ":state_update",
        ":stm",
        "//src/v/container:fragmented_vector",
        "//src/v/model",
        "@seastar",
    ],
)

redpanda_cc_library(
name = "data_file",
srcs = [
Expand Down Expand Up @@ -71,11 +94,16 @@ redpanda_cc_library(
visibility = ["//visibility:public"],
deps = [
":model",
":state",
":state_update",
"//src/v/cluster",
"//src/v/datalake:logger",
"//src/v/model",
"//src/v/raft",
"//src/v/serde",
"//src/v/serde:enum",
"//src/v/ssx:future_util",
"@seastar",
],
)

Expand All @@ -95,6 +123,7 @@ redpanda_cc_library(
"//src/v/container:fragmented_vector",
"//src/v/model",
"//src/v/serde",
"//src/v/utils:named_type",
],
)

Expand Down
1 change: 1 addition & 0 deletions src/v/datalake/coordinator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ rpcgen(TARGET generated_datalake_coordinator_rpc
v_cc_library(
NAME datalake_coordinator
SRCS
coordinator.cc
data_file.cc
frontend.cc
service.cc
Expand Down
149 changes: 149 additions & 0 deletions src/v/datalake/coordinator/coordinator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#include "datalake/coordinator/coordinator.h"

#include "base/vlog.h"
#include "container/fragmented_vector.h"
#include "datalake/coordinator/state_update.h"
#include "datalake/logger.h"
#include "model/fundamental.h"
#include "model/record_batch_reader.h"
#include "storage/record_batch_builder.h"

namespace datalake::coordinator {

namespace {
// Translates STM-level error codes into the coordinator's public error space.
coordinator::errc convert_stm_errc(coordinator_stm::errc e) {
    using stm_errc = coordinator_stm::errc;
    switch (e) {
    case stm_errc::shutting_down:
        return coordinator::errc::shutting_down;
    case stm_errc::not_leader:
        return coordinator::errc::not_leader;
    case stm_errc::apply_error:
        return coordinator::errc::stm_apply_error;
    case stm_errc::raft_error:
        // Raft-level failures surface to callers as a timeout.
        return coordinator::errc::timedout;
    }
}
} // namespace

// Stringifies an errc for logging; keep in sync with the enum definition in
// coordinator.h.
std::ostream& operator<<(std::ostream& o, coordinator::errc e) {
    switch (e) {
    case coordinator::errc::shutting_down:
        return o << "coordinator::errc::shutting_down";
    case coordinator::errc::not_leader:
        return o << "coordinator::errc::not_leader";
    case coordinator::errc::stm_apply_error:
        return o << "coordinator::errc::stm_apply_error";
    case coordinator::errc::timedout:
        return o << "coordinator::errc::timedout";
    }
}

// Flags shutdown first so new work is rejected via maybe_gate(), then waits
// for all in-flight operations holding the gate to drain.
ss::future<> coordinator::stop_and_wait() {
    as_.request_abort();
    co_await gate_.close();
}

// Refuses new work once shutdown has started; otherwise returns a gate holder
// that keeps stop_and_wait() from completing until the caller's operation
// finishes.
checked<ss::gate::holder, coordinator::errc> coordinator::maybe_gate() {
    // Fix: removed an unused, default-constructed ss::gate::holder local that
    // served no purpose (confirmed unintentional in review).
    if (as_.abort_requested() || gate_.is_closed()) {
        return errc::shutting_down;
    }
    return gate_.hold();
}

// Replicates an add-files update for 'tp' and waits until it has been applied
// to the local STM, verifying afterwards that the expected entries landed.
ss::future<checked<std::nullopt_t, coordinator::errc>>
coordinator::sync_add_files(
  model::topic_partition tp, chunked_vector<translated_offset_range> entries) {
    // Nothing to replicate; treat as trivially successful.
    if (entries.empty()) {
        vlog(datalake_log.debug, "Empty entry requested {}", tp);
        co_return std::nullopt;
    }
    auto holder = maybe_gate();
    if (holder.has_error()) {
        co_return holder.error();
    }
    vlog(
      datalake_log.debug,
      "Sync add files requested {}: [{}, {}], {} files",
      tp,
      entries.begin()->start_offset,
      entries.back().last_offset,
      entries.size());
    // Ensure leadership and an up-to-date STM before building an update
    // against its state.
    auto term_res = co_await stm_.sync(10s);
    if (term_res.has_error()) {
        co_return convert_stm_errc(term_res.error());
    }
    // Remember the last offset so the post-replication state can be validated
    // after 'entries' is moved away.
    const auto expected_last_offset = entries.back().last_offset;
    auto update = add_files_update::build(stm_.state(), tp, std::move(entries));
    if (update.has_error()) {
        // NOTE: rejection here is just an optimization -- the operation would
        // fail to be applied to the STM anyway.
        vlog(
          datalake_log.debug,
          "Rejecting request to add files for {}: {}",
          tp,
          update.error());
        co_return errc::stm_apply_error;
    }
    // Serialize the update as a single-record batch and replicate it.
    storage::record_batch_builder builder(
      model::record_batch_type::datalake_coordinator, model::offset{0});
    builder.add_raw_kv(
      serde::to_iobuf(add_files_update::key),
      serde::to_iobuf(std::move(update.value())));
    auto repl_res = co_await stm_.replicate_and_wait(
      term_res.value(), std::move(builder).build(), as_);
    if (repl_res.has_error()) {
        co_return convert_stm_errc(repl_res.error());
    }

    // Check that the resulting state matches that expected by the caller.
    // NOTE: a mismatch here just means there was a race to update the STM, and
    // this should be handled by callers.
    // TODO: would be nice to encapsulate this in some update validator.
    auto prt_opt = stm_.state().partition_state(tp);
    const bool applied = prt_opt.has_value()
                         && !prt_opt->get().pending_entries.empty()
                         && prt_opt->get().pending_entries.back().last_offset
                              == expected_last_offset;
    if (!applied) {
        vlog(
          datalake_log.debug,
          "Resulting last offset for {} does not match expected {}",
          tp,
          expected_last_offset);
        co_return errc::stm_apply_error;
    }
    co_return std::nullopt;
}

// Returns the last added offset for 'tp', preferring the newest pending entry
// and falling back to the last committed offset; std::nullopt when the STM has
// no state for the partition.
ss::future<checked<std::optional<kafka::offset>, coordinator::errc>>
coordinator::sync_get_last_added_offset(model::topic_partition tp) {
    auto holder = maybe_gate();
    if (holder.has_error()) {
        co_return holder.error();
    }
    // Only answer from leader-local, caught-up state.
    auto term_res = co_await stm_.sync(10s);
    if (term_res.has_error()) {
        co_return convert_stm_errc(term_res.error());
    }
    auto state_opt = stm_.state().partition_state(tp);
    if (!state_opt.has_value()) {
        // Nothing has ever been added for this partition.
        co_return std::nullopt;
    }
    const auto& partition_state = state_opt->get();
    if (partition_state.pending_entries.empty()) {
        co_return partition_state.last_committed;
    }
    co_return partition_state.pending_entries.back().last_offset;
}

} // namespace datalake::coordinator
50 changes: 50 additions & 0 deletions src/v/datalake/coordinator/coordinator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#pragma once

#include "container/fragmented_vector.h"
#include "datalake/coordinator/state_machine.h"
#include "datalake/coordinator/state_update.h"
#include "model/fundamental.h"

namespace datalake::coordinator {

// Public interface that provides access to the coordinator STM. Conceptually,
// the STM focuses solely on persisting deterministic updates, while this:
// 1. wrangles additional aspects of these updates like concurrency, and
// 2. reconciles the STM state with external catalogs.
class coordinator {
public:
    // Error space shared by all public coordinator operations.
    enum class errc {
        // This node is not the leader for the coordinator partition.
        not_leader,
        // An update could not be applied to (or validated against) the STM.
        stm_apply_error,
        // An underlying Raft operation failed (mapped to a timeout).
        timedout,
        // The coordinator is stopping and rejects new work.
        shutting_down,
    };
    explicit coordinator(coordinator_stm& stm)
      : stm_(stm) {}

    // Aborts in-flight work and waits for all gate holders to finish.
    ss::future<> stop_and_wait();
    // Replicates an update adding the given translated files for 'tp' and
    // waits for it to be applied to the STM.
    ss::future<checked<std::nullopt_t, errc>> sync_add_files(
      model::topic_partition tp, chunked_vector<translated_offset_range>);
    // Returns the last added offset for 'tp', or std::nullopt if the STM has
    // no state for the partition.
    ss::future<checked<std::optional<kafka::offset>, errc>>
    sync_get_last_added_offset(model::topic_partition tp);

private:
    // Returns a gate holder, or errc::shutting_down once stop has begun.
    checked<ss::gate::holder, errc> maybe_gate();

    // Underlying replicated state machine; owned elsewhere and must outlive
    // this coordinator.
    coordinator_stm& stm_;

    ss::gate gate_;
    ss::abort_source as_;
};
std::ostream& operator<<(std::ostream&, coordinator::errc);

} // namespace datalake::coordinator
103 changes: 101 additions & 2 deletions src/v/datalake/coordinator/state_machine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,114 @@

#include "datalake/coordinator/state_machine.h"

#include "datalake/coordinator/state.h"
#include "datalake/coordinator/state_update.h"
#include "datalake/logger.h"
#include "model/record_batch_types.h"
#include "serde/async.h"
#include "ssx/future-util.h"

#include <seastar/coroutine/as_future.hh>

namespace datalake::coordinator {

namespace {
void maybe_log_update_error(
update_key key,
model::offset o,
const checked<std::nullopt_t, stm_update_error>& r) {
if (r.has_value()) {
return;
}
// NOTE: inability to update the STM is not necessarily a bug! It indicates
// that this update's construction raced with another update that broke an
// invariant required to apply update. Expectation is that this update's
// caller constructs a new update and tries again if needed.
vlog(
datalake_log.debug,
"Coordinator STM {} update at offset {} didn't apply: {}",
key,
o,
r.error());
}
} // namespace

// Snapshot state for this STM is persisted under the fixed name
// "datalake_coordinator_stm.snapshot".
coordinator_stm::coordinator_stm(ss::logger& logger, raft::consensus* raft)
  : raft::persisted_stm<>("datalake_coordinator_stm.snapshot", logger, raft) {}

ss::future<> coordinator_stm::do_apply(const model::record_batch&) {
co_return;
// Syncs the persisted STM, translating exceptions and leadership failures
// into errc values; on success returns the in-sync term.
ss::future<checked<model::term_id, coordinator_stm::errc>>
coordinator_stm::sync(model::timeout_clock::duration timeout) {
    // Capture failures as a failed future instead of letting exceptions
    // escape the coroutine.
    auto fut = co_await ss::coroutine::as_future(
      raft::persisted_stm<>::sync(timeout));
    if (fut.failed()) {
        auto ex = fut.get_exception();
        auto msg = fmt::format("Exception caught while syncing: {}", ex);
        if (ssx::is_shutdown_exception(ex)) {
            vlog(datalake_log.debug, "Ignoring shutdown error: {}", msg);
            co_return errc::shutting_down;
        }
        vlog(datalake_log.warn, "{}", msg);
        co_return errc::raft_error;
    }
    if (!fut.get()) {
        co_return errc::not_leader;
    }
    // At this point we're guaranteed that this node is the leader and that the
    // STM has been caught up in the current term before (this doesn't mean the
    // STM is currently caught up right now though!)
    co_return _insync_term;
}

// Replicates 'batch' in 'term' with quorum acks and a forced flush, then
// waits (up to 30s, abortable via 'as') for the record to be applied locally.
ss::future<checked<std::nullopt_t, coordinator_stm::errc>>
coordinator_stm::replicate_and_wait(
  model::term_id term, model::record_batch batch, ss::abort_source& as) {
    // Quorum-acked, flushed replication so the update is durable before we
    // report success.
    raft::replicate_options opts{raft::consistency_level::quorum_ack};
    opts.set_force_flush();
    auto repl_res = co_await _raft->replicate(
      term, model::make_memory_record_batch_reader(std::move(batch)), opts);
    if (repl_res.has_error()) {
        co_return errc::raft_error;
    }
    // Wait until the replicated offset is applied so callers observe the
    // update in the STM state afterwards.
    const auto last = repl_res.value().last_offset;
    co_await wait_no_throw(last, ss::lowres_clock::now() + 30s, as);
    co_return std::nullopt;
}

// Applies a replicated batch of coordinator updates to the in-memory state.
// Batches of other types are ignored; unknown update keys are logged and
// skipped so apply never fails.
ss::future<> coordinator_stm::do_apply(const model::record_batch& b) {
    if (b.header().type != model::record_batch_type::datalake_coordinator) {
        co_return;
    }
    auto iter = model::record_batch_iterator::create(b);
    while (iter.has_next()) {
        auto r = iter.next();
        iobuf_parser key_p{r.release_key()};
        auto key = co_await serde::read_async<update_key>(key_p);

        iobuf_parser val_p{r.release_value()};
        // Absolute offset of this record, for diagnostics.
        auto offset = b.base_offset() + model::offset_delta{r.offset_delta()};
        switch (key) {
        case update_key::add_files: {
            auto update = co_await serde::read_async<add_files_update>(val_p);
            auto res = update.apply(state_);
            maybe_log_update_error(key, offset, res);
            continue;
        }
        case update_key::mark_files_committed: {
            auto update
              = co_await serde::read_async<mark_files_committed_update>(val_p);
            auto res = update.apply(state_);
            maybe_log_update_error(key, offset, res);
            continue;
        }
        }
        // Fix: the format string had a single placeholder but was passed both
        // the key and the batch header, silently dropping the header from the
        // log line.
        vlog(
          datalake_log.error,
          "Unknown datalake coordinator STM record type: {}, header: {}",
          key,
          b.header());
    }
}

model::offset coordinator_stm::max_collectible_offset() { return {}; }
Expand Down
Loading