Skip to content

Commit

Permalink
Merge pull request #23544 from Lazin/feature/create-batcher2
Browse files Browse the repository at this point in the history
ct: Add batcher component
  • Loading branch information
dotnwat authored Sep 29, 2024
2 parents c684d17 + 0af23a0 commit 3df9def
Show file tree
Hide file tree
Showing 33 changed files with 2,413 additions and 2 deletions.
5 changes: 4 additions & 1 deletion src/v/cloud_topics/BUILD
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
load("//bazel:build.bzl", "redpanda_cc_library")

package(default_visibility = ["//src/v/cloud_topics/tests:__pkg__"])
package(default_visibility = [
"//src/v/cloud_topics/batcher:__pkg__",
"//src/v/cloud_topics/tests:__pkg__",
])

redpanda_cc_library(
name = "logger",
Expand Down
2 changes: 2 additions & 0 deletions src/v/cloud_topics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ v_cc_library(
v::model
v::serde
)

add_subdirectory(reconciler)
102 changes: 102 additions & 0 deletions src/v/cloud_topics/batcher/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
load("//bazel:build.bzl", "redpanda_cc_library")

package(default_visibility = ["//src/v/cloud_topics/batcher/tests:__pkg__"])

redpanda_cc_library(
name = "serializer",
srcs = [
"serializer.cc",
],
hdrs = [
"serializer.h",
],
implementation_deps = [
"//src/v/storage:record_batch_utils",
],
include_prefix = "cloud_topics/batcher",
deps = [
"//src/v/base",
"//src/v/bytes:iobuf",
"//src/v/container:fragmented_vector",
"//src/v/model",
],
)

redpanda_cc_library(
name = "write_request",
srcs = [
"write_request.cc",
],
hdrs = [
"write_request.h",
],
implementation_deps = [
"//src/v/cloud_topics:logger",
],
include_prefix = "cloud_topics/batcher",
deps = [
":serializer",
"//src/v/base",
"//src/v/cloud_topics:types",
"//src/v/model",
"@seastar",
],
)

redpanda_cc_library(
name = "aggregator",
srcs = [
"aggregator.cc",
],
hdrs = [
"aggregator.h",
],
implementation_deps = [
":serializer",
"//src/v/cloud_topics:logger",
"//src/v/cloud_topics:placeholder",
"//src/v/storage:record_batch_builder",
],
include_prefix = "cloud_topics/batcher",
deps = [
":write_request",
"//src/v/base",
"//src/v/cloud_topics:types",
"//src/v/container:fragmented_vector",
"//src/v/model",
"@abseil-cpp//absl/container:btree",
"@seastar",
],
)

redpanda_cc_library(
name = "batcher",
srcs = [
"batcher.cc",
],
hdrs = [
"batcher.h",
],
implementation_deps = [
"//src/v/cloud_io:remote",
"//src/v/cloud_topics:logger",
"//src/v/cloud_topics/batcher:aggregator",
"//src/v/cloud_topics/batcher:serializer",
"//src/v/ssx:sformat",
"//src/v/utils:human",
],
include_prefix = "cloud_topics/batcher",
deps = [
"//src/v/base",
"//src/v/bytes",
"//src/v/bytes:iobuf",
"//src/v/cloud_topics:types",
"//src/v/cloud_topics/batcher:write_request",
"//src/v/config",
"//src/v/model",
"//src/v/utils:retry_chain_node",
"//src/v/utils:uuid",
"@abseil-cpp//absl/container:btree",
"@seastar",
],
)
186 changes: 186 additions & 0 deletions src/v/cloud_topics/batcher/aggregator.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#include "cloud_topics/batcher/aggregator.h"

#include "cloud_topics/batcher/serializer.h"
#include "cloud_topics/batcher/write_request.h"
#include "cloud_topics/dl_placeholder.h"
#include "storage/record_batch_builder.h"

#include <seastar/core/future.hh>
#include <seastar/util/defer.hh>

namespace experimental::cloud_topics::details {

template<class Clock>
aggregator<Clock>::aggregator(object_id id)
: _id(id) {}

template<class Clock>
aggregator<Clock>::~aggregator() {
ack_error(errc::timeout);
if (!_staging.empty()) {
for (auto& [key, list] : _staging) {
std::ignore = key;
for (auto& req : list) {
req.set_value(errc::timeout);
}
}
}
}

template<class Clock>
struct prepared_placeholder_batches {
const object_id id;
chunked_vector<std::unique_ptr<batches_for_req<Clock>>> placeholders;
uint64_t size_bytes{0};
};

namespace {
/// Convert multiple chunk elements into placeholder batches
///
/// Byte offsets in the chunk are zero based. Because we're
/// concatenating multiple chunks the offset has to be corrected.
/// This is done using the `base_byte_offset` parameter.
template<class Clock>
void make_dl_placeholder_batches(
prepared_placeholder_batches<Clock>& ctx,
write_request<Clock>& req,
const serialized_chunk& chunk) {
auto result = std::make_unique<batches_for_req<Clock>>();
for (const auto& b : chunk.batches) {
dl_placeholder placeholder{
.id = ctx.id,
.offset = first_byte_offset_t(ctx.size_bytes),
.size_bytes = byte_range_size_t(b.size_bytes),
};

storage::record_batch_builder builder(
model::record_batch_type::
version_fence /*TODO: use dl_placeholder batch type*/,
b.base);

// TX data (producer id, control flag) are not copied from 'src' yet.

// Put the payload
builder.add_raw_kv(
serde::to_iobuf(dl_placeholder_record_key::payload),
serde::to_iobuf(placeholder));

// TODO: fix this
for (int i = 1; i < b.num_records - 1; i++) {
iobuf empty;
builder.add_raw_kv(
serde::to_iobuf(dl_placeholder_record_key::empty),
std::move(empty));
}
result->placeholders.push_back(std::move(builder).build());
ctx.size_bytes += b.size_bytes;
}
result->ref = req.weak_from_this();
ctx.placeholders.push_back(std::move(result));
}
} // namespace

template<class Clock>
chunked_vector<std::unique_ptr<batches_for_req<Clock>>>
aggregator<Clock>::get_placeholders() {
prepared_placeholder_batches<Clock> ctx{
.id = _id,
};
for (auto& [key, list] : _staging) {
for (auto& req : list) {
vassert(
!req.data_chunk.payload.empty(),
"Empty write request for ntp: {}",
key);
make_dl_placeholder_batches(ctx, req, req.data_chunk);
}
}
return std::move(ctx.placeholders);
}

template<class Clock>
iobuf aggregator<Clock>::get_stream() {
iobuf concat;
for (auto& p : _aggregated) {
if (p->ref != nullptr) {
concat.append(std::move(p->ref->data_chunk.payload));
}
}
return concat;
}

template<class Clock>
object_id aggregator<Clock>::get_object_id() const noexcept {
return _id;
}

template<class Clock>
iobuf aggregator<Clock>::prepare() {
// Move data from staging to aggregated
_aggregated = get_placeholders();
_staging.clear();
// Produce input stream
return get_stream();
}

template<class Clock>
void aggregator<Clock>::ack() {
if (_aggregated.empty()) {
return;
}
auto d = ss::defer([this] { _aggregated.clear(); });
for (auto& p : _aggregated) {
if (p->ref != nullptr) {
try {
p->ref->set_value(std::move(p->placeholders));
} catch (const ss::broken_promise&) {
}
}
}
}

template<class Clock>
void aggregator<Clock>::ack_error(errc e) {
if (_aggregated.empty()) {
return;
}
auto d = ss::defer([this] { _aggregated.clear(); });
for (auto& p : _aggregated) {
if (p->ref != nullptr) {
try {
p->ref->set_value(e);
} catch (const ss::broken_promise&) {
}
}
}
}

template<class Clock>
void aggregator<Clock>::add(write_request<Clock>& req) {
auto it = _staging.find(req.ntp);
if (it == _staging.end()) {
it = _staging.emplace_hint(it, req.ntp, write_request_list<Clock>());
}
req._hook.unlink();
it->second.push_back(req);
_size_bytes += req.size_bytes();
}

template<class Clock>
size_t aggregator<Clock>::size_bytes() const noexcept {
return _size_bytes;
}

template class aggregator<ss::lowres_clock>;
template class aggregator<ss::manual_clock>;
} // namespace experimental::cloud_topics::details
87 changes: 87 additions & 0 deletions src/v/cloud_topics/batcher/aggregator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#pragma once

#include "base/seastarx.h"
#include "cloud_topics/batcher/write_request.h"
#include "cloud_topics/errc.h"
#include "cloud_topics/types.h"
#include "container/fragmented_vector.h"
#include "model/record.h"

#include <seastar/core/circular_buffer.hh>
#include <seastar/core/weak_ptr.hh>

#include <absl/container/btree_map.h>

namespace experimental::cloud_topics::details {

/// List of placeholder batches that has to be propagated
/// to the particular write request.
template<class Clock>
struct batches_for_req {
/// Generated placeholder batches
ss::circular_buffer<model::record_batch> placeholders;
/// Source write request
ss::weak_ptr<write_request<Clock>> ref;
};

// This component aggregates a bunch of write
// requests and produces single serialized object.
template<class Clock>
class aggregator {
public:
explicit aggregator(object_id id = object_id{uuid_t::create()});
aggregator(const aggregator&) = delete;
aggregator(aggregator&&) = delete;
aggregator& operator=(const aggregator&) = delete;
aggregator& operator=(aggregator&&) = delete;
~aggregator();

/// Add content of the write request to the
/// L0 object.
/// If write request is destroyed before the 'prepare'
/// call the content of the write request will not be
/// included into L0 object. The size value returned by
/// the 'size_bytes' call will not match the actual size
/// of the object.
void add(write_request<Clock>& req);

/// Estimate L0 object size
size_t size_bytes() const noexcept;

/// Prepare upload byte stream
iobuf prepare();

object_id get_object_id() const noexcept;

void ack();
void ack_error(errc);

private:
/// Generate placeholders.
/// This method should be invoked before 'get_result'
chunked_vector<std::unique_ptr<batches_for_req<Clock>>> get_placeholders();

/// Produce L0 object payload.
/// The method messes up the state so it can only
/// be called once.
iobuf get_stream();

object_id _id;
/// Source data for the aggregator
absl::btree_map<model::ntp, write_request_list<Clock>> _staging;
/// Prepared placeholders
chunked_vector<std::unique_ptr<batches_for_req<Clock>>> _aggregated;
size_t _size_bytes{0};
};

} // namespace experimental::cloud_topics::details
Loading

0 comments on commit 3df9def

Please sign in to comment.