ct: Add L0 read path #23629
Conversation
Rebased with dev
yeh this looks good to me. A couple of minor nits, but it's ready to merge IMO.
Before merging, though, can you please do one of the following:
- submit the changes to cloud_io/* and cloud_storage/* as a separate PR so that storage-team-wide changes are more accessible for broader review, or
- bring all those changes into cloud_topics/* so that cloud_topics owns the changes for now?
src/v/model/record_batch_types.h (outdated)

    dl_placeholder = 35, // placeholder batch type used by cloud topics
    dl_overlay = 36, // overlay batch used by dl_stm and cloud topics
should we have separate batch types, or just a single cloud_topics_meta batch type?
I'll rename this. We should have two batch types: `dl_placeholder` for placeholders and a `dl_stm_command` batch for all STM commands including `dl_overlay`.
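For illustration, a minimal sketch of what that rename could look like in `record_batch_types.h` (the numeric values come from the quoted diff; the `dl_stm_command` name and the underlying enum type are assumptions based on this thread, not merged code):

```cpp
// Sketch only: follows the naming agreed in this thread.
enum class record_batch_type : int8_t {
    // ... existing batch types ...
    dl_placeholder = 35, // placeholder batch written by cloud topics; the
                         // payload lives in an L0 object in cloud storage
    dl_stm_command = 36, // single batch type for all dl_stm commands,
                         // including the dl_overlay command
};
```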
    ss::future<result<bool>> materialize(
      cloud_storage_clients::bucket_name bucket,
      cloud_io::remote_api<Clock>* api,
      cloud_io::basic_cache_service_api<Clock>* cache,
      basic_retry_chain_node<Clock>* rtc);
seems like this should be a free function factory. that would also allow you to avoid the Clock template parameter on the extent? even `make_raft_data_batch` could be a free function? much easier to write tests on functional bits than having mutators like this.
In this case `materialize` is called conditionally. Several placeholders may share the same L0 object, so if the L0 object is already materialized we're not calling `materialize` for it again.
You do call it conditionally, but from the reader, not from the extent, right? Why is this method on the extent?
In this case the materialize is called conditionally. Several placeholders may share the same L0 object. So if the L0 object is materialized we're not calling materialize for it anymore.

yeh, i still think the extent should be a dumb object and there should be free functions to construct it. i can work on that if you don't want to.
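A rough sketch of the shape being suggested here (the free-function names are hypothetical; the parameter list just mirrors the quoted `materialize` declaration):

```cpp
// Hypothetical sketch: keep placeholder_extent a plain data object and move
// the I/O into a free function, so the functional parts stay easy to test.
template<class Clock>
ss::future<result<placeholder_extent>> materialize_extent(
  dl_placeholder placeholder,
  cloud_storage_clients::bucket_name bucket,
  cloud_io::remote_api<Clock>* api,
  cloud_io::basic_cache_service_api<Clock>* cache,
  basic_retry_chain_node<Clock>* rtc);

// Pure function over an already-materialized extent: no cloud I/O involved.
model::record_batch make_raft_data_batch(const placeholder_extent& extent);
```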
    #include <seastar/core/lowres_clock.hh>

    #pragma once
pragma should go up above includes?
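For reference, the conventional header layout being asked for (illustrative file, not from the PR):

```cpp
// some_header.h (illustrative)
#pragma once

#include <seastar/core/lowres_clock.hh>
```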
    ss::circular_buffer<model::record_batch> slice;
    for (auto& e : extents.value()) {
        slice.push_back(e.make_raft_data_batch());
it isn't obvious why we are converting placeholder batches to raft batches. i'm guessing that it has something to do with the rest of the system expecting raft data batches? are there parts of the raft batch that can't be filled in completely, or is it identical, etc...?
The placeholder batch doesn't have the data, so we're bringing in the data and replacing the placeholder batch with a raft_data batch. We need to make this a raft_data batch because the Kafka layer will filter out anything which isn't a raft_data batch. We could introduce another batch type (e.g. materialized_placeholder) and teach the Kafka layer to use it, but IMO this will only add complexity.
We need to make this a raft-data batch because the Kafka layer will filter out anything which isn't a raft-data batch
this is what i was looking for. thanks
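To make the takeaway concrete, a hedged illustration (my own sketch, not the PR's code) of why the batch type matters on the fetch path:

```cpp
// Conceptual sketch: the Kafka fetch path only lets raft_data batches
// through, so a batch still typed dl_placeholder would be dropped before
// reaching the client.
bool visible_to_kafka(const model::record_batch_header& h) {
    return h.type == model::record_batch_type::raft_data;
}

// Materialization therefore keeps the placeholder's offsets/timestamps and
// swaps the batch type once the payload has been fetched from the L0 object
// (the real rebuild happens in placeholder_extent::make_raft_data_batch).
model::record_batch_header as_raft_data(model::record_batch_header h) {
    h.type = model::record_batch_type::raft_data;
    return h;
}
```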
created #23748
non-flaky failures in https://buildkite.com/redpanda/redpanda/builds/56353#01927c04-92c7-4c9a-82b7-461c05b702a1:
non-flaky failures in https://buildkite.com/redpanda/redpanda/builds/56578#019292f7-3531-4b61-b16d-b2421234126f:
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/56353#01927c1d-d7fb-4a4a-a4cd-238151ebbb3e
Still need to review tests.
    @@ -0,0 +1,17 @@
    ### Missing bits

    * In the write path we should populate batch cache with `raft_data` batches
This will be tricky given that we will have the data on the leader but not on the replicas. 🤔
Yes. I think we need to focus on optimizing the read path for parallelism so we have good performance even if the data is not cached. But having it cached when the retrieval happens shortly after the data was produced on the same broker is also important.
    namespace experimental::cloud_topics {

    // Extent represents dl_placeholder with the data
Can you move this comment to the class? I missed it and was about to write a comment asking for details but found it here.
what class? basic_placeholder_extent?
    }

    if (
      !status.has_value()
in which case do we expect to get here with `status.has_value() == false`? i don't understand this condition
will clean up in a followup; this check is not needed because if `status.has_value() == false` we will return a timeout error from the previous if statement (the only way `status` could be a nullopt is when the loop timed out while the cache element is in-progress, and in that case `rp.is_allowed` will be false).
    namespace experimental::cloud_topics {

    model::record_batch_reader make_placeholder_extent_reader(
A comment please about the inputs to this function.
will add a followup
    struct errc_converter;

    template<>
    struct errc_converter<cloud_io::download_result, errc> {
Don't like this mapping (to be clear: I'm not asking you to change it). Don't have better ideas but maybe you get some ideas by reading this tip https://www.boost.org/doc/libs/develop/libs/outcome/doc/html/tutorial/essential/conventions.html?
this is not a public api, the whole thing lives inside .cc to begin with
Don't like this mapping
yeh i tend to agree. @nvartolomei can you say a bit more about what you don't like?
this is not a public api, the whole thing lives inside .cc to begin with
i get what you are saying: it is contained, but i'm not sure it is relevant to nicolae's feedback about the pattern.
In this case the error code enum from one module is mapped to the error code enum from another. And the link says that this is a bad pattern and it's better to use `std::error_code` on api boundaries so you don't have to convert. The reasoning here is that `cloud_io::download_result` is not an `std::error_code`. Also, this is an exhaustive check. Sometimes an error code based solely on an `enum class` is better because you can use a `switch` stmt and rely on the compiler to check that the error handling is exhaustive. In this case this utility function is not exposed outside and the stuff in .h returns `result` types which are based on `std::error_code`.
Let's say I have a function `ss::future<result<iobuf>> download()` in the header. The `result` stores `std::error_code` which may contain different error categories. The module also has its own error code type. As a user of the module I'd still expect that all public functions / methods are returning error codes from the module itself, right? In this case I can still write an exhaustive error check.
Also, Iceberg uses a different approach with `boost::outcome` that returns an error code which is not an `std::error_code`. As Andrew explained to me the other day this was done to avoid boilerplate and to rely on a `switch` statement during error handling.
thanks @Lazin this makes sense!
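A minimal sketch of the exhaustive-mapping pattern being discussed, assuming `cloud_io::download_result` keeps the usual success/notfound/timedout/failed values; the target `errc` values on the right are placeholders, not the PR's actual enum:

```cpp
// Sketch: an exhaustive switch lets the compiler warn when a new
// download_result value is added but not mapped.
errc map_download_result(cloud_io::download_result r) {
    switch (r) {
    case cloud_io::download_result::success:
        return errc::success;          // assumed errc value
    case cloud_io::download_result::notfound:
        return errc::not_found;        // assumed errc value
    case cloud_io::download_result::timedout:
        return errc::timeout;          // assumed errc value
    case cloud_io::download_result::failed:
        return errc::download_failure; // assumed errc value
    }
    __builtin_unreachable();
}
```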
    ///
    /// The type of the error code should be known
    template<class T, class E>
    result<T> result_convert(result<T>&& res) {
cmd+f doesn't find any matches for `result_convert`, is it used?
Looks like I ended up using error-converter directly. Will clean this up in a followup.
    template<class Clock>
    ss::future<result<iobuf>>
    basic_placeholder_extent<Clock>::materialize_from_cloud_storage(
@Lazin what if we make basic_placeholder_extent closer to a dumb object and instead have some sort of materialization service which does the materialization? Or just put these methods into the reader. Why did we choose to attach so much functionality to the extent class?
This is not a final version of this code, don't overthink all this. The goal of the placeholder extent is to be used in one place in the read path for a month or two until we roll out the proper read path with reader reuse/caching and centralized control over materialized resources (similar to what we have now in TS). The goal of this code is to unblock further development and be correct.
The goal of this code is to unblock further development and be correct.
yes, but keep things as simple as possible. i think @nvartolomei is right that it would be better to have a plain object--it would be simpler.
@nvartolomei after we merge this, i think you should feel free to come back and do some clean up / refactoring work for stuff like this. i'm planning to do this as well, as a forcing function to get into the code in more detail.
I'll make the change and we'll see if it gets much simpler.
The extent represents the placeholder and the data that the placeholder references. The extent can be used to materialize the data and to restore the original 'raft_data' batch used to create the extent. Signed-off-by: Evgeny Lazin <[email protected]>
The fixture is supposed to be used to test the reader and extent components. It can be used to generate the test data and set up mocks (cache and cloud storage api). Signed-off-by: Evgeny Lazin <[email protected]>
The test validates materialization of the extent in both the cached and uncached cases. Signed-off-by: Evgeny Lazin <[email protected]>
The reader consumes placeholder batches from the underlying reader (storage::log_segment_reader) and transforms them into raft_data batches. It does this by creating a placeholder_extent instance for every dl_placeholder batch and materializing this extent using cloud storage or the disk cache. Signed-off-by: Evgeny Lazin <[email protected]>
/ci-repeat 1
        return p.size_bytes;
    }

    inline ss::future<result<ss::circular_buffer<placeholder_extent>>>
this does not need to be inline
      retry_chain_node* rtc) {
        absl::node_hash_map<uuid_t, ss::lw_shared_ptr<hydrated_L0_object>> hydrated;
        ss::circular_buffer<placeholder_extent> extents;
        for (auto&& p : placeholders) {
why is this auto&&?
it's moved in the loop body
it's moved in the loop body

but `auto&&` isn't an r-value reference, right? it's a forwarding reference, but there isn't any type deduction happening here?
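A small self-contained example of the point under discussion (my own illustration, unrelated to the PR's types): in a range-for over a non-const lvalue container, `auto&&` is a forwarding reference that deduces to an lvalue reference, so the element still needs an explicit `std::move`.

```cpp
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

int main() {
    std::vector<std::string> placeholders{"a", "b", "c"};
    std::vector<std::string> moved_out;

    for (auto&& p : placeholders) {
        // Deduction does happen, but against an lvalue: p is std::string&.
        static_assert(std::is_same_v<decltype(p), std::string&>);
        // auto&& alone does not make p an rvalue; the move must be explicit.
        moved_out.push_back(std::move(p));
    }
}
```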
the below tests from https://buildkite.com/redpanda/redpanda/builds/56578#019292b3-5176-4495-b57c-78246c95457a have failed and will be retried
The PR implements the reader which consumes `dl_placeholder` batches from the partition and materializes them by downloading the data from cloud storage or fetching it from the cache.

The PR adds an abstract base class for the `cloud_storage::cache` class. The interface is located in the `cloud_io` package.

WIP, not all code pushed yet.
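A hedged sketch of the read-path flow, paraphrasing the snippets quoted in the review threads above (the `materialize_slice` name and the exact signature are illustrative; error handling is omitted):

```cpp
// Illustrative only: every dl_placeholder batch becomes a placeholder extent,
// the extent is materialized from the disk cache or cloud storage, and the
// payload is re-wrapped as a raft_data batch for the Kafka layer.
template<class Clock>
ss::future<result<ss::circular_buffer<model::record_batch>>> materialize_slice(
  ss::circular_buffer<basic_placeholder_extent<Clock>> extents,
  cloud_storage_clients::bucket_name bucket,
  cloud_io::remote_api<Clock>* api,
  cloud_io::basic_cache_service_api<Clock>* cache,
  basic_retry_chain_node<Clock>* rtc) {
    ss::circular_buffer<model::record_batch> slice;
    for (auto& e : extents) {
        // Several placeholders may share one L0 object; materialize() skips
        // the download when the shared object has already been hydrated.
        co_await e.materialize(bucket, api, cache, rtc);
        slice.push_back(e.make_raft_data_batch());
    }
    co_return slice;
}
```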