Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions proto/redpanda/core/admin/v2/shadow_link.proto
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,17 @@ message ShadowLinkClientOptions {
// If 0 is provided, defaults to 100ms
int32 retry_backoff_ms = 8;
// Fetch request timeout
// If 0 is provided, defaults to 100ms
// If 0 is provided, defaults to 500ms
int32 fetch_wait_max_ms = 9;
// Fetch min bytes
// If 0 is provided, defaults to 1 byte
// If 0 is provided, defaults to 5 MiB
int32 fetch_min_bytes = 10;
// Fetch max bytes
// If 0 is provided, defaults to 1MiB
// If 0 is provided, defaults to 20 MiB
int32 fetch_max_bytes = 11;
// Fetch partition max bytes
// If 0 is provided, defaults to 1 MiB
int32 fetch_partition_max_bytes = 12;
}
// Options for syncing topic metadata
message TopicMetadataSyncOptions {
Expand Down
3 changes: 2 additions & 1 deletion src/v/cluster_link/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,11 @@ redpanda_cc_library(
"//src/v/base",
"//src/v/cluster",
"//src/v/cluster/utils:partition_change_notifier_api",
"//src/v/cluster_link/replication:deps_impl",
"//src/v/cluster_link/replication:deps",
"//src/v/cluster_link/replication:mux_remote_consumer",
"//src/v/kafka/client/direct_consumer",
"//src/v/kafka/server",
"//src/v/kafka/server:write_at_offset_stm",
"//src/v/model",
"//src/v/raft",
"@seastar",
Expand Down
10 changes: 10 additions & 0 deletions src/v/cluster_link/manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ ss::future<> manager::handle_on_link_change(model::id_t id) {
id,
link_metadata);
it->second->update_config(link_metadata.copy());
_cfg_change_notifications.notify(id, link_metadata);
} else {
// Create a new link
vlog(
Expand Down Expand Up @@ -613,6 +614,15 @@ ss::future<> manager::stop_topic_reconciler() {
}
}

manager::notification_id manager::register_link_config_changes_callback(
link_cfg_change_notification_cb cb) {
return _cfg_change_notifications.register_cb(std::move(cb));
}

void manager::unregister_link_config_changes_callback(notification_id id) {
_cfg_change_notifications.unregister_cb(id);
}

consumer_groups_router& manager::get_group_router() noexcept {
return *_group_router;
}
Expand Down
9 changes: 9 additions & 0 deletions src/v/cluster_link/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ namespace cluster_link {
*/
class manager {
public:
using notification_id = named_type<uint32_t, struct mgr_notification_tag>;
using link_cfg_change_notification_cb
= ss::noncopyable_function<void(model::id_t, const model::metadata&)>;
manager(
::model::node_id self,
std::unique_ptr<kafka::data::rpc::partition_leader_cache>
Expand Down Expand Up @@ -154,6 +157,10 @@ class manager {
return _scheduling_group;
}

notification_id
register_link_config_changes_callback(link_cfg_change_notification_cb cb);
void unregister_link_config_changes_callback(notification_id cb);

private:
/// Called periodically to reconcile registered tasks on created links
ss::future<> link_task_reconciler();
Expand All @@ -178,6 +185,8 @@ class manager {

chunked_vector<std::unique_ptr<task_factory>> _task_factories;
absl::flat_hash_map<id_t, std::unique_ptr<link>> _links;
notification_list<link_cfg_change_notification_cb, notification_id>
_cfg_change_notifications;

ss::lowres_clock::duration _task_reconciler_interval;
mutex _link_task_reconciler_mutex{
Expand Down
22 changes: 16 additions & 6 deletions src/v/cluster_link/model/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ std::ostream& operator<<(std::ostream& os, const tls_file_or_value& t);
*/
struct connection_config
: serde::
envelope<connection_config, serde::version<0>, serde::compat_version<0>> {
envelope<connection_config, serde::version<1>, serde::compat_version<0>> {
/// List of addresses to bootstrap the connection
std::vector<net::unresolved_address> bootstrap_servers;
/// Support authn variants. Currently only SCRAM but update this to add
Expand Down Expand Up @@ -164,15 +164,19 @@ struct connection_config
// Maximum fetch wait time
std::optional<int32_t> fetch_wait_max_ms;
// Default value for fetch_wait_max_ms (100ms)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: missed a spot

Suggested change
// Default value for fetch_wait_max_ms (100ms)
// Default value for fetch_wait_max_ms (500ms)

another one below (fetch_max_bytes_default). incidentally, why bother duplicating these value in code comments at all?

static constexpr auto fetch_wait_max_ms_default = 100;
static constexpr auto fetch_wait_max_ms_default = 500;
// Minimum number of bytes to fetch
std::optional<int32_t> fetch_min_bytes;
// Default minimum number of bytes to fetch (1B)
static constexpr auto fetch_min_bytes_default = 1;
// Default minimum number of bytes to fetch (5MiB)
static constexpr auto fetch_min_bytes_default = 5_MiB;
// Maximum number of bytes to fetch
std::optional<int32_t> fetch_max_bytes;
// Maximum number of bytes to fetch per partition
std::optional<int32_t> fetch_partition_max_bytes;
// Default maximum number of bytes to fetch per partition
static constexpr auto default_fetch_partition_max_bytes = 1_MiB;
// Default maximum number of bytes to fetch (1MiB)
static constexpr auto fetch_max_bytes_default = 1 * 1024 * 1024;
static constexpr auto fetch_max_bytes_default = 20 * 1024 * 1024;

// Returns the metadata_max_age_ms value
int32_t get_metadata_max_age_ms() const {
Expand Down Expand Up @@ -204,6 +208,11 @@ struct connection_config
return fetch_max_bytes.value_or(fetch_max_bytes_default);
}

int32_t get_fetch_partition_max_bytes() const {
return fetch_partition_max_bytes.value_or(
default_fetch_partition_max_bytes);
}

friend bool operator==(const connection_config&, const connection_config&)
= default;

Expand All @@ -220,7 +229,8 @@ struct connection_config
retry_backoff_ms,
fetch_wait_max_ms,
fetch_min_bytes,
fetch_max_bytes);
fetch_max_bytes,
fetch_partition_max_bytes);
}

friend std::ostream&
Expand Down
26 changes: 5 additions & 21 deletions src/v/cluster_link/replication/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ redpanda_cc_library(
hdrs = [
"partition_replicator.h",
],
visibility = ["__subpackages__"],
visibility = [
"__subpackages__",
"//src/v/cluster_link:__subpackages__",
],
deps = [
":deps",
"//src/v/cluster_link:logger",
Expand Down Expand Up @@ -88,30 +91,11 @@ redpanda_cc_library(
visibility = ["//src/v/cluster_link:__subpackages__"],
deps = [
":partition_data_queue",
"//src/v/base",
"//src/v/cluster_link:logger",
"//src/v/container:chunked_hash_map",
"//src/v/kafka/client/direct_consumer",
"//src/v/ssx:future_util",
"@seastar",
],
)

redpanda_cc_library(
name = "deps_impl",
srcs = [
"deps_impl.cc",
],
hdrs = [
"deps_impl.h",
],
visibility = ["//src/v/cluster_link:__subpackages__"],
deps = [
":deps",
":mux_remote_consumer",
"//src/v/cluster",
"//src/v/cluster_link:logger",
"//src/v/kafka/server:write_at_offset_stm",
"//src/v/ssx:future_util",
"@seastar",
],
)
210 changes: 0 additions & 210 deletions src/v/cluster_link/replication/deps_impl.cc

This file was deleted.

Loading