iceberg: add batching parquet writer factory #23683

Merged
merged 7 commits into from
Oct 11, 2024

Changes from 1 commit

27 changes: 27 additions & 0 deletions src/v/datalake/batching_parquet_writer.cc
@@ -22,6 +22,8 @@
#include <seastar/coroutine/as_future.hh>

#include <cstdint>
#include <memory>
#include <utility>

namespace datalake {

@@ -159,4 +161,29 @@ ss::future<> batching_parquet_writer::abort() {
}
}

batching_parquet_writer_factory::batching_parquet_writer_factory(
std::filesystem::path local_directory,
Contributor:

Architecturally, I believe it will be simpler to keep these parquet files in memory. AFAIK we're going to load these fully into memory already to send them, and it's fairly straightforward to have a semaphore to make sure we stay within the subsystem's memory budget. Writing to disk, especially with our trends of smaller and smaller disks, is going to come with a number of challenges: cleaning up zombie files, integrating with space management, etc.

Now we can do this after the beta phase, but we should keep this in mind as we're structuring the codepaths.
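
A minimal sketch of the semaphore idea, assuming a hypothetical per-shard budget and helper; none of these names are part of the actual datalake subsystem:

#include <base/seastarx.h>

#include <seastar/core/coroutine.hh>
#include <seastar/core/semaphore.hh>

// Sketch only: per-shard budget for in-memory parquet data. 64 MiB is an
// arbitrary placeholder, not a real configuration value.
static thread_local ss::semaphore parquet_memory_budget{64 * 1024 * 1024};

// Hypothetical helper: reserve budget before buffering `bytes` of serialized
// parquet data. The returned units are RAII-managed and would be released
// once the buffered file has been uploaded and dropped.
ss::future<ss::semaphore_units<>> reserve_parquet_memory(size_t bytes) {
    co_return co_await ss::get_units(parquet_memory_budget, bytes);
}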


Contributor Author:

I agree that we do want to stream directly to S3 eventually. I'd rather stick with this for now so we can get an end-to-end test ASAP.

FWIW, we can stream from disk to S3; we don't need to load it all into memory first. cloud_storage::remote::upload_controller_snapshot is an example of that.
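
For reference, a minimal sketch of streaming a file chunk-by-chunk with Seastar instead of loading it fully into memory (generic illustration, not the cloud_storage upload path itself):

#include <base/seastarx.h>

#include <seastar/core/coroutine.hh>
#include <seastar/core/file.hh>
#include <seastar/core/fstream.hh>
#include <seastar/core/seastar.hh>

#include <filesystem>

// Sketch: only one chunk is resident at a time; a real upload path would
// hand each chunk to the S3 client instead of just counting bytes.
ss::future<size_t> stream_file(std::filesystem::path path) {
    auto f = co_await ss::open_file_dma(path.string(), ss::open_flags::ro);
    auto in = ss::make_file_input_stream(std::move(f));
    size_t total = 0;
    while (true) {
        auto buf = co_await in.read();
        if (buf.empty()) {
            break;
        }
        total += buf.size();
    }
    co_await in.close();
    co_return total;
}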

Contributor:

local_directory

Todo for future: as we chatted offline, this is internal to the writer, so it should self-manage this path and its lifecycle.
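
A rough sketch of the self-managed lifecycle idea; the class is hypothetical, it reuses fmt and uuid_t from the surrounding file, and a real version would use non-blocking filesystem calls:

// Sketch only: the factory could own a scratch directory it creates on
// construction and removes when it is destroyed.
class self_managed_scratch_dir {
public:
    self_managed_scratch_dir()
      : _dir(
          std::filesystem::temp_directory_path()
          / fmt::format("datalake-{}", uuid_t::create())) {
        std::filesystem::create_directories(_dir);
    }
    ~self_managed_scratch_dir() { std::filesystem::remove_all(_dir); }
    const std::filesystem::path& path() const { return _dir; }

private:
    std::filesystem::path _dir;
};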

ss::sstring file_name_prefix,
size_t row_count_threshold,
size_t byte_count_threshold)
: _local_directory{std::move(local_directory)}
, _file_name_prefix{std::move(file_name_prefix)}
, _row_count_threshold{row_count_threshold}
, _byte_count_treshold{byte_count_threshold} {}

ss::future<std::unique_ptr<data_writer>>
batching_parquet_writer_factory::create_writer(iceberg::struct_type schema) {
auto ret = std::make_unique<batching_parquet_writer>(
std::move(schema), _row_count_threshold, _byte_count_treshold);
std::string filename = fmt::format(
"{}-{}.parquet", _file_name_prefix, uuid_t::create());
std::filesystem::path file_path = _local_directory / filename;
auto err = co_await ret->initialize(file_path);
if (err != data_writer_error::ok) {
// FIXME: This method should return a result and let the multiplexer
// deal with it appropriately
Contributor:

We should probably have thrown at this level instead of the next layer up.

Contributor Author:

I'm not sure what you mean by "this level". Do you mean throw an exception up to the multiplexer?

co_return nullptr;
}
co_return ret;
}
} // namespace datalake
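
Regarding the FIXME in create_writer above, a sketch of how the error could be propagated as a result instead of returning nullptr; the result<T, E> alias is an assumption based on the outcome-style types used elsewhere in the codebase, not the interface merged here:

// Sketch only, not the merged interface.
ss::future<result<std::unique_ptr<data_writer>, data_writer_error>>
batching_parquet_writer_factory::create_writer(iceberg::struct_type schema) {
    auto ret = std::make_unique<batching_parquet_writer>(
      std::move(schema), _row_count_threshold, _byte_count_treshold);
    std::string filename = fmt::format(
      "{}-{}.parquet", _file_name_prefix, uuid_t::create());
    auto err = co_await ret->initialize(_local_directory / filename);
    if (err != data_writer_error::ok) {
        co_return err; // let the multiplexer decide how to handle it
    }
    co_return std::move(ret);
}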
19 changes: 19 additions & 0 deletions src/v/datalake/batching_parquet_writer.h
@@ -21,6 +21,7 @@
#include <base/seastarx.h>

#include <filesystem>
#include <memory>

namespace datalake {
// batching_parquet_writer ties together the low-level components for iceberg to
@@ -75,4 +76,22 @@ class batching_parquet_writer : public data_writer {
data_writer_result _result;
};

class batching_parquet_writer_factory : public data_writer_factory {
Contributor:

optional: could have also made this a nested class so it would be batching_parquet_writer::factory

Contributor Author:

given the number of things stacked above this, and how pervasive a change that would be, I'd rather make it a separate PR.
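
For reference, the suggested nesting would look roughly like this (sketch only, not part of this PR):

class batching_parquet_writer : public data_writer {
public:
    class factory; // used as batching_parquet_writer::factory
    // ... existing writer members ...
};

class batching_parquet_writer::factory : public data_writer_factory {
public:
    ss::future<std::unique_ptr<data_writer>>
    create_writer(iceberg::struct_type schema) override;
    // ... same constructor and members as batching_parquet_writer_factory ...
};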

public:
batching_parquet_writer_factory(
std::filesystem::path local_directory,
ss::sstring file_name_prefix,
size_t row_count_threshold,
size_t byte_count_threshold);

ss::future<std::unique_ptr<data_writer>>
create_writer(iceberg::struct_type schema) override;

private:
std::filesystem::path _local_directory;
ss::sstring _file_name_prefix;
size_t _row_count_threshold;
size_t _byte_count_treshold;
};

} // namespace datalake
2 changes: 1 addition & 1 deletion src/v/datalake/data_writer_interface.h
@@ -78,7 +78,7 @@ class data_writer_factory {
public:
virtual ~data_writer_factory() = default;

virtual std::unique_ptr<data_writer>
virtual ss::future<std::unique_ptr<data_writer>>
create_writer(iceberg::struct_type /* schema */) = 0;
};

29 changes: 21 additions & 8 deletions src/v/datalake/record_multiplexer.cc
@@ -10,7 +10,9 @@
#include "datalake/record_multiplexer.h"

#include "datalake/data_writer_interface.h"
#include "datalake/logger.h"
#include "datalake/schemaless_translator.h"
#include "datalake/tests/test_data_writer.h"
#include "iceberg/values.h"
#include "model/record.h"
#include "storage/parser_utils.h"
@@ -49,13 +51,18 @@ record_multiplexer::operator()(model::record_batch batch) {
iceberg::struct_value data = translator.translate_event(
std::move(key), std::move(val), timestamp, offset);
// Send it to the writer
auto& writer = get_writer();
data_writer_error writer_status = co_await writer.add_data_struct(
std::move(data), estimated_size);
if (writer_status != data_writer_error::ok) {

try {
auto& writer = co_await get_writer();
_writer_status = co_await writer.add_data_struct(
std::move(data), estimated_size);
} catch (const std::runtime_error& err) {
datalake_log.error("Failed to add data to writer");
Contributor:

vlog

Also we should set the _writer_status here.

Contributor Author:

done

Contributor:

nit: I think _writer_status should probably be initialized to a generic_error and be set to ok when it actually finishes.

_writer_status =data_writer_error::parquet_conversion_error;
Contributor:

nit: space after =

}
if (_writer_status != data_writer_error::ok) {
// If a write fails, the writer is left in an indeterminate state,
// we cannot continue in this case.
_writer_status = writer_status;
co_return ss::stop_iteration::yes;
}
}
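
Putting the review comments above together, the catch block might end up looking roughly like this (sketch; assumes the vlog macro and the existing datalake_log):

// Sketch of the suggested follow-up, not the code as merged in this commit.
try {
    auto& writer = co_await get_writer();
    _writer_status = co_await writer.add_data_struct(
      std::move(data), estimated_size);
} catch (const std::runtime_error& err) {
    vlog(datalake_log.error, "Failed to add data to writer: {}", err.what());
    _writer_status = data_writer_error::parquet_conversion_error;
}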
@@ -86,12 +93,18 @@ schemaless_translator& record_multiplexer::get_translator() {
return _translator;
}

data_writer& record_multiplexer::get_writer() {
ss::future<data_writer&> record_multiplexer::get_writer() {
if (!_writer) {
auto& translator = get_translator();
auto schema = translator.get_schema();
_writer = _writer_factory->create_writer(std::move(schema));
_writer = co_await _writer_factory->create_writer(std::move(schema));
if (!_writer) {
// FIXME: modify create_writer to return a result and check that
// here. This method should also return a result. That is coming in
// one of the next commits. For now throw & catch.
throw std::runtime_error("Could not create data writer");
}
}
return *_writer;
co_return *_writer;
}
} // namespace datalake
2 changes: 1 addition & 1 deletion src/v/datalake/record_multiplexer.h
@@ -47,7 +47,7 @@ class record_multiplexer {

private:
schemaless_translator& get_translator();
data_writer& get_writer();
ss::future<data_writer&> get_writer();

// TODO: in a future PR this will be a map of translators keyed by schema_id
schemaless_translator _translator;
70 changes: 70 additions & 0 deletions src/v/datalake/tests/gtest_record_multiplexer_test.cc
@@ -7,11 +7,18 @@
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#include "datalake/batching_parquet_writer.h"
#include "datalake/record_multiplexer.h"
#include "datalake/tests/test_data_writer.h"
#include "model/tests/random_batch.h"
#include "test_utils/tmp_dir.h"

#include <arrow/io/file.h>
#include <arrow/table.h>
#include <gtest/gtest.h>
#include <parquet/arrow/reader.h>

#include <filesystem>

TEST(DatalakeMultiplexerTest, TestMultiplexer) {
int record_count = 10;
@@ -62,3 +69,66 @@ TEST(DatalakeMultiplexerTest, TestMultiplexerWriteError) {
EXPECT_EQ(
res.error(), datalake::data_writer_error::parquet_conversion_error);
}

TEST(DatalakeMultiplexerTest, WritesDataFiles) {
// Almost an integration test:
// Stitch together as many parts of the data path as is reasonable in a
// single test and make sure we can go from Kafka log to Parquet files on
// disk.
temporary_dir tmp_dir("datalake_multiplexer_test");

int record_count = 50;
int batch_count = 20;

auto writer_factory
= std::make_unique<datalake::batching_parquet_writer_factory>(
tmp_dir.get_path(), "data", 100, 10000);
datalake::record_multiplexer multiplexer(std::move(writer_factory));

model::test::record_batch_spec batch_spec;
batch_spec.records = record_count;
batch_spec.count = batch_count;
ss::circular_buffer<model::record_batch> batches
= model::test::make_random_batches(batch_spec).get0();

auto reader = model::make_generating_record_batch_reader(
[batches = std::move(batches)]() mutable {
return ss::make_ready_future<model::record_batch_reader::data_t>(
std::move(batches));
});

auto result
= reader.consume(std::move(multiplexer), model::no_timeout).get0();

ASSERT_TRUE(result.has_value());
ASSERT_EQ(result.value().size(), 1);
EXPECT_EQ(result.value()[0].record_count, record_count * batch_count);

// Open the resulting file and check that it has data in it with the
// appropriate counts.
int file_count = 0;
for (const auto& entry :
std::filesystem::directory_iterator(tmp_dir.get_path())) {
file_count++;
auto arrow_file_reader
= arrow::io::ReadableFile::Open(entry.path()).ValueUnsafe();

// Open Parquet file reader
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
ASSERT_TRUE(
parquet::arrow::OpenFile(
arrow_file_reader, arrow::default_memory_pool(), &arrow_reader)
.ok());

// Read entire file as a single Arrow table
std::shared_ptr<arrow::Table> table;
auto r = arrow_reader->ReadTable(&table);
ASSERT_TRUE(r.ok());

EXPECT_EQ(table->num_rows(), record_count * batch_count);
// Expect 4 columns for schemaless: offset, timestamp, key, value
EXPECT_EQ(table->num_columns(), 4);
}
// Expect this test to create exactly 1 file
EXPECT_EQ(file_count, 1);
}
4 changes: 2 additions & 2 deletions src/v/datalake/tests/test_data_writer.h
@@ -52,9 +52,9 @@ class test_data_writer_factory : public data_writer_factory {
explicit test_data_writer_factory(bool return_error)
: _return_error{return_error} {}

std::unique_ptr<data_writer>
ss::future<std::unique_ptr<data_writer>>
create_writer(iceberg::struct_type schema) override {
return std::make_unique<test_data_writer>(
co_return std::make_unique<test_data_writer>(
std::move(schema), _return_error);
}
