iceberg: add batching parquet writer factory #23683
Conversation
} catch (const std::exception& e) {
    datalake_log.error(
      "Error making output stream for file {}", _output_file_path);
    datalake_log.error(e.what());
} catch (...) {
datalake_log.error("Error making output stream for file {}: {}",
_output_file_path, std::current_exception());
}
vlog?
std::stringstream filename_stream;
filename_stream << file_uuid << ".parquet";
use fmt::format?
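For comparison, a sketch of the one-liner (assuming file_uuid has an fmt formatter, which the factory code later in this PR suggests):

    auto filename = fmt::format("{}.parquet", file_uuid);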
datalake_log.error("Error opening output file {}",_output_file_path); | ||
datalake_log.error(e.what()); |
please keep this a single log line and wrap it with the vlog macro
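A sketch of the single-line version, assuming the vlog(datalake_log.error, ...) pattern referenced later in this review:

    } catch (const std::exception& e) {
        // one log line carrying both the path and the exception message
        vlog(
          datalake_log.error,
          "Error opening output file {}: {}",
          _output_file_path,
          e.what());
    }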
Force-pushed from 56d88d8 to 48eddb3
Force-pushed from 48eddb3 to 79062fb
@@ -159,4 +162,26 @@ ss::future<> batching_parquet_writer::abort() {
    }
}

batching_parquet_writer_factory::batching_parquet_writer_factory(
  std::filesystem::path local_directory,
Architecturally, I believe it will be simpler to keep these parquet files in memory. AFAIK we're going to load these fully into memory already to send them, and it's fairly straightforward to have a semaphore to make sure we stay within the subsystem's memory budget. Writing to disk, especially with our trends of smaller and smaller disks, is going to come with a number of challenges: cleaning up zombie files, integrating with space management, etc.
Now we can do this after the beta phase, but we should keep this in mind as we're structuring the codepaths.
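For illustration, a minimal sketch of that semaphore idea in Seastar (names and budget are hypothetical):

    // hypothetical per-shard budget for buffered parquet data
    static constexpr size_t parquet_memory_budget = 64 * 1024 * 1024;
    ss::semaphore parquet_mem_sem{parquet_memory_budget};

    ss::future<> buffer_parquet(iobuf data) {
        // wait until this buffer fits within the remaining budget
        auto units = co_await ss::get_units(parquet_mem_sem, data.size_bytes());
        // ... hold `units` (and `data`) alive until the upload completes ...
        co_return;
    }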
I agree that we do want to stream directly to s3 eventually. I'd rather stick with this for now so we can get an end-to-end test asap.
FWIW, we can stream from disk to S3; we don't need to load it all into memory first. cloud_storage::remote::upload_controller_snapshot is an example of that.
batching_parquet_writer_factory::create_writer(iceberg::struct_type schema) {
    auto ret = std::make_unique<batching_parquet_writer>(
      std::move(schema), _row_count_threshold, _byte_count_treshold);
    std::string filename = fmt::format("{}.parquet", uuid_t::create());
Let's add some more info here, as discussed in Slack.
Just talked about this on Slack again.
- Currently adding a file_name_prefix to the factory
- Later PR will add a remote_directory parameter for the uploader
- PR that adds structured tables will prepend schema_id to the file name (rough sketch below)
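Putting those together, a hedged sketch of the eventual naming (the prefix member and schema_id plumbing are hypothetical):

    // hypothetical: _file_name_prefix from the factory, schema_id from a later PR
    std::string filename = fmt::format(
      "{}-{}-{}.parquet", _file_name_prefix, schema_id, uuid_t::create());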
// FIXME: This method should return a result and let the multiplexer
// deal with it appropriately
We should probably have thrown at this level instead of the next layer up.
I'm not sure what you mean by "this level". Do you mean throw an exception up to the multiplexer?
src/v/datalake/record_multiplexer.cc
Outdated
_writer_status = co_await writer.add_data_struct(
  std::move(data), estimated_size);
} catch (const std::runtime_error& err) {
    datalake_log.error("Failed to add data to writer");
vlog. Also, we should set the _writer_status here.
done
nit: I think _writer_status should probably be initialized to a generic_error and be set to ok when it actually finishes.
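A sketch of that initialization pattern (the ok enumerator is an assumption about data_writer_error):

    // start pessimistic; only flip to ok once the write path finishes cleanly
    data_writer_error _writer_status = data_writer_error::generic_error; // assumed enumerator

    // at the end of a successful run:
    _writer_status = data_writer_error::ok; // assumed enumerator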
@@ -75,4 +76,20 @@ class batching_parquet_writer : public data_writer {
    data_writer_result _result;
};

class batching_parquet_writer_factory : public data_writer_factory {
optional: could have also made this a nested class so it would be batching_parquet_writer::factory
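Roughly, the optional layout would look like this (sketch only, not what this PR does):

    class batching_parquet_writer : public data_writer {
    public:
        // nested factory type, referred to as batching_parquet_writer::factory
        class factory;
        // ...
    };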
given the number of things stacked above this, and how pervasive a change that would be, I'd rather make it a separate PR.
src/v/datalake/record_multiplexer.h
Outdated
@@ -42,21 +56,25 @@ class record_multiplexer {
    explicit record_multiplexer(
      std::unique_ptr<data_writer_factory> writer_factory);
    ss::future<ss::stop_iteration> operator()(model::record_batch batch);
    ss::future<result<chunked_vector<data_writer_result>, data_writer_error>>
    // ss::future<result<chunked_vector<data_file_result>, data_writer_error>>
Just noticed that I left this in here. Removing it now.
Force-pushed from 7c93d14 to ecdd60e
the below tests from https://buildkite.com/redpanda/redpanda/builds/56150#01927262-6c33-4348-8659-1d033fdeab71 have failed and will be retried
the below tests from https://buildkite.com/redpanda/redpanda/builds/56172#019272db-7e8b-42ef-828c-da0c04579c2d have failed and will be retried
the below tests from https://buildkite.com/redpanda/redpanda/builds/56183#0192739b-ea9b-4c42-9850-dc89d848a76d have failed and will be retried
none of the comments are blockers, probably good to address in a later change
@@ -46,12 +49,24 @@ batching_parquet_writer::initialize(std::filesystem::path output_file_path) {
      ss::open_flags::create | ss::open_flags::truncate
      | ss::open_flags::wo);
} catch (...) {
    vlogl(
nit: vlog(datalake_log.error, ...) is the usual pattern; does that not work here?
Force-pushed from ecdd60e to c3818d1
Force-pushed from c3818d1 to 92ed6e9
Force-pushed from 92ed6e9 to 8aa3cbb
all my feedback was addressed. lgtm
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/56250#019276e2-1b8b-4037-94c4-236142433fc3
Retry command for Build#56250. Please wait until all jobs are finished before running the slash command.
Add an implementation of data_writer_factory for batching_parquet_writer. Use this to test the data path from the multiplexer through to writing parquet files.
batching_parquet_writer catches different types of exceptions and transforms them into data_writer_error error codes. This is a good place to integrate some error logging.
The data_writer_factory::create method may need to open files or do other things that may fail. Return a result type so we can correctly indicate failure.
Previously, a failure to create a data writer was handled through a try/catch. This changes that to a result type, since that's our preferred error handling for the higher-level parts of the code. This requires changing the type for the writer from std::unique_ptr to ss::shared_ptr so it can be returned in a result (previously it was returned by reference).
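A sketch of the factory signature implied by this change (exact spelling and async-ness are assumptions):

    // assumed shape: a result carrying a shared_ptr writer or a data_writer_error
    result<ss::shared_ptr<data_writer>, data_writer_error>
    create_writer(iceberg::struct_type schema);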
When reading Parquet files, the Arrow library reads int32s from unaligned memory. After upgrading to Clang 18 we started getting warnings about this when testing locally and errors in CI. This is safe to suppress: the read code path is only used in tests.
Force-pushed from b5cfb5c to 06e20c6
Retry command for Build#56307. Please wait until all jobs are finished before running the slash command.
/ci-repeat 1
@@ -159,4 +161,29 @@ ss::future<> batching_parquet_writer::abort() {
    }
}

batching_parquet_writer_factory::batching_parquet_writer_factory(
  std::filesystem::path local_directory,
Re local_directory, a todo for the future: as we chatted offline, this is internal to the writer, so it should self-manage this path and its lifecycle.
src/v/datalake/record_multiplexer.cc
Outdated
    std::move(data), estimated_size);
} catch (const std::runtime_error& err) {
    datalake_log.error("Failed to add data to writer");
    _writer_status =data_writer_error::parquet_conversion_error;
nit: space after =
@@ -10,6 +10,7 @@
#pragma once

#include "base/outcome.h"
#include "coordinator/data_file.h"
datalake/coordinator/data_file.h
I think we can ignore this; it's failing upstream frequently.
force-merging given that https://buildkite.com/redpanda/redpanda/builds/56307#019278c1-9fc3-40ed-ad1a-1f9ab3b24793 only shows the known always-failing-in-dev test failure
Adds a factory class for the batching_parquet_writer so it can be created by the multiplexer. This also improves error handling and logging in the data path.
Backports Required
Release Notes