Skip to content

Commit

Permalink
Merge pull request #23707 from andrwng/datalake-rpc-types
Browse files Browse the repository at this point in the history
datalake/coordinator: iron out RPC types
  • Loading branch information
andrwng authored Oct 9, 2024
2 parents 95f22f1 + 80a5965 commit 5bc0e62
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 38 deletions.
14 changes: 10 additions & 4 deletions src/v/datalake/coordinator/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ redpanda_cc_rpc_library(

redpanda_cc_library(
name = "data_file",
srcs = [
"data_file.cc",
],
hdrs = [
"data_file.h",
],
Expand All @@ -31,12 +34,11 @@ redpanda_cc_library(
include_prefix = "datalake/coordinator",
visibility = [":__subpackages__"],
deps = [
"//src/v/serde",
"//src/v/serde:enum",
":translated_offset_range",
"//src/v/datalake:types",
"//src/v/model",
# todo: split writer further once it evolves
"//src/v/datalake:writer",
"//src/v/serde",
"//src/v/serde:enum",
],
)

Expand Down Expand Up @@ -98,6 +100,9 @@ redpanda_cc_library(

redpanda_cc_library(
name = "translated_offset_range",
srcs = [
"translated_offset_range.cc",
],
hdrs = [
"translated_offset_range.h",
],
Expand All @@ -106,6 +111,7 @@ redpanda_cc_library(
":data_file",
"//src/v/base",
"//src/v/container:fragmented_vector",
"//src/v/model",
"//src/v/serde",
"@fmt",
],
Expand Down
2 changes: 2 additions & 0 deletions src/v/datalake/coordinator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ rpcgen(TARGET generated_datalake_coordinator_rpc
v_cc_library(
NAME datalake_coordinator
SRCS
data_file.cc
frontend.cc
service.cc
state.cc
state_machine.cc
state_update.cc
translated_offset_range.cc
DEPS
generated_datalake_coordinator_rpc
v::cluster
Expand Down
24 changes: 24 additions & 0 deletions src/v/datalake/coordinator/data_file.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#include "datalake/coordinator/data_file.h"

namespace datalake::coordinator {

std::ostream& operator<<(std::ostream& o, const data_file& f) {
o << fmt::format(
"{{remote_path: {}, row_count: {}, file_size_bytes: {}, hour: {}}}",
f.remote_path,
f.row_count,
f.file_size_bytes,
f.hour);
return o;
}

} // namespace datalake::coordinator
15 changes: 5 additions & 10 deletions src/v/datalake/coordinator/data_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,15 @@ namespace datalake::coordinator {
struct data_file
: serde::envelope<data_file, serde::version<0>, serde::compat_version<0>> {
auto serde_fields() {
return std::tie(remote_path, row_count, file_size_bytes);
return std::tie(remote_path, row_count, file_size_bytes, hour);
}
ss::sstring remote_path = "";
size_t row_count = 0;
size_t file_size_bytes = 0;
int hour = 0;
// TODO: add kafka schema id

friend std::ostream& operator<<(std::ostream& o, const data_file& f) {
o << fmt::format(
"{{remote_path: {}, row_count: {}, file_size_bytes: {}}}",
f.remote_path,
f.row_count,
f.file_size_bytes);
return o;
}
};

std::ostream& operator<<(std::ostream& o, const data_file& f);

} // namespace datalake::coordinator
23 changes: 23 additions & 0 deletions src/v/datalake/coordinator/translated_offset_range.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#include "datalake/coordinator/translated_offset_range.h"

namespace datalake::coordinator {

std::ostream& operator<<(std::ostream& o, const translated_offset_range& r) {
o << fmt::format(
"{{start_offset: {}, last_offset: {}, files: {}}}",
r.start_offset,
r.last_offset,
r.files);
return o;
}

} // namespace datalake::coordinator
12 changes: 2 additions & 10 deletions src/v/datalake/coordinator/translated_offset_range.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,8 @@ struct translated_offset_range
// Last Kafka offset (inclusive) represented in this range.
kafka::offset last_offset;
chunked_vector<data_file> files;

friend std::ostream&
operator<<(std::ostream& o, const translated_offset_range& r) {
o << fmt::format(
"{{start_offset: {}, last_offset: {}, files: {}}}",
r.start_offset,
r.last_offset,
r.files);
return o;
}
};

std::ostream& operator<<(std::ostream& o, const translated_offset_range& r);

} // namespace datalake::coordinator
22 changes: 8 additions & 14 deletions src/v/datalake/coordinator/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#pragma once

#include "container/fragmented_vector.h"
#include "datalake/data_writer_interface.h"
#include "datalake/coordinator/translated_offset_range.h"
#include "datalake/errors.h"
#include "model/fundamental.h"
#include "serde/rw/enum.h"
Expand All @@ -24,19 +24,12 @@ struct translated_data_file_entry
serde::version<0>,
serde::compat_version<0>> {
model::topic_partition tp;
// inclusive offset range
model::offset begin_offset;
model::offset end_offset;
// term of the leader that performed this
// translation
model::term_id translator_term;

data_writer_result translation_result;
// Translated data files, expected to be contiguous, with no gaps or
// overlaps, ordered in increasing offset order.
chunked_vector<translated_offset_range> translated_ranges;

auto serde_fields() {
return std::tie(
tp, begin_offset, end_offset, translator_term, translation_result);
}
auto serde_fields() { return std::tie(tp, translated_ranges); }
};

struct add_translated_data_files_reply
Expand Down Expand Up @@ -84,12 +77,13 @@ struct fetch_latest_data_file_reply
explicit fetch_latest_data_file_reply(coordinator_errc err)
: errc(err) {}

std::optional<translated_data_file_entry> entry;
// The offset of the latest data file added to the coordinator.
std::optional<kafka::offset> last_added_offset;

// If not ok, the request processing has a problem.
coordinator_errc errc;

auto serde_fields() { return std::tie(entry, errc); }
auto serde_fields() { return std::tie(last_added_offset, errc); }
};

struct fetch_latest_data_file_request
Expand Down

0 comments on commit 5bc0e62

Please sign in to comment.