datalake: add translated_offset_range type, modify multiplexer to return it

This adds a new data structure, translated_offset_range, to store the
range of Kafka offsets translated into Parquet, together with the locations
of the resulting Parquet files. It also modifies the record_multiplexer to
return this new type.
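
For context, a minimal sketch of how a caller might consume the new return value; it mirrors the test usage further down (reader.consume with model::no_timeout). The translate_log wrapper, its signature, and the reader/factory arguments are illustrative assumptions, not part of this commit.

// Sketch only: assumed wrapper around the multiplexer introduced here.
// `reader` and `factory` are hypothetical inputs, mirroring the tests below.
ss::future<result<datalake::translated_offset_range, datalake::data_writer_error>>
translate_log(
  model::record_batch_reader reader,
  std::unique_ptr<datalake::data_writer_factory> factory) {
    datalake::record_multiplexer mux(std::move(factory));
    // consume() feeds each batch through operator() and finishes with
    // end_of_stream(), which now yields a translated_offset_range.
    auto res = co_await reader.consume(std::move(mux), model::no_timeout);
    if (!res.has_value()) {
        // data_writer_error::no_data means no records were consumed.
        co_return res.error();
    }
    // res.value() covers the inclusive Kafka offset range
    // [start_offset, last_offset] and lists one data_file_result per file.
    co_return std::move(res.value());
}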
jcipar committed Oct 9, 2024
1 parent ee27a59 commit 8aa3cbb
Showing 5 changed files with 84 additions and 20 deletions.
28 changes: 23 additions & 5 deletions src/v/datalake/data_writer_interface.h
@@ -23,18 +23,34 @@ enum class data_writer_error {
ok = 0,
parquet_conversion_error,
file_io_error,
no_data,
};

struct data_file_result
: serde::envelope<
data_file_result,
serde::version<0>,
serde::compat_version<0>> {
: serde::
envelope<data_file_result, serde::version<0>, serde::compat_version<0>> {
ss::sstring file_path = "";
size_t record_count = 0;
size_t file_size_bytes = 0;

auto serde_fields() { return std::tie(record_count); }
// TODO: add kafka schema id

friend std::ostream&
operator<<(std::ostream& o, const data_file_result& r) {
// TODO: do we have a preferred format for printing structures?
fmt::print(
o,
"data_file_result(file_path: {} record_count: {} file_size_bytes: "
"{})",
r.file_path,
r.record_count,
r.file_size_bytes);
return o;
}

auto serde_fields() {
return std::tie(file_path, record_count, file_size_bytes);
}
};

struct data_writer_error_category : std::error_category {
@@ -48,6 +64,8 @@ struct data_writer_error_category : std::error_category {
return "Parquet Conversion Error";
case data_writer_error::file_io_error:
return "File IO Error";
case data_writer_error::no_data:
return "No data";
}
}

31 changes: 23 additions & 8 deletions src/v/datalake/record_multiplexer.cc
@@ -46,6 +46,19 @@ record_multiplexer::operator()(model::record_batch batch) {
+ record.offset_delta();
int64_t estimated_size = key.size_bytes() + val.size_bytes() + 16;

// TODO: we want to ensure we're using an offset translating reader so
// that these will be Kafka offsets, not Raft offsets.
if (!_result.has_value()) {
_result = translated_offset_range{};
_result.value().start_offset = kafka::offset(offset);
}
if (offset < _result.value().start_offset()) {
_result.value().start_offset = kafka::offset(offset);
}
if (offset > _result.value().start_offset()) {
_result.value().last_offset = kafka::offset(offset);
}

// Translate the record
auto& translator = get_translator();
iceberg::struct_value data = translator.translate_event(
@@ -69,23 +82,25 @@ record_multiplexer::operator()(model::record_batch batch) {
co_return ss::stop_iteration::no;
}

ss::future<result<chunked_vector<data_file_result>, data_writer_error>>
ss::future<result<translated_offset_range, data_writer_error>>
record_multiplexer::end_of_stream() {
if (_writer_status != data_writer_error::ok) {
co_return _writer_status;
}
// TODO: once we have multiple _writers this should be a loop
if (_writer) {
chunked_vector<data_file_result> ret;
auto res = co_await _writer->finish();
if (res.has_value()) {
ret.push_back(res.value());
co_return ret;
if (!_result.has_value()) {
co_return data_writer_error::no_data;
}
auto result_files = co_await _writer->finish();
if (result_files.has_value()) {
_result.value().files.push_back(result_files.value());
co_return std::move(_result.value());
} else {
co_return res.error();
co_return result_files.error();
}
} else {
co_return chunked_vector<data_file_result>{};
co_return data_writer_error::no_data;
}
}

19 changes: 18 additions & 1 deletion src/v/datalake/record_multiplexer.h
@@ -13,14 +13,28 @@
#include "container/fragmented_vector.h"
#include "datalake/data_writer_interface.h"
#include "datalake/schemaless_translator.h"
#include "model/fundamental.h"
#include "model/record.h"
#include "serde/envelope.h"

#include <seastar/core/future.hh>

#include <memory>

namespace datalake {

struct translated_offset_range
: serde::envelope<
translated_offset_range,
serde::version<0>,
serde::compat_version<0>> {
kafka::offset start_offset;
kafka::offset last_offset;
chunked_vector<data_file_result> files;

auto serde_fields() { return std::tie(start_offset, last_offset, files); }
};

/*
Consumes logs and sends records to the appropriate translator
based on the schema ID. This is meant to be called with a
@@ -42,7 +56,8 @@ class record_multiplexer {
explicit record_multiplexer(
std::unique_ptr<data_writer_factory> writer_factory);
ss::future<ss::stop_iteration> operator()(model::record_batch batch);
ss::future<result<chunked_vector<data_file_result>, data_writer_error>>

ss::future<result<translated_offset_range, data_writer_error>>
end_of_stream();

private:
Expand All @@ -58,6 +73,8 @@ class record_multiplexer {
ss::shared_ptr<data_writer> _writer;

data_writer_error _writer_status = data_writer_error::ok;

std::optional<translated_offset_range> _result;
};

} // namespace datalake
23 changes: 19 additions & 4 deletions src/v/datalake/tests/gtest_record_multiplexer_test.cc
@@ -10,6 +10,7 @@
#include "datalake/batching_parquet_writer.h"
#include "datalake/record_multiplexer.h"
#include "datalake/tests/test_data_writer.h"
#include "model/fundamental.h"
#include "model/tests/random_batch.h"
#include "test_utils/tmp_dir.h"

@@ -23,13 +24,15 @@
TEST(DatalakeMultiplexerTest, TestMultiplexer) {
int record_count = 10;
int batch_count = 10;
int start_offset = 1005;
auto writer_factory = std::make_unique<datalake::test_data_writer_factory>(
false);
datalake::record_multiplexer multiplexer(std::move(writer_factory));

model::test::record_batch_spec batch_spec;
batch_spec.records = record_count;
batch_spec.count = batch_count;
batch_spec.offset = model::offset{start_offset};
ss::circular_buffer<model::record_batch> batches
= model::test::make_random_batches(batch_spec).get();

@@ -43,8 +46,13 @@ TEST(DatalakeMultiplexerTest, TestMultiplexer) {
= reader.consume(std::move(multiplexer), model::no_timeout).get();

ASSERT_TRUE(result.has_value());
ASSERT_EQ(result.value().size(), 1);
EXPECT_EQ(result.value()[0].record_count, record_count * batch_count);
ASSERT_EQ(result.value().files.size(), 1);
EXPECT_EQ(result.value().files[0].record_count, record_count * batch_count);
EXPECT_EQ(result.value().start_offset(), start_offset);
// Subtract one since offset deltas start at 0 and the range is inclusive.
EXPECT_EQ(
result.value().last_offset(),
start_offset + record_count * batch_count - 1);
}
TEST(DatalakeMultiplexerTest, TestMultiplexerWriteError) {
int record_count = 10;
@@ -79,6 +87,7 @@ TEST(DatalakeMultiplexerTest, WritesDataFiles) {

int record_count = 50;
int batch_count = 20;
int start_offset = 1005;

auto writer_factory
= std::make_unique<datalake::batching_parquet_writer_factory>(
@@ -88,6 +97,7 @@ TEST(DatalakeMultiplexerTest, WritesDataFiles) {
model::test::record_batch_spec batch_spec;
batch_spec.records = record_count;
batch_spec.count = batch_count;
batch_spec.offset = model::offset{start_offset};
ss::circular_buffer<model::record_batch> batches
= model::test::make_random_batches(batch_spec).get0();

@@ -101,8 +111,13 @@ TEST(DatalakeMultiplexerTest, WritesDataFiles) {
= reader.consume(std::move(multiplexer), model::no_timeout).get0();

ASSERT_TRUE(result.has_value());
ASSERT_EQ(result.value().size(), 1);
EXPECT_EQ(result.value()[0].record_count, record_count * batch_count);
ASSERT_EQ(result.value().files.size(), 1);
EXPECT_EQ(result.value().files[0].record_count, record_count * batch_count);
EXPECT_EQ(result.value().start_offset(), start_offset);
// Subtract one since offset deltas start at 0 and the range is inclusive.
EXPECT_EQ(
result.value().last_offset(),
start_offset + record_count * batch_count - 1);

// Open the resulting file and check that it has data in it with the
// appropriate counts.
3 changes: 1 addition & 2 deletions src/v/datalake/tests/test_data_writer.h
@@ -36,8 +36,7 @@ class test_data_writer : public data_writer {
return ss::make_ready_future<data_writer_error>(status);
}

ss::future<result<data_file_result, data_writer_error>>
finish() override {
ss::future<result<data_file_result, data_writer_error>> finish() override {
return ss::make_ready_future<
result<data_file_result, data_writer_error>>(_result);
}
