Skip to content

Commit 7c40ce7

Browse files
committed
cloud_topics/reconciler: Use L1 io and object primitives
This alters the reconciler so it builds and uploads proper L1 objects, using the l1::io and l1::object_builder abstractions. A follow-up will change the commit and LRO offset portions of the reconciler to use the metastore and the ctp_stm.
1 parent c30275a commit 7c40ce7

File tree

8 files changed

+251
-135
lines changed

8 files changed

+251
-135
lines changed

src/v/cloud_topics/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,13 @@ redpanda_cc_library(
9595
":data_plane_api",
9696
"//src/v/base",
9797
"//src/v/cloud_topics:state_accessors",
98+
"//src/v/cloud_topics/level_one/common:file_io",
9899
"//src/v/cloud_topics/level_one/domain:domain_supervisor",
99100
"//src/v/cloud_topics/level_one/metastore:frontend",
100101
"//src/v/cloud_topics/level_zero/common:extent_meta",
101102
"//src/v/cloud_topics/reconciler",
102103
"//src/v/cluster",
104+
"//src/v/config",
103105
"//src/v/container:chunked_vector",
104106
"//src/v/model",
105107
"//src/v/ssx:sharded_service_container",

src/v/cloud_topics/app.cc

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "cloud_topics/manager/manager.h"
1717
#include "cluster/cluster_epoch_service.h"
1818
#include "cluster/controller.h"
19+
#include "config/node_config.h"
1920
#include "ssx/sharded_service_container.h"
2021

2122
#include <seastar/core/coroutine.hh>
@@ -50,13 +51,26 @@ ss::future<> app::construct(
5051

5152
co_await construct_service(state, data_plane.get());
5253

54+
// Touch the L1 staging directory before L1 i/o starts.
55+
co_await ss::recursive_touch_directory(
56+
config::node().l1_staging_path().string());
57+
58+
co_await construct_service(
59+
l1_io,
60+
config::node().l1_staging_path(),
61+
ss::sharded_parameter([&remote] { return &remote->local(); }),
62+
bucket,
63+
ss::sharded_parameter([&cloud_cache] { return &cloud_cache->local(); }));
64+
5365
co_await construct_service(
5466
reconciler,
5567
ss::sharded_parameter(
5668
[&partition_mgr] { return &partition_mgr->local(); }),
57-
remote,
5869
data_plane.get(),
59-
bucket);
70+
ss::sharded_parameter([this] { return &l1_io.local(); }),
71+
ss::sharded_parameter(
72+
[&metadata_cache] { return &metadata_cache->local(); }));
73+
6074
co_await construct_service(domain_supervisor, controller);
6175
co_await construct_service(
6276
l1_metastore_fe,

src/v/cloud_topics/app.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
#pragma once
1212

13+
#include "cloud_topics/level_one/common/file_io.h"
1314
#include "cloud_topics/level_one/domain/domain_supervisor.h"
1415
#include "cloud_topics/level_one/metastore/frontend.h"
1516
#include "cloud_topics/reconciler/reconciler.h"
@@ -72,6 +73,7 @@ class app : public ssx::sharded_service_container {
7273
ss::sstring _logger_name;
7374
std::unique_ptr<data_plane_api> data_plane;
7475
ss::sharded<state_accessors> state;
76+
ss::sharded<l1::file_io> l1_io;
7577
ss::sharded<reconciler::reconciler> reconciler;
7678
ss::sharded<l1::domain_supervisor> domain_supervisor;
7779
ss::sharded<l1::frontend> l1_metastore_fe;

src/v/cloud_topics/level_one/common/BUILD

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,21 @@ package(default_visibility = ["//src/v/cloud_topics/level_one:__subpackages__"])
44

55
redpanda_cc_library(
66
name = "object_id",
7+
srcs = [
8+
"object_id.cc",
9+
],
710
hdrs = [
811
"object_id.h",
912
],
13+
visibility = [
14+
"//src/v/cloud_topics/level_one:__subpackages__",
15+
"//src/v/cloud_topics/reconciler:__subpackages__",
16+
],
1017
deps = [
1118
"//src/v/base",
1219
"//src/v/utils:named_type",
1320
"//src/v/utils:uuid",
21+
"@fmt",
1422
],
1523
)
1624

@@ -67,6 +75,10 @@ redpanda_cc_library(
6775
name = "abstract_io",
6876
srcs = ["abstract_io.cc"],
6977
hdrs = ["abstract_io.h"],
78+
visibility = [
79+
"//src/v/cloud_topics/level_one:__subpackages__",
80+
"//src/v/cloud_topics/reconciler:__subpackages__",
81+
],
7082
deps = [
7183
":object_id",
7284
"//src/v/container:chunked_vector",
@@ -79,6 +91,11 @@ redpanda_cc_library(
7991
name = "file_io",
8092
srcs = ["file_io.cc"],
8193
hdrs = ["file_io.h"],
94+
visibility = [
95+
"//src/v/cloud_topics:__pkg__",
96+
"//src/v/cloud_topics/level_one:__subpackages__",
97+
"//src/v/cloud_topics/reconciler:__pkg__",
98+
],
8299
deps = [
83100
":abstract_io",
84101
":object_id",

src/v/cloud_topics/reconciler/BUILD

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,14 @@ redpanda_cc_library(
5050
],
5151
visibility = ["//visibility:public"],
5252
deps = [
53-
":range_batch_consumer",
53+
":reconciliation_consumer",
5454
"//src/v/base",
55-
"//src/v/cloud_io:remote",
56-
"//src/v/cloud_storage",
5755
"//src/v/cloud_storage_clients",
5856
"//src/v/cloud_topics:object_utils",
5957
"//src/v/cloud_topics:types",
58+
"//src/v/cloud_topics/level_one/common:abstract_io",
59+
"//src/v/cloud_topics/level_one/common:object",
60+
"//src/v/cloud_topics/level_one/common:object_id",
6061
"//src/v/cloud_topics/level_one/common:object_utils",
6162
"//src/v/cloud_topics/level_zero/stm:ctp_stm_api",
6263
"//src/v/cluster",

0 commit comments

Comments
 (0)