Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/reconciler' into feature/create-…
Browse files Browse the repository at this point in the history
…batcher2
  • Loading branch information
dotnwat committed Sep 28, 2024
2 parents b6a068b + 9a3109b commit cdc37b7
Show file tree
Hide file tree
Showing 16 changed files with 740 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/v/cloud_topics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ v_cc_library(
v::model
v::serde
)

add_subdirectory(reconciler)
46 changes: 46 additions & 0 deletions src/v/cloud_topics/reconciler/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
load("//bazel:build.bzl", "redpanda_cc_library")

package(default_visibility = ["//src/v/cloud_topics/reconciler/tests:__pkg__"])

redpanda_cc_library(
name = "range_batch_consumer",
srcs = [
"range_batch_consumer.cc",
],
hdrs = [
"range_batch_consumer.h",
],
include_prefix = "cloud_topics/reconciler",
deps = [
"//src/v/base",
"//src/v/bytes:iobuf",
"//src/v/model",
"@seastar",
],
)

redpanda_cc_library(
name = "reconciler",
srcs = [
"reconciler.cc",
],
hdrs = [
"reconciler.h",
],
include_prefix = "cloud_topics/reconciler",
visibility = ["//visibility:public"],
deps = [
":range_batch_consumer",
"//src/v/base",
"//src/v/cloud_io:remote",
"//src/v/cluster",
"//src/v/cluster:notification",
"//src/v/container:fragmented_vector",
"//src/v/kafka/server",
"//src/v/kafka/utils:txn_reader",
"//src/v/model",
"//src/v/random:generators",
"@abseil-cpp//absl/container:node_hash_map",
"@seastar",
],
)
13 changes: 13 additions & 0 deletions src/v/cloud_topics/reconciler/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
v_cc_library(
NAME cloud_topics_reconciler
SRCS
range_batch_consumer.cc
reconciler.cc
DEPS
v::bytes
v::container
v::json
v::strings
v::utils
v::cluster
)
36 changes: 36 additions & 0 deletions src/v/cloud_topics/reconciler/range_batch_consumer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#include "cloud_topics/reconciler/range_batch_consumer.h"

namespace experimental::cloud_topics::reconciler {

ss::future<ss::stop_iteration>
range_batch_consumer::operator()(model::record_batch batch) {
if (!_base_offset.has_value()) {
_base_offset = batch.base_offset();
}
_range.info.last_offset = batch.last_offset();

auto data = serde::to_iobuf(std::move(batch));
_range.data.append(std::move(data));

co_return ss::stop_iteration::no;
}

std::optional<range> range_batch_consumer::end_of_stream() {
if (_base_offset.has_value()) {
_range.info.base_offset = _base_offset.value();
return std::move(_range);
}
return std::nullopt;
}

} // namespace experimental::cloud_topics::reconciler
53 changes: 53 additions & 0 deletions src/v/cloud_topics/reconciler/range_batch_consumer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/
#pragma once

#include "bytes/iobuf.h"
#include "model/fundamental.h"
#include "model/record.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>

#include <optional>

namespace experimental::cloud_topics::reconciler {

/*
* metadata about a range of batches.
*/
struct range_info {
model::offset base_offset;
model::offset last_offset;
};

/*
* a materialized range of batches.
*/
struct range {
iobuf data;
range_info info;
};

/*
* Consumer that builds a range from a record batch reader.
*/
class range_batch_consumer {
public:
ss::future<ss::stop_iteration> operator()(model::record_batch);
std::optional<range> end_of_stream();

private:
range _range;
std::optional<model::offset> _base_offset;
};

} // namespace experimental::cloud_topics::reconciler
Loading

0 comments on commit cdc37b7

Please sign in to comment.