From 10e2b5fb115bf9f539b5ee4f78c5aabbb57bb446 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 23 Sep 2025 13:34:36 +0200 Subject: [PATCH 01/10] cl/model: adjusted consumer defaults MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After talking to the perf team, we decided to adjust the default parameters of the consumer used to fetch data from the source cluster. The parameters we now use are the following: ``` fetch_max_wait: 500ms fetch_min_bytes: 5 MiB fetch_max_bytes: 20 MiB partition_max_bytes: 1 MiB ``` Signed-off-by: Michał Maślanka --- proto/redpanda/core/admin/v2/shadow_link.proto | 6 +++--- src/v/cluster_link/model/types.h | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/proto/redpanda/core/admin/v2/shadow_link.proto b/proto/redpanda/core/admin/v2/shadow_link.proto index e7e20227d00b6..5de0ae779031f 100644 --- a/proto/redpanda/core/admin/v2/shadow_link.proto +++ b/proto/redpanda/core/admin/v2/shadow_link.proto @@ -260,13 +260,13 @@ message ShadowLinkClientOptions { // If 0 is provided, defaults to 100ms int32 retry_backoff_ms = 8; // Fetch request timeout - // If 0 is provided, defaults to 100ms + // If 0 is provided, defaults to 500ms int32 fetch_wait_max_ms = 9; // Fetch min bytes - // If 0 is provided, defaults to 1 byte + // If 0 is provided, defaults to 5 MiB int32 fetch_min_bytes = 10; // Fetch max bytes - // If 0 is provided, defaults to 1MiB + // If 0 is provided, defaults to 20 MiB int32 fetch_max_bytes = 11; } // Options for syncing topic metadata diff --git a/src/v/cluster_link/model/types.h b/src/v/cluster_link/model/types.h index 8e17fb5da8443..52aee718c37fb 100644 --- a/src/v/cluster_link/model/types.h +++ b/src/v/cluster_link/model/types.h @@ -253,16 +253,16 @@ struct connection_config static constexpr auto retry_backoff_ms_default = 100; // Maximum fetch wait time std::optional<int32_t> fetch_wait_max_ms; - // Default value for fetch_wait_max_ms (100ms) - static constexpr auto fetch_wait_max_ms_default = 100; + // Default value for fetch_wait_max_ms (500ms) + static constexpr auto fetch_wait_max_ms_default = 500; // Minimum number of bytes to fetch std::optional<int32_t> fetch_min_bytes; - // Default minimum number of bytes to fetch (1B) - static constexpr auto fetch_min_bytes_default = 1; + // Default minimum number of bytes to fetch (5MiB) + static constexpr auto fetch_min_bytes_default = 5_MiB; // Maximum number of bytes to fetch std::optional<int32_t> fetch_max_bytes; // Default maximum number of bytes to fetch (1MiB) - static constexpr auto fetch_max_bytes_default = 1 * 1024 * 1024; + static constexpr auto fetch_max_bytes_default = 20 * 1024 * 1024; // Returns the metadata_max_age_ms value int32_t get_metadata_max_age_ms() const { From d02a5f27b7c28779f3cc6b9e2552f64090b6a997 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 23 Sep 2025 13:37:05 +0200 Subject: [PATCH 02/10] cl/model: added partition_max_bytes to client configuration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added a configuration option that allows users to set the per-partition fetch max bytes. Signed-off-by: Michał Maślanka --- .../redpanda/core/admin/v2/shadow_link.proto | 3 + src/v/cluster_link/model/types.h | 18 +++- .../admin/services/shadow_link/converter.cc | 7 ++ .../redpanda/core/admin/v2/shadow_link_pb2.py | 96 +++++++++---------- .../core/admin/v2/shadow_link_pb2.pyi | 13 ++- 5 files changed, 81 insertions(+), 56 deletions(-)
diff --git a/proto/redpanda/core/admin/v2/shadow_link.proto b/proto/redpanda/core/admin/v2/shadow_link.proto index 5de0ae779031f..1e2f28aca2882 100644 --- a/proto/redpanda/core/admin/v2/shadow_link.proto +++ b/proto/redpanda/core/admin/v2/shadow_link.proto @@ -268,6 +268,9 @@ message ShadowLinkClientOptions { // Fetch max bytes // If 0 is provided, defaults to 20 MiB int32 fetch_max_bytes = 11; + // Fetch partition max bytes + // If 0 is provided, defaults to 1 MiB + int32 fetch_partition_max_bytes = 12; } // Options for syncing topic metadata message TopicMetadataSyncOptions { diff --git a/src/v/cluster_link/model/types.h b/src/v/cluster_link/model/types.h index 52aee718c37fb..f5a057afd99c4 100644 --- a/src/v/cluster_link/model/types.h +++ b/src/v/cluster_link/model/types.h @@ -220,7 +220,7 @@ std::ostream& operator<<(std::ostream& os, const tls_file_or_value& t); */ struct connection_config : serde:: - envelope, serde::compat_version<0>> { + envelope, serde::compat_version<0>> { /// List of addresses to bootstrap the connection std::vector bootstrap_servers; /// Support authn variants. Currently only SCRAM but update this to add @@ -261,7 +261,13 @@ struct connection_config static constexpr auto fetch_min_bytes_default = 5_MiB; // Maximum number of bytes to fetch std::optional<int32_t> fetch_max_bytes; - // Default maximum number of bytes to fetch (1MiB) + // Maximum number of bytes to fetch per partition. This value represents the + // maximum amount of data that the broker returns for a single partition in a + // fetch response. + std::optional<int32_t> fetch_partition_max_bytes; + // Default maximum number of bytes to fetch per partition + static constexpr auto default_fetch_partition_max_bytes = 1_MiB; + // Default maximum number of bytes to fetch (20MiB) static constexpr auto fetch_max_bytes_default = 20 * 1024 * 1024; // Returns the metadata_max_age_ms value @@ -294,6 +300,11 @@ struct connection_config return fetch_max_bytes.value_or(fetch_max_bytes_default); } + int32_t get_fetch_partition_max_bytes() const { + return fetch_partition_max_bytes.value_or( + default_fetch_partition_max_bytes); + } + friend bool operator==(const connection_config&, const connection_config&) = default; @@ -311,7 +322,8 @@ struct connection_config fetch_wait_max_ms, fetch_min_bytes, fetch_max_bytes, - tls_enabled); + tls_enabled, + fetch_partition_max_bytes); } friend std::ostream& diff --git a/src/v/redpanda/admin/services/shadow_link/converter.cc b/src/v/redpanda/admin/services/shadow_link/converter.cc index 9f373b7e07d8b..8de6cc6838293 100644 --- a/src/v/redpanda/admin/services/shadow_link/converter.cc +++ b/src/v/redpanda/admin/services/shadow_link/converter.cc @@ -456,6 +456,11 @@ create_connection_config(const shadow_link& sl) { config.fetch_max_bytes = client_options.get_fetch_max_bytes(); } + if (client_options.get_fetch_partition_max_bytes() != 0) { + config.fetch_partition_max_bytes + = client_options.get_fetch_partition_max_bytes(); + } + return config; } @@ -606,6 +611,8 @@ create_shadow_link_client_options(const cluster_link::model::metadata& md) { options.set_fetch_wait_max_ms(md.connection.fetch_wait_max_ms.value_or(0)); options.set_fetch_min_bytes(md.connection.fetch_min_bytes.value_or(0)); options.set_fetch_max_bytes(md.connection.fetch_max_bytes.value_or(0)); + options.set_fetch_partition_max_bytes( + md.connection.fetch_partition_max_bytes.value_or(0)); return options; } diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.py
b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.py index 94155d81f300b..5ce4cd6d9fa1a 100644 --- a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.py +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.py @@ -15,7 +15,7 @@ from google.protobuf import duration_pb2 as google_dot_protobuf_dot_duration__pb2 from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2 from google.protobuf import field_mask_pb2 as google_dot_protobuf_dot_field__mask__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n.proto/redpanda/core/admin/v2/shadow_link.proto\x12\x16redpanda.core.admin.v2\x1a\'proto/redpanda/core/pbgen/options.proto\x1a#proto/redpanda/core/pbgen/rpc.proto\x1a$proto/redpanda/core/common/acl.proto\x1a\x1fgoogle/api/field_behavior.proto\x1a\x1bgoogle/api/field_info.proto\x1a\x19google/api/resource.proto\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a google/protobuf/field_mask.proto"\xc2\x01\n\nShadowLink\x12\x11\n\x04name\x18\x01 \x01(\tB\x03\xe0A\x02\x12\x18\n\x03uid\x18\x02 \x01(\tB\x0b\xe0A\x03\xe2\x8c\xcf\xd7\x08\x02\x08\x01\x12H\n\x0econfigurations\x18\x03 \x01(\x0b20.redpanda.core.admin.v2.ShadowLinkConfigurations\x12=\n\x06status\x18\x04 \x01(\x0b2(.redpanda.core.admin.v2.ShadowLinkStatusB\x03\xe0A\x03"\xc0\x01\n\x0bShadowTopic\x12\x11\n\x04name\x18\x01 \x01(\tB\x03\xe0A\x03\x12\x1d\n\x08topic_id\x18\x02 \x01(\tB\x0b\xe0A\x03\xe2\x8c\xcf\xd7\x08\x02\x08\x01\x12\x19\n\x11source_topic_name\x18\x03 \x01(\t\x12$\n\x0fsource_topic_id\x18\x04 \x01(\tB\x0b\xe0A\x03\xe2\x8c\xcf\xd7\x08\x02\x08\x01\x12>\n\x06status\x18\x05 \x01(\x0b2).redpanda.core.admin.v2.ShadowTopicStatusB\x03\xe0A\x03"R\n\x17CreateShadowLinkRequest\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"S\n\x18CreateShadowLinkResponse\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"p\n\x17DeleteShadowLinkRequest\x12F\n\x04name\x18\x01 \x01(\tB8\xe0A\x02\xfaA2\n0redpanda.core.admin.ShadowLinkService/ShadowLink\x12\r\n\x05force\x18\x02 \x01(\x08"\x1a\n\x18DeleteShadowLinkResponse"^\n\x14GetShadowLinkRequest\x12F\n\x04name\x18\x01 \x01(\tB8\xe0A\x02\xfaA2\n0redpanda.core.admin.ShadowLinkService/ShadowLink"P\n\x15GetShadowLinkResponse\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"\x18\n\x16ListShadowLinksRequest"S\n\x17ListShadowLinksResponse\x128\n\x0cshadow_links\x18\x01 \x03(\x0b2".redpanda.core.admin.v2.ShadowLink"\x83\x01\n\x17UpdateShadowLinkRequest\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink\x12/\n\x0bupdate_mask\x18\x02 \x01(\x0b2\x1a.google.protobuf.FieldMask"S\n\x18UpdateShadowLinkResponse\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"y\n\x0fFailOverRequest\x12F\n\x04name\x18\x01 \x01(\tB8\xe0A\x02\xfaA2\n0redpanda.core.admin.ShadowLinkService/ShadowLink\x12\x1e\n\x11shadow_topic_name\x18\x02 \x01(\tB\x03\xe0A\x01"K\n\x10FailOverResponse\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"I\n\x15GetShadowTopicRequest\x12\x1d\n\x10shadow_link_name\x18\x01 \x01(\tB\x03\xe0A\x02\x12\x11\n\x04name\x18\x02 \x01(\tB\x03\xe0A\x02"S\n\x16GetShadowTopicResponse\x129\n\x0cshadow_topic\x18\x01 \x01(\x0b2#.redpanda.core.admin.v2.ShadowTopic"8\n\x17ListShadowTopicsRequest\x12\x1d\n\x10shadow_link_name\x18\x01 \x01(\tB\x03\xe0A\x02"V\n\x18ListShadowTopicsResponse\x12:\n\rshadow_topics\x18\x01 
\x03(\x0b2#.redpanda.core.admin.v2.ShadowTopic"\xe7\x02\n\x18ShadowLinkConfigurations\x12G\n\x0eclient_options\x18\x01 \x01(\x0b2/.redpanda.core.admin.v2.ShadowLinkClientOptions\x12U\n\x1btopic_metadata_sync_options\x18\x02 \x01(\x0b20.redpanda.core.admin.v2.TopicMetadataSyncOptions\x12W\n\x1cconsumer_offset_sync_options\x18\x03 \x01(\x0b21.redpanda.core.admin.v2.ConsumerOffsetSyncOptions\x12R\n\x15security_sync_options\x18\x04 \x01(\x0b23.redpanda.core.admin.v2.SecuritySettingsSyncOptions"\xe1\x03\n\x17ShadowLinkClientOptions\x12\x1e\n\x11bootstrap_servers\x18\x01 \x03(\tB\x03\xe0A\x02\x12\x16\n\tclient_id\x18\x02 \x01(\tB\x03\xe0A\x03\x12\x19\n\x11source_cluster_id\x18\x03 \x01(\t\x12>\n\x0ctls_settings\x18\x04 \x01(\x0b2#.redpanda.core.admin.v2.TLSSettingsH\x00\x88\x01\x01\x12^\n\x1cauthentication_configuration\x18\x05 \x01(\x0b23.redpanda.core.admin.v2.AuthenticationConfigurationH\x01\x88\x01\x01\x12\x1b\n\x13metadata_max_age_ms\x18\x06 \x01(\x05\x12\x1d\n\x15connection_timeout_ms\x18\x07 \x01(\x05\x12\x18\n\x10retry_backoff_ms\x18\x08 \x01(\x05\x12\x19\n\x11fetch_wait_max_ms\x18\t \x01(\x05\x12\x17\n\x0ffetch_min_bytes\x18\n \x01(\x05\x12\x17\n\x0ffetch_max_bytes\x18\x0b \x01(\x05B\x0f\n\r_tls_settingsB\x1f\n\x1d_authentication_configuration"\xd6\x01\n\x18TopicMetadataSyncOptions\x12+\n\x08interval\x18\x01 \x01(\x0b2\x19.google.protobuf.Duration\x12L\n auto_create_shadow_topic_filters\x18\x02 \x03(\x0b2".redpanda.core.admin.v2.NameFilter\x12&\n\x1esynced_shadow_topic_properties\x18\x03 \x03(\t\x12\x17\n\x0fexclude_default\x18\x04 \x01(\x08"\x94\x01\n\x19ConsumerOffsetSyncOptions\x12+\n\x08interval\x18\x01 \x01(\x0b2\x19.google.protobuf.Duration\x12\x0f\n\x07enabled\x18\x02 \x01(\x08\x129\n\rgroup_filters\x18\x03 \x03(\x0b2".redpanda.core.admin.v2.NameFilter"\x93\x01\n\x1bSecuritySettingsSyncOptions\x12+\n\x08interval\x18\x01 \x01(\x0b2\x19.google.protobuf.Duration\x12\x0f\n\x07enabled\x18\x02 \x01(\x08\x126\n\x0bacl_filters\x18\x05 \x03(\x0b2!.redpanda.core.admin.v2.ACLFilter"\xb8\x01\n\x0bTLSSettings\x12\x0f\n\x07enabled\x18\x03 \x01(\x08\x12D\n\x11tls_file_settings\x18\x01 \x01(\x0b2\'.redpanda.core.admin.v2.TLSFileSettingsH\x00\x12B\n\x10tls_pem_settings\x18\x02 \x01(\x0b2&.redpanda.core.admin.v2.TLSPEMSettingsH\x00B\x0e\n\x0ctls_settings"s\n\x1bAuthenticationConfiguration\x12B\n\x13scram_configuration\x18\x01 \x01(\x0b2#.redpanda.core.admin.v2.ScramConfigH\x00B\x10\n\x0eauthentication"G\n\x0fTLSFileSettings\x12\x0f\n\x07ca_path\x18\x01 \x01(\t\x12\x10\n\x08key_path\x18\x02 \x01(\t\x12\x11\n\tcert_path\x18\x03 \x01(\t"Z\n\x0eTLSPEMSettings\x12\n\n\x02ca\x18\x01 \x01(\t\x12\x10\n\x03key\x18\x02 \x01(\tB\x03\xe0A\x04\x12\x1c\n\x0fkey_fingerprint\x18\x03 \x01(\tB\x03\xe0A\x03\x12\x0c\n\x04cert\x18\x04 \x01(\t"\xcc\x01\n\x0bScramConfig\x12\x10\n\x08username\x18\x01 \x01(\t\x12\x15\n\x08password\x18\x02 \x01(\tB\x03\xe0A\x04\x12\x19\n\x0cpassword_set\x18\x03 \x01(\x08B\x03\xe0A\x03\x128\n\x0fpassword_set_at\x18\x04 \x01(\x0b2\x1a.google.protobuf.TimestampB\x03\xe0A\x03\x12?\n\x0fscram_mechanism\x18\x05 \x01(\x0e2&.redpanda.core.admin.v2.ScramMechanism"\x8e\x01\n\nNameFilter\x129\n\x0cpattern_type\x18\x01 \x01(\x0e2#.redpanda.core.admin.v2.PatternType\x127\n\x0bfilter_type\x18\x02 \x01(\x0e2".redpanda.core.admin.v2.FilterType\x12\x0c\n\x04name\x18\x03 \x01(\t"\x8f\x01\n\tACLFilter\x12B\n\x0fresource_filter\x18\x01 \x01(\x0b2).redpanda.core.admin.v2.ACLResourceFilter\x12>\n\raccess_filter\x18\x02 
\x01(\x0b2\'.redpanda.core.admin.v2.ACLAccessFilter"\x93\x01\n\x11ACLResourceFilter\x128\n\rresource_type\x18\x01 \x01(\x0e2!.redpanda.core.common.ACLResource\x126\n\x0cpattern_type\x18\x02 \x01(\x0e2 .redpanda.core.common.ACLPattern\x12\x0c\n\x04name\x18\x03 \x01(\t"\xab\x01\n\x0fACLAccessFilter\x12\x11\n\tprincipal\x18\x01 \x01(\t\x125\n\toperation\x18\x02 \x01(\x0e2".redpanda.core.common.ACLOperation\x12@\n\x0fpermission_type\x18\x03 \x01(\x0e2\'.redpanda.core.common.ACLPermissionType\x12\x0c\n\x04host\x18\x04 \x01(\t"\xf3\x01\n\x10ShadowLinkStatus\x126\n\x05state\x18\x01 \x01(\x0e2\'.redpanda.core.admin.v2.ShadowLinkState\x12C\n\rtask_statuses\x18\x02 \x03(\x0b2,.redpanda.core.admin.v2.ShadowLinkTaskStatus\x12:\n\rshadow_topics\x18\x03 \x03(\x0b2#.redpanda.core.admin.v2.ShadowTopic\x12&\n\x1esynced_shadow_topic_properties\x18\x04 \x03(\t"y\n\x14ShadowLinkTaskStatus\x12\x0c\n\x04name\x18\x01 \x01(\t\x120\n\x05state\x18\x02 \x01(\x0e2!.redpanda.core.admin.v2.TaskState\x12\x0e\n\x06reason\x18\x03 \x01(\t\x12\x11\n\tbroker_id\x18\x04 \x01(\x05"\x9e\x01\n\x11ShadowTopicStatus\x127\n\x05state\x18\x01 \x01(\x0e2(.redpanda.core.admin.v2.ShadowTopicState\x12P\n\x15partition_information\x18\x02 \x03(\x0b21.redpanda.core.admin.v2.TopicPartitionInformation"\xce\x01\n\x19TopicPartitionInformation\x12\x14\n\x0cpartition_id\x18\x01 \x01(\x03\x12!\n\x19source_last_stable_offset\x18\x02 \x01(\x03\x12\x1d\n\x15source_high_watermark\x18\x03 \x01(\x03\x12\x16\n\x0ehigh_watermark\x18\x04 \x01(\x03\x12A\n\x1dsource_last_updated_timestamp\x18\x05 \x01(\x0b2\x1a.google.protobuf.Timestamp*p\n\x0fShadowLinkState\x12!\n\x1dSHADOW_LINK_STATE_UNSPECIFIED\x10\x00\x12\x1c\n\x18SHADOW_LINK_STATE_ACTIVE\x10\x01\x12\x1c\n\x18SHADOW_LINK_STATE_PAUSED\x10\x02*w\n\x0eScramMechanism\x12\x1f\n\x1bSCRAM_MECHANISM_UNSPECIFIED\x10\x00\x12!\n\x1dSCRAM_MECHANISM_SCRAM_SHA_256\x10\x01\x12!\n\x1dSCRAM_MECHANISM_SCRAM_SHA_512\x10\x02*^\n\x0bPatternType\x12\x1c\n\x18PATTERN_TYPE_UNSPECIFIED\x10\x00\x12\x18\n\x14PATTERN_TYPE_LITERAL\x10\x01\x12\x17\n\x13PATTERN_TYPE_PREFIX\x10\x02*[\n\nFilterType\x12\x1b\n\x17FILTER_TYPE_UNSPECIFIED\x10\x00\x12\x17\n\x13FILTER_TYPE_INCLUDE\x10\x01\x12\x17\n\x13FILTER_TYPE_EXCLUDE\x10\x02*\xaa\x01\n\tTaskState\x12\x1a\n\x16TASK_STATE_UNSPECIFIED\x10\x00\x12\x15\n\x11TASK_STATE_ACTIVE\x10\x01\x12\x15\n\x11TASK_STATE_PAUSED\x10\x02\x12\x1f\n\x1bTASK_STATE_LINK_UNAVAILABLE\x10\x03\x12\x1a\n\x16TASK_STATE_NOT_RUNNING\x10\x04\x12\x16\n\x12TASK_STATE_FAULTED\x10\x05*\xa0\x02\n\x10ShadowTopicState\x12"\n\x1eSHADOW_TOPIC_STATE_UNSPECIFIED\x10\x00\x12\x1d\n\x19SHADOW_TOPIC_STATE_ACTIVE\x10\x01\x12\x1e\n\x1aSHADOW_TOPIC_STATE_FAULTED\x10\x02\x12\x1d\n\x19SHADOW_TOPIC_STATE_PAUSED\x10\x03\x12#\n\x1fSHADOW_TOPIC_STATE_FAILING_OVER\x10\x04\x12"\n\x1eSHADOW_TOPIC_STATE_FAILED_OVER\x10\x05\x12 
\n\x1cSHADOW_TOPIC_STATE_PROMOTING\x10\x06\x12\x1f\n\x1bSHADOW_TOPIC_STATE_PROMOTED\x10\x072\xe1\x07\n\x11ShadowLinkService\x12}\n\x10CreateShadowLink\x12/.redpanda.core.admin.v2.CreateShadowLinkRequest\x1a0.redpanda.core.admin.v2.CreateShadowLinkResponse"\x06\xea\x92\x19\x02\x10\x03\x12}\n\x10DeleteShadowLink\x12/.redpanda.core.admin.v2.DeleteShadowLinkRequest\x1a0.redpanda.core.admin.v2.DeleteShadowLinkResponse"\x06\xea\x92\x19\x02\x10\x03\x12t\n\rGetShadowLink\x12,.redpanda.core.admin.v2.GetShadowLinkRequest\x1a-.redpanda.core.admin.v2.GetShadowLinkResponse"\x06\xea\x92\x19\x02\x10\x03\x12z\n\x0fListShadowLinks\x12..redpanda.core.admin.v2.ListShadowLinksRequest\x1a/.redpanda.core.admin.v2.ListShadowLinksResponse"\x06\xea\x92\x19\x02\x10\x03\x12}\n\x10UpdateShadowLink\x12/.redpanda.core.admin.v2.UpdateShadowLinkRequest\x1a0.redpanda.core.admin.v2.UpdateShadowLinkResponse"\x06\xea\x92\x19\x02\x10\x03\x12e\n\x08FailOver\x12\'.redpanda.core.admin.v2.FailOverRequest\x1a(.redpanda.core.admin.v2.FailOverResponse"\x06\xea\x92\x19\x02\x10\x03\x12w\n\x0eGetShadowTopic\x12-.redpanda.core.admin.v2.GetShadowTopicRequest\x1a..redpanda.core.admin.v2.GetShadowTopicResponse"\x06\xea\x92\x19\x02\x10\x03\x12}\n\x10ListShadowTopics\x12/.redpanda.core.admin.v2.ListShadowTopicsRequest\x1a0.redpanda.core.admin.v2.ListShadowTopicsResponse"\x06\xea\x92\x19\x02\x10\x03B\x10\xea\x92\x19\x0cproto::adminb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n.proto/redpanda/core/admin/v2/shadow_link.proto\x12\x16redpanda.core.admin.v2\x1a\'proto/redpanda/core/pbgen/options.proto\x1a#proto/redpanda/core/pbgen/rpc.proto\x1a$proto/redpanda/core/common/acl.proto\x1a\x1fgoogle/api/field_behavior.proto\x1a\x1bgoogle/api/field_info.proto\x1a\x19google/api/resource.proto\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a google/protobuf/field_mask.proto"\xc2\x01\n\nShadowLink\x12\x11\n\x04name\x18\x01 \x01(\tB\x03\xe0A\x02\x12\x18\n\x03uid\x18\x02 \x01(\tB\x0b\xe0A\x03\xe2\x8c\xcf\xd7\x08\x02\x08\x01\x12H\n\x0econfigurations\x18\x03 \x01(\x0b20.redpanda.core.admin.v2.ShadowLinkConfigurations\x12=\n\x06status\x18\x04 \x01(\x0b2(.redpanda.core.admin.v2.ShadowLinkStatusB\x03\xe0A\x03"\xc0\x01\n\x0bShadowTopic\x12\x11\n\x04name\x18\x01 \x01(\tB\x03\xe0A\x03\x12\x1d\n\x08topic_id\x18\x02 \x01(\tB\x0b\xe0A\x03\xe2\x8c\xcf\xd7\x08\x02\x08\x01\x12\x19\n\x11source_topic_name\x18\x03 \x01(\t\x12$\n\x0fsource_topic_id\x18\x04 \x01(\tB\x0b\xe0A\x03\xe2\x8c\xcf\xd7\x08\x02\x08\x01\x12>\n\x06status\x18\x05 \x01(\x0b2).redpanda.core.admin.v2.ShadowTopicStatusB\x03\xe0A\x03"R\n\x17CreateShadowLinkRequest\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"S\n\x18CreateShadowLinkResponse\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"p\n\x17DeleteShadowLinkRequest\x12F\n\x04name\x18\x01 \x01(\tB8\xe0A\x02\xfaA2\n0redpanda.core.admin.ShadowLinkService/ShadowLink\x12\r\n\x05force\x18\x02 \x01(\x08"\x1a\n\x18DeleteShadowLinkResponse"^\n\x14GetShadowLinkRequest\x12F\n\x04name\x18\x01 \x01(\tB8\xe0A\x02\xfaA2\n0redpanda.core.admin.ShadowLinkService/ShadowLink"P\n\x15GetShadowLinkResponse\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"\x18\n\x16ListShadowLinksRequest"S\n\x17ListShadowLinksResponse\x128\n\x0cshadow_links\x18\x01 \x03(\x0b2".redpanda.core.admin.v2.ShadowLink"\x83\x01\n\x17UpdateShadowLinkRequest\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink\x12/\n\x0bupdate_mask\x18\x02 
\x01(\x0b2\x1a.google.protobuf.FieldMask"S\n\x18UpdateShadowLinkResponse\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"y\n\x0fFailOverRequest\x12F\n\x04name\x18\x01 \x01(\tB8\xe0A\x02\xfaA2\n0redpanda.core.admin.ShadowLinkService/ShadowLink\x12\x1e\n\x11shadow_topic_name\x18\x02 \x01(\tB\x03\xe0A\x01"K\n\x10FailOverResponse\x127\n\x0bshadow_link\x18\x01 \x01(\x0b2".redpanda.core.admin.v2.ShadowLink"I\n\x15GetShadowTopicRequest\x12\x1d\n\x10shadow_link_name\x18\x01 \x01(\tB\x03\xe0A\x02\x12\x11\n\x04name\x18\x02 \x01(\tB\x03\xe0A\x02"S\n\x16GetShadowTopicResponse\x129\n\x0cshadow_topic\x18\x01 \x01(\x0b2#.redpanda.core.admin.v2.ShadowTopic"8\n\x17ListShadowTopicsRequest\x12\x1d\n\x10shadow_link_name\x18\x01 \x01(\tB\x03\xe0A\x02"V\n\x18ListShadowTopicsResponse\x12:\n\rshadow_topics\x18\x01 \x03(\x0b2#.redpanda.core.admin.v2.ShadowTopic"\xe7\x02\n\x18ShadowLinkConfigurations\x12G\n\x0eclient_options\x18\x01 \x01(\x0b2/.redpanda.core.admin.v2.ShadowLinkClientOptions\x12U\n\x1btopic_metadata_sync_options\x18\x02 \x01(\x0b20.redpanda.core.admin.v2.TopicMetadataSyncOptions\x12W\n\x1cconsumer_offset_sync_options\x18\x03 \x01(\x0b21.redpanda.core.admin.v2.ConsumerOffsetSyncOptions\x12R\n\x15security_sync_options\x18\x04 \x01(\x0b23.redpanda.core.admin.v2.SecuritySettingsSyncOptions"\x84\x04\n\x17ShadowLinkClientOptions\x12\x1e\n\x11bootstrap_servers\x18\x01 \x03(\tB\x03\xe0A\x02\x12\x16\n\tclient_id\x18\x02 \x01(\tB\x03\xe0A\x03\x12\x19\n\x11source_cluster_id\x18\x03 \x01(\t\x12>\n\x0ctls_settings\x18\x04 \x01(\x0b2#.redpanda.core.admin.v2.TLSSettingsH\x00\x88\x01\x01\x12^\n\x1cauthentication_configuration\x18\x05 \x01(\x0b23.redpanda.core.admin.v2.AuthenticationConfigurationH\x01\x88\x01\x01\x12\x1b\n\x13metadata_max_age_ms\x18\x06 \x01(\x05\x12\x1d\n\x15connection_timeout_ms\x18\x07 \x01(\x05\x12\x18\n\x10retry_backoff_ms\x18\x08 \x01(\x05\x12\x19\n\x11fetch_wait_max_ms\x18\t \x01(\x05\x12\x17\n\x0ffetch_min_bytes\x18\n \x01(\x05\x12\x17\n\x0ffetch_max_bytes\x18\x0b \x01(\x05\x12!\n\x19fetch_partition_max_bytes\x18\x0c \x01(\x05B\x0f\n\r_tls_settingsB\x1f\n\x1d_authentication_configuration"\xd6\x01\n\x18TopicMetadataSyncOptions\x12+\n\x08interval\x18\x01 \x01(\x0b2\x19.google.protobuf.Duration\x12L\n auto_create_shadow_topic_filters\x18\x02 \x03(\x0b2".redpanda.core.admin.v2.NameFilter\x12&\n\x1esynced_shadow_topic_properties\x18\x03 \x03(\t\x12\x17\n\x0fexclude_default\x18\x04 \x01(\x08"\x94\x01\n\x19ConsumerOffsetSyncOptions\x12+\n\x08interval\x18\x01 \x01(\x0b2\x19.google.protobuf.Duration\x12\x0f\n\x07enabled\x18\x02 \x01(\x08\x129\n\rgroup_filters\x18\x03 \x03(\x0b2".redpanda.core.admin.v2.NameFilter"\x93\x01\n\x1bSecuritySettingsSyncOptions\x12+\n\x08interval\x18\x01 \x01(\x0b2\x19.google.protobuf.Duration\x12\x0f\n\x07enabled\x18\x02 \x01(\x08\x126\n\x0bacl_filters\x18\x05 \x03(\x0b2!.redpanda.core.admin.v2.ACLFilter"\xb8\x01\n\x0bTLSSettings\x12\x0f\n\x07enabled\x18\x03 \x01(\x08\x12D\n\x11tls_file_settings\x18\x01 \x01(\x0b2\'.redpanda.core.admin.v2.TLSFileSettingsH\x00\x12B\n\x10tls_pem_settings\x18\x02 \x01(\x0b2&.redpanda.core.admin.v2.TLSPEMSettingsH\x00B\x0e\n\x0ctls_settings"s\n\x1bAuthenticationConfiguration\x12B\n\x13scram_configuration\x18\x01 \x01(\x0b2#.redpanda.core.admin.v2.ScramConfigH\x00B\x10\n\x0eauthentication"G\n\x0fTLSFileSettings\x12\x0f\n\x07ca_path\x18\x01 \x01(\t\x12\x10\n\x08key_path\x18\x02 \x01(\t\x12\x11\n\tcert_path\x18\x03 \x01(\t"Z\n\x0eTLSPEMSettings\x12\n\n\x02ca\x18\x01 \x01(\t\x12\x10\n\x03key\x18\x02 
\x01(\tB\x03\xe0A\x04\x12\x1c\n\x0fkey_fingerprint\x18\x03 \x01(\tB\x03\xe0A\x03\x12\x0c\n\x04cert\x18\x04 \x01(\t"\xcc\x01\n\x0bScramConfig\x12\x10\n\x08username\x18\x01 \x01(\t\x12\x15\n\x08password\x18\x02 \x01(\tB\x03\xe0A\x04\x12\x19\n\x0cpassword_set\x18\x03 \x01(\x08B\x03\xe0A\x03\x128\n\x0fpassword_set_at\x18\x04 \x01(\x0b2\x1a.google.protobuf.TimestampB\x03\xe0A\x03\x12?\n\x0fscram_mechanism\x18\x05 \x01(\x0e2&.redpanda.core.admin.v2.ScramMechanism"\x8e\x01\n\nNameFilter\x129\n\x0cpattern_type\x18\x01 \x01(\x0e2#.redpanda.core.admin.v2.PatternType\x127\n\x0bfilter_type\x18\x02 \x01(\x0e2".redpanda.core.admin.v2.FilterType\x12\x0c\n\x04name\x18\x03 \x01(\t"\x8f\x01\n\tACLFilter\x12B\n\x0fresource_filter\x18\x01 \x01(\x0b2).redpanda.core.admin.v2.ACLResourceFilter\x12>\n\raccess_filter\x18\x02 \x01(\x0b2\'.redpanda.core.admin.v2.ACLAccessFilter"\x93\x01\n\x11ACLResourceFilter\x128\n\rresource_type\x18\x01 \x01(\x0e2!.redpanda.core.common.ACLResource\x126\n\x0cpattern_type\x18\x02 \x01(\x0e2 .redpanda.core.common.ACLPattern\x12\x0c\n\x04name\x18\x03 \x01(\t"\xab\x01\n\x0fACLAccessFilter\x12\x11\n\tprincipal\x18\x01 \x01(\t\x125\n\toperation\x18\x02 \x01(\x0e2".redpanda.core.common.ACLOperation\x12@\n\x0fpermission_type\x18\x03 \x01(\x0e2\'.redpanda.core.common.ACLPermissionType\x12\x0c\n\x04host\x18\x04 \x01(\t"\xf3\x01\n\x10ShadowLinkStatus\x126\n\x05state\x18\x01 \x01(\x0e2\'.redpanda.core.admin.v2.ShadowLinkState\x12C\n\rtask_statuses\x18\x02 \x03(\x0b2,.redpanda.core.admin.v2.ShadowLinkTaskStatus\x12:\n\rshadow_topics\x18\x03 \x03(\x0b2#.redpanda.core.admin.v2.ShadowTopic\x12&\n\x1esynced_shadow_topic_properties\x18\x04 \x03(\t"y\n\x14ShadowLinkTaskStatus\x12\x0c\n\x04name\x18\x01 \x01(\t\x120\n\x05state\x18\x02 \x01(\x0e2!.redpanda.core.admin.v2.TaskState\x12\x0e\n\x06reason\x18\x03 \x01(\t\x12\x11\n\tbroker_id\x18\x04 \x01(\x05"\x9e\x01\n\x11ShadowTopicStatus\x127\n\x05state\x18\x01 \x01(\x0e2(.redpanda.core.admin.v2.ShadowTopicState\x12P\n\x15partition_information\x18\x02 \x03(\x0b21.redpanda.core.admin.v2.TopicPartitionInformation"\xce\x01\n\x19TopicPartitionInformation\x12\x14\n\x0cpartition_id\x18\x01 \x01(\x03\x12!\n\x19source_last_stable_offset\x18\x02 \x01(\x03\x12\x1d\n\x15source_high_watermark\x18\x03 \x01(\x03\x12\x16\n\x0ehigh_watermark\x18\x04 \x01(\x03\x12A\n\x1dsource_last_updated_timestamp\x18\x05 
\x01(\x0b2\x1a.google.protobuf.Timestamp*p\n\x0fShadowLinkState\x12!\n\x1dSHADOW_LINK_STATE_UNSPECIFIED\x10\x00\x12\x1c\n\x18SHADOW_LINK_STATE_ACTIVE\x10\x01\x12\x1c\n\x18SHADOW_LINK_STATE_PAUSED\x10\x02*w\n\x0eScramMechanism\x12\x1f\n\x1bSCRAM_MECHANISM_UNSPECIFIED\x10\x00\x12!\n\x1dSCRAM_MECHANISM_SCRAM_SHA_256\x10\x01\x12!\n\x1dSCRAM_MECHANISM_SCRAM_SHA_512\x10\x02*^\n\x0bPatternType\x12\x1c\n\x18PATTERN_TYPE_UNSPECIFIED\x10\x00\x12\x18\n\x14PATTERN_TYPE_LITERAL\x10\x01\x12\x17\n\x13PATTERN_TYPE_PREFIX\x10\x02*[\n\nFilterType\x12\x1b\n\x17FILTER_TYPE_UNSPECIFIED\x10\x00\x12\x17\n\x13FILTER_TYPE_INCLUDE\x10\x01\x12\x17\n\x13FILTER_TYPE_EXCLUDE\x10\x02*\xaa\x01\n\tTaskState\x12\x1a\n\x16TASK_STATE_UNSPECIFIED\x10\x00\x12\x15\n\x11TASK_STATE_ACTIVE\x10\x01\x12\x15\n\x11TASK_STATE_PAUSED\x10\x02\x12\x1f\n\x1bTASK_STATE_LINK_UNAVAILABLE\x10\x03\x12\x1a\n\x16TASK_STATE_NOT_RUNNING\x10\x04\x12\x16\n\x12TASK_STATE_FAULTED\x10\x05*\xa0\x02\n\x10ShadowTopicState\x12"\n\x1eSHADOW_TOPIC_STATE_UNSPECIFIED\x10\x00\x12\x1d\n\x19SHADOW_TOPIC_STATE_ACTIVE\x10\x01\x12\x1e\n\x1aSHADOW_TOPIC_STATE_FAULTED\x10\x02\x12\x1d\n\x19SHADOW_TOPIC_STATE_PAUSED\x10\x03\x12#\n\x1fSHADOW_TOPIC_STATE_FAILING_OVER\x10\x04\x12"\n\x1eSHADOW_TOPIC_STATE_FAILED_OVER\x10\x05\x12 \n\x1cSHADOW_TOPIC_STATE_PROMOTING\x10\x06\x12\x1f\n\x1bSHADOW_TOPIC_STATE_PROMOTED\x10\x072\xe1\x07\n\x11ShadowLinkService\x12}\n\x10CreateShadowLink\x12/.redpanda.core.admin.v2.CreateShadowLinkRequest\x1a0.redpanda.core.admin.v2.CreateShadowLinkResponse"\x06\xea\x92\x19\x02\x10\x03\x12}\n\x10DeleteShadowLink\x12/.redpanda.core.admin.v2.DeleteShadowLinkRequest\x1a0.redpanda.core.admin.v2.DeleteShadowLinkResponse"\x06\xea\x92\x19\x02\x10\x03\x12t\n\rGetShadowLink\x12,.redpanda.core.admin.v2.GetShadowLinkRequest\x1a-.redpanda.core.admin.v2.GetShadowLinkResponse"\x06\xea\x92\x19\x02\x10\x03\x12z\n\x0fListShadowLinks\x12..redpanda.core.admin.v2.ListShadowLinksRequest\x1a/.redpanda.core.admin.v2.ListShadowLinksResponse"\x06\xea\x92\x19\x02\x10\x03\x12}\n\x10UpdateShadowLink\x12/.redpanda.core.admin.v2.UpdateShadowLinkRequest\x1a0.redpanda.core.admin.v2.UpdateShadowLinkResponse"\x06\xea\x92\x19\x02\x10\x03\x12e\n\x08FailOver\x12\'.redpanda.core.admin.v2.FailOverRequest\x1a(.redpanda.core.admin.v2.FailOverResponse"\x06\xea\x92\x19\x02\x10\x03\x12w\n\x0eGetShadowTopic\x12-.redpanda.core.admin.v2.GetShadowTopicRequest\x1a..redpanda.core.admin.v2.GetShadowTopicResponse"\x06\xea\x92\x19\x02\x10\x03\x12}\n\x10ListShadowTopics\x12/.redpanda.core.admin.v2.ListShadowTopicsRequest\x1a0.redpanda.core.admin.v2.ListShadowTopicsResponse"\x06\xea\x92\x19\x02\x10\x03B\x10\xea\x92\x19\x0cproto::adminb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'proto.redpanda.core.admin.v2.shadow_link_pb2', _globals) @@ -80,18 +80,18 @@ _globals['_SHADOWLINKSERVICE'].methods_by_name['GetShadowTopic']._serialized_options = b'\xea\x92\x19\x02\x10\x03' _globals['_SHADOWLINKSERVICE'].methods_by_name['ListShadowTopics']._loaded_options = None _globals['_SHADOWLINKSERVICE'].methods_by_name['ListShadowTopics']._serialized_options = b'\xea\x92\x19\x02\x10\x03' - _globals['_SHADOWLINKSTATE']._serialized_start = 5489 - _globals['_SHADOWLINKSTATE']._serialized_end = 5601 - _globals['_SCRAMMECHANISM']._serialized_start = 5603 - _globals['_SCRAMMECHANISM']._serialized_end = 5722 - _globals['_PATTERNTYPE']._serialized_start = 5724 - _globals['_PATTERNTYPE']._serialized_end = 5818 - 
_globals['_FILTERTYPE']._serialized_start = 5820 - _globals['_FILTERTYPE']._serialized_end = 5911 - _globals['_TASKSTATE']._serialized_start = 5914 - _globals['_TASKSTATE']._serialized_end = 6084 - _globals['_SHADOWTOPICSTATE']._serialized_start = 6087 - _globals['_SHADOWTOPICSTATE']._serialized_end = 6375 + _globals['_SHADOWLINKSTATE']._serialized_start = 5524 + _globals['_SHADOWLINKSTATE']._serialized_end = 5636 + _globals['_SCRAMMECHANISM']._serialized_start = 5638 + _globals['_SCRAMMECHANISM']._serialized_end = 5757 + _globals['_PATTERNTYPE']._serialized_start = 5759 + _globals['_PATTERNTYPE']._serialized_end = 5853 + _globals['_FILTERTYPE']._serialized_start = 5855 + _globals['_FILTERTYPE']._serialized_end = 5946 + _globals['_TASKSTATE']._serialized_start = 5949 + _globals['_TASKSTATE']._serialized_end = 6119 + _globals['_SHADOWTOPICSTATE']._serialized_start = 6122 + _globals['_SHADOWTOPICSTATE']._serialized_end = 6410 _globals['_SHADOWLINK']._serialized_start = 379 _globals['_SHADOWLINK']._serialized_end = 573 _globals['_SHADOWTOPIC']._serialized_start = 576 @@ -131,38 +131,38 @@ _globals['_SHADOWLINKCONFIGURATIONS']._serialized_start = 2096 _globals['_SHADOWLINKCONFIGURATIONS']._serialized_end = 2455 _globals['_SHADOWLINKCLIENTOPTIONS']._serialized_start = 2458 - _globals['_SHADOWLINKCLIENTOPTIONS']._serialized_end = 2939 - _globals['_TOPICMETADATASYNCOPTIONS']._serialized_start = 2942 - _globals['_TOPICMETADATASYNCOPTIONS']._serialized_end = 3156 - _globals['_CONSUMEROFFSETSYNCOPTIONS']._serialized_start = 3159 - _globals['_CONSUMEROFFSETSYNCOPTIONS']._serialized_end = 3307 - _globals['_SECURITYSETTINGSSYNCOPTIONS']._serialized_start = 3310 - _globals['_SECURITYSETTINGSSYNCOPTIONS']._serialized_end = 3457 - _globals['_TLSSETTINGS']._serialized_start = 3460 - _globals['_TLSSETTINGS']._serialized_end = 3644 - _globals['_AUTHENTICATIONCONFIGURATION']._serialized_start = 3646 - _globals['_AUTHENTICATIONCONFIGURATION']._serialized_end = 3761 - _globals['_TLSFILESETTINGS']._serialized_start = 3763 - _globals['_TLSFILESETTINGS']._serialized_end = 3834 - _globals['_TLSPEMSETTINGS']._serialized_start = 3836 - _globals['_TLSPEMSETTINGS']._serialized_end = 3926 - _globals['_SCRAMCONFIG']._serialized_start = 3929 - _globals['_SCRAMCONFIG']._serialized_end = 4133 - _globals['_NAMEFILTER']._serialized_start = 4136 - _globals['_NAMEFILTER']._serialized_end = 4278 - _globals['_ACLFILTER']._serialized_start = 4281 - _globals['_ACLFILTER']._serialized_end = 4424 - _globals['_ACLRESOURCEFILTER']._serialized_start = 4427 - _globals['_ACLRESOURCEFILTER']._serialized_end = 4574 - _globals['_ACLACCESSFILTER']._serialized_start = 4577 - _globals['_ACLACCESSFILTER']._serialized_end = 4748 - _globals['_SHADOWLINKSTATUS']._serialized_start = 4751 - _globals['_SHADOWLINKSTATUS']._serialized_end = 4994 - _globals['_SHADOWLINKTASKSTATUS']._serialized_start = 4996 - _globals['_SHADOWLINKTASKSTATUS']._serialized_end = 5117 - _globals['_SHADOWTOPICSTATUS']._serialized_start = 5120 - _globals['_SHADOWTOPICSTATUS']._serialized_end = 5278 - _globals['_TOPICPARTITIONINFORMATION']._serialized_start = 5281 - _globals['_TOPICPARTITIONINFORMATION']._serialized_end = 5487 - _globals['_SHADOWLINKSERVICE']._serialized_start = 6378 - _globals['_SHADOWLINKSERVICE']._serialized_end = 7371 \ No newline at end of file + _globals['_SHADOWLINKCLIENTOPTIONS']._serialized_end = 2974 + _globals['_TOPICMETADATASYNCOPTIONS']._serialized_start = 2977 + _globals['_TOPICMETADATASYNCOPTIONS']._serialized_end = 3191 + 
_globals['_CONSUMEROFFSETSYNCOPTIONS']._serialized_start = 3194 + _globals['_CONSUMEROFFSETSYNCOPTIONS']._serialized_end = 3342 + _globals['_SECURITYSETTINGSSYNCOPTIONS']._serialized_start = 3345 + _globals['_SECURITYSETTINGSSYNCOPTIONS']._serialized_end = 3492 + _globals['_TLSSETTINGS']._serialized_start = 3495 + _globals['_TLSSETTINGS']._serialized_end = 3679 + _globals['_AUTHENTICATIONCONFIGURATION']._serialized_start = 3681 + _globals['_AUTHENTICATIONCONFIGURATION']._serialized_end = 3796 + _globals['_TLSFILESETTINGS']._serialized_start = 3798 + _globals['_TLSFILESETTINGS']._serialized_end = 3869 + _globals['_TLSPEMSETTINGS']._serialized_start = 3871 + _globals['_TLSPEMSETTINGS']._serialized_end = 3961 + _globals['_SCRAMCONFIG']._serialized_start = 3964 + _globals['_SCRAMCONFIG']._serialized_end = 4168 + _globals['_NAMEFILTER']._serialized_start = 4171 + _globals['_NAMEFILTER']._serialized_end = 4313 + _globals['_ACLFILTER']._serialized_start = 4316 + _globals['_ACLFILTER']._serialized_end = 4459 + _globals['_ACLRESOURCEFILTER']._serialized_start = 4462 + _globals['_ACLRESOURCEFILTER']._serialized_end = 4609 + _globals['_ACLACCESSFILTER']._serialized_start = 4612 + _globals['_ACLACCESSFILTER']._serialized_end = 4783 + _globals['_SHADOWLINKSTATUS']._serialized_start = 4786 + _globals['_SHADOWLINKSTATUS']._serialized_end = 5029 + _globals['_SHADOWLINKTASKSTATUS']._serialized_start = 5031 + _globals['_SHADOWLINKTASKSTATUS']._serialized_end = 5152 + _globals['_SHADOWTOPICSTATUS']._serialized_start = 5155 + _globals['_SHADOWTOPICSTATUS']._serialized_end = 5313 + _globals['_TOPICPARTITIONINFORMATION']._serialized_start = 5316 + _globals['_TOPICPARTITIONINFORMATION']._serialized_end = 5522 + _globals['_SHADOWLINKSERVICE']._serialized_start = 6413 + _globals['_SHADOWLINKSERVICE']._serialized_end = 7406 \ No newline at end of file diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.pyi b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.pyi index 9be07ea78e414..e62e7c4879d09 100644 --- a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.pyi +++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.pyi @@ -600,6 +600,7 @@ class ShadowLinkClientOptions(google.protobuf.message.Message): FETCH_WAIT_MAX_MS_FIELD_NUMBER: builtins.int FETCH_MIN_BYTES_FIELD_NUMBER: builtins.int FETCH_MAX_BYTES_FIELD_NUMBER: builtins.int + FETCH_PARTITION_MAX_BYTES_FIELD_NUMBER: builtins.int client_id: builtins.str 'The Client ID for the Kafka RPC requests setn by this cluster to the\n source cluster\n ' source_cluster_id: builtins.str @@ -611,11 +612,13 @@ class ShadowLinkClientOptions(google.protobuf.message.Message): retry_backoff_ms: builtins.int 'Retry base backoff\n If 0 is provided, defaults to 100ms\n ' fetch_wait_max_ms: builtins.int - 'Fetch request timeout\n If 0 is provided, defaults to 100ms\n ' + 'Fetch request timeout\n If 0 is provided, defaults to 500ms\n ' fetch_min_bytes: builtins.int - 'Fetch min bytes\n If 0 is provided, defaults to 1 byte\n ' + 'Fetch min bytes\n If 0 is provided, defaults to 5 MiB\n ' fetch_max_bytes: builtins.int - 'Fetch max bytes\n If 0 is provided, defaults to 1MiB\n ' + 'Fetch max bytes\n If 0 is provided, defaults to 20 MiB\n ' + fetch_partition_max_bytes: builtins.int + 'Fetch partition max bytes\n If 0 is provided, defaults to 1 MiB\n ' @property def bootstrap_servers(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: @@ -629,13 +632,13 @@ 
class ShadowLinkClientOptions(google.protobuf.message.Message): def authentication_configuration(self) -> global___AuthenticationConfiguration: """Authentication settings""" - def __init__(self, *, bootstrap_servers: collections.abc.Iterable[builtins.str] | None=..., client_id: builtins.str=..., source_cluster_id: builtins.str=..., tls_settings: global___TLSSettings | None=..., authentication_configuration: global___AuthenticationConfiguration | None=..., metadata_max_age_ms: builtins.int=..., connection_timeout_ms: builtins.int=..., retry_backoff_ms: builtins.int=..., fetch_wait_max_ms: builtins.int=..., fetch_min_bytes: builtins.int=..., fetch_max_bytes: builtins.int=...) -> None: + def __init__(self, *, bootstrap_servers: collections.abc.Iterable[builtins.str] | None=..., client_id: builtins.str=..., source_cluster_id: builtins.str=..., tls_settings: global___TLSSettings | None=..., authentication_configuration: global___AuthenticationConfiguration | None=..., metadata_max_age_ms: builtins.int=..., connection_timeout_ms: builtins.int=..., retry_backoff_ms: builtins.int=..., fetch_wait_max_ms: builtins.int=..., fetch_min_bytes: builtins.int=..., fetch_max_bytes: builtins.int=..., fetch_partition_max_bytes: builtins.int=...) -> None: ... def HasField(self, field_name: typing.Literal['_authentication_configuration', b'_authentication_configuration', '_tls_settings', b'_tls_settings', 'authentication_configuration', b'authentication_configuration', 'tls_settings', b'tls_settings']) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal['_authentication_configuration', b'_authentication_configuration', '_tls_settings', b'_tls_settings', 'authentication_configuration', b'authentication_configuration', 'bootstrap_servers', b'bootstrap_servers', 'client_id', b'client_id', 'connection_timeout_ms', b'connection_timeout_ms', 'fetch_max_bytes', b'fetch_max_bytes', 'fetch_min_bytes', b'fetch_min_bytes', 'fetch_wait_max_ms', b'fetch_wait_max_ms', 'metadata_max_age_ms', b'metadata_max_age_ms', 'retry_backoff_ms', b'retry_backoff_ms', 'source_cluster_id', b'source_cluster_id', 'tls_settings', b'tls_settings']) -> None: + def ClearField(self, field_name: typing.Literal['_authentication_configuration', b'_authentication_configuration', '_tls_settings', b'_tls_settings', 'authentication_configuration', b'authentication_configuration', 'bootstrap_servers', b'bootstrap_servers', 'client_id', b'client_id', 'connection_timeout_ms', b'connection_timeout_ms', 'fetch_max_bytes', b'fetch_max_bytes', 'fetch_min_bytes', b'fetch_min_bytes', 'fetch_partition_max_bytes', b'fetch_partition_max_bytes', 'fetch_wait_max_ms', b'fetch_wait_max_ms', 'metadata_max_age_ms', b'metadata_max_age_ms', 'retry_backoff_ms', b'retry_backoff_ms', 'source_cluster_id', b'source_cluster_id', 'tls_settings', b'tls_settings']) -> None: ... @typing.overload From 6de986b59c4dcba17ac25bcd7f1502141a41ece9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Tue, 23 Sep 2025 13:49:04 +0200 Subject: [PATCH 03/10] cl/service: pass all relevant configurations to client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cleaned up creation of consumer configuration. Now all the relevant parameters can be configured through APIs. 
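For illustration, a minimal sketch of the mapping this change establishes (getter names as in `cluster_link::model::connection_config`, defaults as adjusted in the previous commits; surrounding code elided):

```
kafka::client::direct_consumer::configuration cfg;
// Each getter falls back to the connection-level default when the
// corresponding field is unset (i.e. 0 was provided over the API).
cfg.min_bytes = conn_cfg.get_fetch_min_bytes();       // default: 5 MiB
cfg.max_fetch_size = conn_cfg.get_fetch_max_bytes();  // default: 20 MiB
cfg.partition_max_bytes
  = conn_cfg.get_fetch_partition_max_bytes();         // default: 1 MiB
cfg.max_wait_time = std::chrono::milliseconds(
  conn_cfg.get_fetch_wait_max_ms());                  // default: 500 ms
```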
Signed-off-by: Michał Maślanka --- src/v/cluster_link/service.cc | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/src/v/cluster_link/service.cc b/src/v/cluster_link/service.cc index a6724973f39c9..64f5f626b12c4 100644 --- a/src/v/cluster_link/service.cc +++ b/src/v/cluster_link/service.cc @@ -250,29 +250,39 @@ class default_link_factory : public link_factory { kafka::client::cluster& cluster, kafka::snc_quota_manager& snc_quota_mgr, const model::connection_config& conn_cfg) { - // todo0: make more these configurable at connection level - // todo1: make these dynamic + // Cache up to double the max fetch size in the consumer + // to allow more than one fetch to be buffered per partition. + const auto max_buffered_bytes = 2 * conn_cfg.get_fetch_max_bytes(); kafka::client::direct_consumer::configuration cfg; + const auto max_wait_time = std::chrono::milliseconds( + conn_cfg.get_fetch_wait_max_ms()); + cfg.min_bytes = conn_cfg.get_fetch_min_bytes(); cfg.max_fetch_size = conn_cfg.get_fetch_max_bytes(); - cfg.partition_max_bytes = 512_KiB; - cfg.max_wait_time = 200ms; cfg.isolation_level = ::model::isolation_level::read_committed; - cfg.max_buffered_bytes = 5_MiB; + cfg.max_buffered_bytes = max_buffered_bytes; + // We are not interested in limiting the number of buffered fetches as + // we already set a bytes limit cfg.max_buffered_elements = std::numeric_limits<size_t>::max(); cfg.with_sessions = kafka::client::fetch_sessions_enabled::yes; - static constexpr size_t partition_max_buffered_bytes = 5_MiB; - static constexpr auto fetch_max_wait = 100ms; + + cfg.max_wait_time = max_wait_time; + cfg.partition_max_bytes = conn_cfg.get_fetch_partition_max_bytes(); + auto direct_consumer = std::make_unique<kafka::client::direct_consumer>( cluster, cfg); - + vlog( cllog.debug, "Creating MUX consumer with direct consumer configuration: {}", cfg); return std::make_unique<replication::mux_remote_consumer>( std::move(client_id), std::move(direct_consumer), snc_quota_mgr, - partition_max_buffered_bytes, - fetch_max_wait); + max_buffered_bytes, + max_wait_time); } + ss::sharded<cluster::partition_manager>* _partition_manager; ss::sharded<kafka::snc_quota_manager>* _snc_quota_mgr; }; From d4db7599d626daecbafe92e768c07853a71e253a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Thu, 25 Sep 2025 11:18:11 +0200 Subject: [PATCH 04/10] cl/replication: moved sink and source implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Moved the source and sink implementations out of the replication module. This way the `replication` sub-module does not have to depend on anything from the cluster_link module and has no information about the specific implementations of the actual source and sink that are used in cluster linking.
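With this split, code in `replication` is written purely against the abstract factories declared in `replication/deps.h`. A minimal usage sketch (the `source_factory`, `sink_factory` and `ntp` references are assumed to be in scope):

```
// Only the interfaces are visible here; the concrete remote source and
// local sink implementations now live in cluster_link/service.cc.
std::unique_ptr<replication::data_source> source
  = source_factory.make_source(ntp);
std::unique_ptr<replication::data_sink> sink = sink_factory.make_sink(ntp);
```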
Signed-off-by: Michał Maślanka --- src/v/cluster_link/BUILD | 3 +- src/v/cluster_link/replication/BUILD | 25 +- src/v/cluster_link/replication/deps_impl.cc | 229 --------------- src/v/cluster_link/replication/deps_impl.h | 92 ------ src/v/cluster_link/replication/tests/BUILD | 21 -- src/v/cluster_link/service.cc | 273 +++++++++++++++++- src/v/cluster_link/service.h | 12 + src/v/cluster_link/tests/BUILD | 22 ++ .../partition_replicator_fixture_tests.cc | 32 +- 9 files changed, 325 insertions(+), 384 deletions(-) delete mode 100644 src/v/cluster_link/replication/deps_impl.cc delete mode 100644 src/v/cluster_link/replication/deps_impl.h rename src/v/cluster_link/{replication => }/tests/partition_replicator_fixture_tests.cc (85%) diff --git a/src/v/cluster_link/BUILD b/src/v/cluster_link/BUILD index 4de585994a714..23e7448acfb4a 100644 --- a/src/v/cluster_link/BUILD +++ b/src/v/cluster_link/BUILD @@ -205,10 +205,11 @@ redpanda_cc_library( "//src/v/base", "//src/v/cluster", "//src/v/cluster/utils:partition_change_notifier_api", - "//src/v/cluster_link/replication:deps_impl", + "//src/v/cluster_link/replication:deps", "//src/v/cluster_link/replication:mux_remote_consumer", "//src/v/kafka/client/direct_consumer", "//src/v/kafka/server", + "//src/v/kafka/server:write_at_offset_stm", "//src/v/model", "//src/v/raft", "//src/v/rpc", diff --git a/src/v/cluster_link/replication/BUILD b/src/v/cluster_link/replication/BUILD index 07d5ce9710c45..e536c3631faed 100644 --- a/src/v/cluster_link/replication/BUILD +++ b/src/v/cluster_link/replication/BUILD @@ -41,7 +41,10 @@ redpanda_cc_library( hdrs = [ "partition_replicator.h", ], - visibility = ["__subpackages__"], + visibility = [ + "__subpackages__", + "//src/v/cluster_link:__subpackages__", + ], deps = [ ":deps", ":types", @@ -118,23 +121,3 @@ redpanda_cc_library( "@seastar", ], ) - -redpanda_cc_library( - name = "deps_impl", - srcs = [ - "deps_impl.cc", - ], - hdrs = [ - "deps_impl.h", - ], - visibility = ["//src/v/cluster_link:__subpackages__"], - deps = [ - ":deps", - ":mux_remote_consumer", - "//src/v/cluster", - "//src/v/cluster_link:logger", - "//src/v/kafka/server:write_at_offset_stm", - "//src/v/ssx:future_util", - "@seastar", - ], -) diff --git a/src/v/cluster_link/replication/deps_impl.cc b/src/v/cluster_link/replication/deps_impl.cc deleted file mode 100644 index d4a3296a7891d..0000000000000 --- a/src/v/cluster_link/replication/deps_impl.cc +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Copyright 2025 Redpanda Data, Inc. - * - * Licensed as a Redpanda Enterprise file under the Redpanda Community - * License (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md - */ - -#include "cluster_link/replication/deps_impl.h" - -#include "cluster/partition.h" -#include "cluster/partition_manager.h" -#include "cluster_link/logger.h" -#include "cluster_link/replication/mux_remote_consumer.h" -#include "kafka/server/write_at_offset_stm.h" -#include "ssx/future-util.h" - -static constexpr auto sync_timeout = 10s; - -namespace cluster_link::replication { - -remote_data_source_factory::remote_data_source_factory( - std::unique_ptr consumer) - : _consumer(std::move(consumer)) {} - -// to keep the unique_ptr fwd declaration happy -remote_data_source_factory::~remote_data_source_factory() = default; - -ss::future<> remote_data_source_factory::start() { return _consumer->start(); } - -ss::future<> remote_data_source_factory::stop() noexcept { - return _consumer->stop(); -} - -std::unique_ptr -remote_data_source_factory::make_source(const ::model::ntp& ntp) { - return std::make_unique(ntp.tp, *_consumer); -} - -ss::future<> remote_partition_source::start(kafka::offset offset) { - vlog(cllog.trace, "[{}] Starting remote partition source", _tp); - auto result = _consumer.add(_tp, offset); - if (!result.has_value()) [[unlikely]] { - // this is usually indicative of a bug in the manager where - // a previous source is not deregistered, bubble it up. - auto err = result.error(); - vlog( - cllog.error, - "[{}] Failed to add remote partition source: {}", - _tp, - err); - return ss::make_exception_future<>(err); - } - return ss::now(); -} - -ss::future<> remote_partition_source::stop() noexcept { - vlog(cllog.trace, "[{}] Stopping remote partition source", _tp); - auto f = _gate.close(); - co_await _consumer.remove(_tp); - co_await std::move(f); -} - -ss::future<> remote_partition_source::reset(kafka::offset offset) { - _gate.check(); - auto result = _consumer.reset(_tp, offset); - if (!result.has_value()) [[unlikely]] { - auto err = result.error(); - vlog( - cllog.error, - "[{}] Failed to reset remote partition source: {}", - _tp, - err); - return ss::make_exception_future<>(err); - } - return ss::now(); -} - -ss::future -remote_partition_source::fetch_next(ss::abort_source& as) { - auto holder = _gate.hold(); - auto result = co_await _consumer.fetch(_tp, as); - if (!result.has_value()) [[unlikely]] { - auto err = result.error(); - vlog( - cllog.error, - "[{}] Failed to fetch from remote partition source: {}", - _tp, - result.error()); - throw std::runtime_error( - fmt::format( - "[{}] Failed to fetch from remote partition source: {}", _tp, err)); - } - auto [batches, units] = std::move(*result); - co_return data_source::data{ - .batches = std::move(batches), .units = std::move(units)}; -} - -std::optional -remote_partition_source::get_offsets() { - auto offsets = _consumer.get_source_offsets(_tp); - if (!offsets.has_value()) { - return std::nullopt; - } - return data_source::source_partition_offsets_report{ - .source_start_offset = offsets->log_start_offset, - .source_hwm = offsets->high_watermark, - .source_lso = offsets->last_stable_offset, - .update_time = offsets->last_offset_update_timestamp, - }; -} - -std::unique_ptr -local_partition_data_sink_factory::make_sink(const ::model::ntp& ntp) { - auto partition = _partition_manager.local().get(ntp); - if (!partition) { - throw std::runtime_error( - fmt::format("Partition not found: {} on this shard", ntp)); - } - return std::make_unique(std::move(partition)); -} - 
-local_partition_sink::local_partition_sink( - ss::lw_shared_ptr partition) - : _partition(std::move(partition)) - , _stm(_partition->raft()->stm_manager()->get()) { - vassert( - _stm, - "write_at_offset_stm not attached to partition {}", - _partition->ntp()); -} -ss::future<> local_partition_sink::start() { - auto holder = _gate.hold(); - auto sync_offset = co_await _stm->get_expected_last_offset(sync_timeout); - if (sync_offset.has_error()) { - throw std::runtime_error( - fmt::format( - "Failed to sync write_at_offset_stm for partition {}: {}", - _partition->ntp(), - sync_offset.error().message())); - } - vlog( - cllog.trace, - "[{}] Starting local partition sink at offset {}", - _partition->ntp(), - sync_offset.value()); - _last_replicated_offset = sync_offset.value(); -} - -ss::future<> local_partition_sink::stop() noexcept { - vlog(cllog.trace, "[{}] Stopping local partition sink", _partition->ntp()); - co_await _gate.close(); -} - -kafka::offset local_partition_sink::last_replicated_offset() const { - vassert(_last_replicated_offset, "Sink has not been started"); - return _last_replicated_offset.value(); -} - -raft::replicate_stages local_partition_sink::replicate( - chunked_vector<::model::record_batch> batches, - ::model::timeout_clock::duration timeout, - ss::abort_source& as) { - _gate.check(); - vassert(_last_replicated_offset, "Sink has not been started"); - vassert( - !batches.empty(), - "Cannot replicate empty batch vector {}", - _partition->ntp()); - chunked_vector expected_offsets; - expected_offsets.reserve(batches.size()); - for (const auto& batch : batches) { - expected_offsets.push_back(::model::offset_cast(batch.base_offset())); - } - auto new_last_replicated_begin = ::model::offset_cast( - batches.front().base_offset()); - auto new_last_replicated_end = ::model::offset_cast( - batches.back().last_offset()); - vassert( - new_last_replicated_begin > _last_replicated_offset - && new_last_replicated_end > _last_replicated_offset, - "[{}] Replicating offsets must be monotonically increasing last " - "replicated: {}, attempting to replicate: [{}, {}]", - _partition->ntp(), - _last_replicated_offset, - new_last_replicated_begin, - new_last_replicated_end); - vlog( - cllog.trace, - "[{}] Replicating batches in range [{} - {}], last_replicated: {}, " - "new_last_replicated: {}", - _partition->ntp(), - batches.front().header(), - batches.back().header(), - _last_replicated_offset, - new_last_replicated_end); - auto stages = _stm->replicate( - std::move(batches), - std::move(expected_offsets), - _last_replicated_offset, - timeout, - as); - _last_replicated_offset = new_last_replicated_end; - return stages; -} - -void local_partition_sink::notify_replicator_failure(::model::term_id term) { - if (_gate.is_closed()) { - return; - } - // If the replicator failed to start _and_ the partition is still the - // leader in the same term we are effectively stuck without a replicator. - // Here we step down to ensure a new leader comes up and a replicator start - // is triggered again on the new leader. 
- if (_partition->term() == term) { - ssx::spawn_with_gate(_gate, [this, term] { - return _partition->raft()->step_down( - fmt::format("Unable to start replicator in term: {}", term)); - }); - } -} - -kafka::offset local_partition_sink::high_watermark() const { - _gate.check(); - return ::model::offset_cast(_partition->high_watermark()); -} -} // namespace cluster_link::replication diff --git a/src/v/cluster_link/replication/deps_impl.h b/src/v/cluster_link/replication/deps_impl.h deleted file mode 100644 index a2ad3ee27c167..0000000000000 --- a/src/v/cluster_link/replication/deps_impl.h +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright 2025 Redpanda Data, Inc. - * - * Licensed as a Redpanda Enterprise file under the Redpanda Community - * License (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md - */ - -#include "cluster/fwd.h" -#include "cluster_link/replication/deps.h" - -#include -#include - -namespace kafka { -class write_at_offset_stm; -}; - -namespace cluster_link::replication { - -class mux_remote_consumer; -/* - * Source backed by partition data on a remote cluster. - */ -class remote_partition_source : public data_source { -public: - explicit remote_partition_source( - ::model::topic_partition tp, mux_remote_consumer& consumer) - : _tp(std::move(tp)) - , _consumer(consumer) {} - ss::future<> start(kafka::offset) override; - ss::future<> stop() noexcept override; - ss::future<> reset(kafka::offset) override; - ss::future fetch_next(ss::abort_source&) override; - std::optional get_offsets() override; - -private: - ::model::topic_partition _tp; - mux_remote_consumer& _consumer; - ss::gate _gate; -}; - -class remote_data_source_factory : public data_source_factory { -public: - explicit remote_data_source_factory(std::unique_ptr); - ~remote_data_source_factory() override; - ss::future<> start() override; - ss::future<> stop() noexcept override; - std::unique_ptr make_source(const ::model::ntp&) override; - -private: - std::unique_ptr _consumer; -}; - -/* - * Sink for writing partition data to the partition leader on the local shard. 
- */ -class local_partition_sink : public data_sink { -public: - explicit local_partition_sink(ss::lw_shared_ptr); - ss::future<> start() override; - ss::future<> stop() noexcept override; - kafka::offset last_replicated_offset() const override; - raft::replicate_stages replicate( - chunked_vector<::model::record_batch> batches, - ::model::timeout_clock::duration timeout, - ss::abort_source& as) override; - void notify_replicator_failure(::model::term_id) override; - kafka::offset high_watermark() const final; - -private: - ss::gate _gate; - ss::lw_shared_ptr _partition; - ss::shared_ptr _stm; - // set in start(); - std::optional _last_replicated_offset; -}; - -class local_partition_data_sink_factory : public data_sink_factory { -public: - explicit local_partition_data_sink_factory( - ss::sharded& pm) - : _partition_manager(pm) {} - std::unique_ptr make_sink(const ::model::ntp&) override; - -private: - ss::sharded& _partition_manager; -}; - -} // namespace cluster_link::replication diff --git a/src/v/cluster_link/replication/tests/BUILD b/src/v/cluster_link/replication/tests/BUILD index 8e0906e91d2b8..2c754a050db5d 100644 --- a/src/v/cluster_link/replication/tests/BUILD +++ b/src/v/cluster_link/replication/tests/BUILD @@ -89,24 +89,3 @@ redpanda_test_cc_library( "@seastar", ], ) - -redpanda_cc_gtest( - name = "partition_replicator_fixture_test", - timeout = "short", - srcs = [ - "partition_replicator_fixture_tests.cc", - ], - cpu = 1, - deps = [ - "//src/v/cluster_link/replication:deps_impl", - "//src/v/cluster_link/replication:mux_remote_consumer", - "//src/v/cluster_link/replication:partition_replicator", - "//src/v/kafka/client/direct_consumer/tests:direct_consumer_fixture", - "//src/v/model", - "//src/v/redpanda/tests:fixture", - "@fmt", - "@googletest//:gtest", - "@seastar", - "@seastar//:testing", - ], -) diff --git a/src/v/cluster_link/service.cc b/src/v/cluster_link/service.cc index 64f5f626b12c4..8281ba62cc719 100644 --- a/src/v/cluster_link/service.cc +++ b/src/v/cluster_link/service.cc @@ -21,7 +21,7 @@ #include "cluster_link/logger.h" #include "cluster_link/manager.h" #include "cluster_link/model/types.h" -#include "cluster_link/replication/deps_impl.h" +#include "cluster_link/replication/deps.h" #include "cluster_link/replication/mux_remote_consumer.h" #include "cluster_link/security_migrator.h" #include "cluster_link/shadow_linking_rpc_service.h" @@ -29,7 +29,7 @@ #include "kafka/client/direct_consumer/direct_consumer.h" #include "kafka/server/group_router.h" #include "kafka/server/snc_quota_manager.h" -#include "ssx/future-util.h" +#include "kafka/server/write_at_offset_stm.h" #include @@ -115,8 +115,6 @@ using kafka::data::rpc::partition_leader_cache; using kafka::data::rpc::partition_manager; using kafka::data::rpc::topic_creator; using kafka::data::rpc::topic_metadata_cache; -using data_src_factory = replication::remote_data_source_factory; -using data_sink_factory = replication::local_partition_data_sink_factory; class link_registry_adapter : public link_registry { public: @@ -212,6 +210,268 @@ class link_registry_adapter : public link_registry { service* _svc; }; +class remote_partition_source : public replication::data_source { +public: + explicit remote_partition_source( + ::model::topic_partition tp, replication::mux_remote_consumer& consumer) + : _tp(std::move(tp)) + , _consumer(consumer) {} + + ss::future<> start(kafka::offset offset) final { + vlog(cllog.trace, "[{}] Starting remote partition source", _tp); + auto result = _consumer.add(_tp, offset); + if 
(!result.has_value()) [[unlikely]] { + // this is usually indicative of a bug in the manager where + // a previous source is not deregistered, bubble it up. + auto err = result.error(); + vlog( + cllog.error, + "[{}] Failed to add remote partition source: {}", + _tp, + err); + return ss::make_exception_future<>(err); + } + return ss::now(); + } + + ss::future<> stop() noexcept final { + vlog(cllog.trace, "[{}] Stopping remote partition source", _tp); + auto f = _gate.close(); + co_await _consumer.remove(_tp); + co_await std::move(f); + } + + ss::future<> reset(kafka::offset offset) final { + _gate.check(); + auto result = _consumer.reset(_tp, offset); + if (!result.has_value()) [[unlikely]] { + auto err = result.error(); + vlog( + cllog.error, + "[{}] Failed to reset remote partition source: {}", + _tp, + err); + return ss::make_exception_future<>(err); + } + return ss::now(); + } + + ss::future + fetch_next(ss::abort_source& as) final { + auto holder = _gate.hold(); + auto result = co_await _consumer.fetch(_tp, as); + if (!result.has_value()) [[unlikely]] { + auto err = result.error(); + vlog( + cllog.error, + "[{}] Failed to fetch from remote partition source: {}", + _tp, + result.error()); + throw std::runtime_error( + fmt::format( + "[{}] Failed to fetch from remote partition source: {}", + _tp, + err)); + } + auto [batches, units] = std::move(*result); + co_return data_source::data{ + .batches = std::move(batches), .units = std::move(units)}; + } + + std::optional get_offsets() { + auto offsets = _consumer.get_source_offsets(_tp); + if (!offsets.has_value()) { + return std::nullopt; + } + return data_source::source_partition_offsets_report{ + .source_start_offset = offsets->log_start_offset, + .source_hwm = offsets->high_watermark, + .source_lso = offsets->last_stable_offset, + .update_time = offsets->last_offset_update_timestamp, + }; + } + +private: + ::model::topic_partition _tp; + replication::mux_remote_consumer& _consumer; + ss::gate _gate; +}; + +class remote_data_source_factory : public replication::data_source_factory { +public: + explicit remote_data_source_factory( + std::unique_ptr consumer) + : _consumer(std::move(consumer)) {} + + ss::future<> start() final { return _consumer->start(); } + + ss::future<> stop() noexcept final { return _consumer->stop(); } + + std::unique_ptr + make_source(const ::model::ntp& ntp) final { + return make_default_data_source(ntp.tp, *_consumer); + } + +private: + std::unique_ptr _consumer; +}; + +/* + * Sink for writing partition data to the partition leader on the local shard. 
+ */ +class local_partition_sink : public replication::data_sink { +public: + static constexpr auto sync_timeout = 10s; + explicit local_partition_sink( + ss::lw_shared_ptr partition) + : _partition(std::move(partition)) + , _stm(_partition->raft() + ->stm_manager() + ->get()) { + vassert( + _stm, + "write_at_offset_stm not attached to partition {}", + _partition->ntp()); + } + ss::future<> start() final { + auto holder = _gate.hold(); + auto sync_offset = co_await _stm->get_expected_last_offset( + sync_timeout); + if (sync_offset.has_error()) { + throw std::runtime_error( + fmt::format( + "Failed to sync write_at_offset_stm for partition {}: {}", + _partition->ntp(), + sync_offset.error().message())); + } + vlog( + cllog.trace, + "[{}] Starting local partition sink at offset {}", + _partition->ntp(), + sync_offset.value()); + _last_replicated_offset = sync_offset.value(); + } + + ss::future<> stop() noexcept final { + vlog( + cllog.trace, "[{}] Stopping local partition sink", _partition->ntp()); + co_await _gate.close(); + } + + kafka::offset last_replicated_offset() const final { + vassert(_last_replicated_offset, "Sink has not been started"); + return _last_replicated_offset.value(); + } + + raft::replicate_stages replicate( + chunked_vector<::model::record_batch> batches, + ::model::timeout_clock::duration timeout, + ss::abort_source& as) final { + _gate.check(); + vassert(_last_replicated_offset, "Sink has not been started"); + vassert( + !batches.empty(), + "Cannot replicate empty batch vector {}", + _partition->ntp()); + chunked_vector expected_offsets; + expected_offsets.reserve(batches.size()); + for (const auto& batch : batches) { + expected_offsets.push_back( + ::model::offset_cast(batch.base_offset())); + } + auto new_last_replicated_begin = ::model::offset_cast( + batches.front().base_offset()); + auto new_last_replicated_end = ::model::offset_cast( + batches.back().last_offset()); + vassert( + new_last_replicated_begin > _last_replicated_offset + && new_last_replicated_end > _last_replicated_offset, + "[{}] Replicating offsets must be monotonically increasing last " + "replicated: {}, attempting to replicate: [{}, {}]", + _partition->ntp(), + _last_replicated_offset, + new_last_replicated_begin, + new_last_replicated_end); + vlog( + cllog.trace, + "[{}] Replicating batches in range [{} - {}], last_replicated: {}, " + "new_last_replicated: {}", + _partition->ntp(), + batches.front().header(), + batches.back().header(), + _last_replicated_offset, + new_last_replicated_end); + auto stages = _stm->replicate( + std::move(batches), + std::move(expected_offsets), + _last_replicated_offset, + timeout, + as); + _last_replicated_offset = new_last_replicated_end; + return stages; + } + + void notify_replicator_failure(::model::term_id term) final { + if (_gate.is_closed()) { + return; + } + // If the replicator failed to start _and_ the partition is still the + // leader in the same term we are effectively stuck without a + // replicator. Here we step down to ensure a new leader comes up and a + // replicator start is triggered again on the new leader. 
+ if (_partition->term() == term) { + ssx::spawn_with_gate(_gate, [this, term] { + return _partition->raft()->step_down( + fmt::format("Unable to start replicator in term: {}", term)); + }); + } + } + + kafka::offset high_watermark() const final { + _gate.check(); + return ::model::offset_cast(_partition->high_watermark()); + } + +private: + ss::gate _gate; + ss::lw_shared_ptr _partition; + ss::shared_ptr _stm; + // set in start(); + std::optional _last_replicated_offset; +}; + +class local_partition_data_sink_factory + : public replication::data_sink_factory { +public: + explicit local_partition_data_sink_factory( + ss::sharded& pm) + : _partition_manager(pm) {} + + std::unique_ptr + make_sink(const ::model::ntp& ntp) final { + auto partition = _partition_manager.local().get(ntp); + if (!partition) { + throw std::runtime_error( + fmt::format("Partition not found: {} on this shard", ntp)); + } + return make_default_data_sink(std::move(partition)); + } + +private: + ss::sharded& _partition_manager; +}; + +std::unique_ptr make_default_data_source( + const ::model::topic_partition& tp, + replication::mux_remote_consumer& consumer) { + return std::make_unique(tp, consumer); +} + +std::unique_ptr +make_default_data_sink(ss::lw_shared_ptr partition) { + return std::make_unique(std::move(partition)); +} + class default_link_factory : public link_factory { public: explicit default_link_factory( @@ -236,12 +496,13 @@ class default_link_factory : public link_factory { link_reconciler_period, std::move(config), std::move(cluster_connection), - std::make_unique(make_remote_consumer( + std::make_unique(make_remote_consumer( std::move(client_id), *cluster_connection, _snc_quota_mgr->local(), config.connection)), - std::make_unique(*_partition_manager)); + std::make_unique( + *_partition_manager)); } private: diff --git a/src/v/cluster_link/service.h b/src/v/cluster_link/service.h index 032ff119c568a..2e1aae956af3c 100644 --- a/src/v/cluster_link/service.h +++ b/src/v/cluster_link/service.h @@ -29,6 +29,11 @@ #include namespace cluster_link { +namespace replication { +class data_source; +class data_sink; +class mux_remote_consumer; +} // namespace replication /** * @brief API access for cluster link service */ @@ -216,4 +221,11 @@ class service : public ss::peering_sharded_service { ss::abort_source _as; mutex _shadow_link_config_mutex{"shadow_link/config"}; }; + +std::unique_ptr make_default_data_source( + const ::model::topic_partition& tp, + replication::mux_remote_consumer& consumer); + +std::unique_ptr +make_default_data_sink(ss::lw_shared_ptr partition); } // namespace cluster_link diff --git a/src/v/cluster_link/tests/BUILD b/src/v/cluster_link/tests/BUILD index b3e400d700a8f..b7f0fb73518ce 100644 --- a/src/v/cluster_link/tests/BUILD +++ b/src/v/cluster_link/tests/BUILD @@ -164,3 +164,25 @@ redpanda_cc_gtest( "@googletest//:gtest", ], ) + +redpanda_cc_gtest( + name = "partition_replicator_fixture_test", + timeout = "short", + srcs = [ + "partition_replicator_fixture_tests.cc", + ], + cpu = 1, + deps = [ + "//src/v/cluster_link", + "//src/v/cluster_link/replication:deps", + "//src/v/cluster_link/replication:mux_remote_consumer", + "//src/v/cluster_link/replication:partition_replicator", + "//src/v/kafka/client/direct_consumer/tests:direct_consumer_fixture", + "//src/v/model", + "//src/v/redpanda/tests:fixture", + "@fmt", + "@googletest//:gtest", + "@seastar", + "@seastar//:testing", + ], +) diff --git a/src/v/cluster_link/replication/tests/partition_replicator_fixture_tests.cc 
b/src/v/cluster_link/tests/partition_replicator_fixture_tests.cc
similarity index 85%
rename from src/v/cluster_link/replication/tests/partition_replicator_fixture_tests.cc
rename to src/v/cluster_link/tests/partition_replicator_fixture_tests.cc
index 8fd399a72122a..abcb4cf8279fc 100644
--- a/src/v/cluster_link/replication/tests/partition_replicator_fixture_tests.cc
+++ b/src/v/cluster_link/tests/partition_replicator_fixture_tests.cc
@@ -8,15 +8,15 @@
  * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
  */

-#include "cluster_link/replication/deps_impl.h"
 #include "cluster_link/replication/mux_remote_consumer.h"
 #include "cluster_link/replication/partition_replicator.h"
+#include "cluster_link/service.h"
 #include "kafka/client/direct_consumer/tests/direct_consumer_fixture.h"

 namespace {
 ss::logger logger{"replicator-fixture-test"};
 }
-using namespace cluster_link::replication;
+
 using BasicConsumerFixture = kafka::client::tests::basic_consumer_fixture;

 static constexpr std::chrono::milliseconds fetch_max_wait{10};
@@ -28,12 +28,13 @@ class ReplicatorFixture : public BasicConsumerFixture {
         basic_consumer_fixture::SetUp();
         auto consumer = make_consumer();
         auto* rp = instance(model::node_id{0});
-        _mux_consumer = std::make_unique<mux_remote_consumer>(
-          client_id,
-          make_consumer(),
-          rp->app.snc_quota_mgr.local(),
-          partition_max_buffered,
-          fetch_max_wait);
+        _mux_consumer
+          = std::make_unique<cluster_link::replication::mux_remote_consumer>(
+            client_id,
+            make_consumer(),
+            rp->app.snc_quota_mgr.local(),
+            partition_max_buffered,
+            fetch_max_wait);
         _mux_consumer->start().get();
         // create source and target topics;
         create_topic(model::topic_namespace_view{_source}).get();
@@ -42,11 +43,12 @@ class ReplicatorFixture : public BasicConsumerFixture {
         auto [_, partition] = get_leader(_target);
         vassert(partition, "no partition for {}", _target);

-        auto source = std::make_unique<remote_partition_source>(
+        auto source = cluster_link::make_default_data_source(
           _source.tp, *_mux_consumer);
-        auto sink = std::make_unique<local_partition_sink>(partition);
-        _replicator = std::make_unique<partition_replicator>(
-          _source, model::term_id{0}, std::move(source), std::move(sink));
+        auto sink = cluster_link::make_default_data_sink(partition);
+        _replicator
+          = std::make_unique<cluster_link::replication::partition_replicator>(
+            _source, model::term_id{0}, std::move(source), std::move(sink));
         _replicator->start().get();
     }

@@ -78,8 +80,10 @@ class ReplicatorFixture : public BasicConsumerFixture {
 protected:
     const ss::sstring client_id = "replicator_fixture_test";

-    std::unique_ptr<mux_remote_consumer> _mux_consumer;
-    std::unique_ptr<partition_replicator> _replicator;
+    std::unique_ptr<cluster_link::replication::mux_remote_consumer>
+      _mux_consumer;
+    std::unique_ptr<cluster_link::replication::partition_replicator>
+      _replicator;
     model::ntp _source{model::kafka_namespace, "source", 0};
     model::ntp _target{model::kafka_namespace, "target", 0};
 };

From 73bf0c78547f3fa73b17a4e06205949ec2a2dc9a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?=
Date: Thu, 25 Sep 2025 11:37:33 +0200
Subject: [PATCH 05/10] cl/manager: added ability to be notified when link
 configuration changes
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Some components may have to react when the link configuration changes
in order to update their properties.

Added a notification mechanism that allows registering a callback that
is invoked whenever a link's configuration changes.
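A minimal usage sketch (the observer class below is illustrative and not
part of this patch; only the manager API it calls is):

```
// Hypothetical observer living in namespace cluster_link. The manager
// hands out a notification_id which must be used to unregister the
// callback before this object is destroyed.
class link_config_observer {
public:
    link_config_observer(manager& mgr, model::id_t link_id)
      : _mgr(mgr)
      , _notification_id(_mgr.register_link_config_changes_callback(
          [this, link_id](model::id_t changed_id, const model::metadata& md) {
              if (changed_id != link_id) {
                  return; // notifications are manager-wide, filter by link id
              }
              apply(md); // react to the updated configuration
          })) {}

    ~link_config_observer() {
        _mgr.unregister_link_config_changes_callback(_notification_id);
    }

private:
    void apply(const model::metadata&) { /* update local properties */ }

    manager& _mgr;
    manager::notification_id _notification_id;
};
```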
Signed-off-by: Michał Maślanka
---
 src/v/cluster_link/manager.cc | 10 ++++++++++
 src/v/cluster_link/manager.h  |  9 +++++++++
 2 files changed, 19 insertions(+)

diff --git a/src/v/cluster_link/manager.cc b/src/v/cluster_link/manager.cc
index da0c905ae0845..533a05f8e737e 100644
--- a/src/v/cluster_link/manager.cc
+++ b/src/v/cluster_link/manager.cc
@@ -471,6 +471,7 @@ manager::handle_on_link_change(model::id_t id, ::model::revision_id revision) {
               id,
               link_metadata);
             it->second->update_config(link_metadata.copy(), revision);
+            _cfg_change_notifications.notify(id, link_metadata);
         } else {
             // Create a new link
             vlog(
@@ -782,6 +783,15 @@ ss::future<> manager::on_controller_stepdown() {
     }
 }

+manager::notification_id manager::register_link_config_changes_callback(
+  link_cfg_change_notification_cb cb) {
+    return _cfg_change_notifications.register_cb(std::move(cb));
+}
+
+void manager::unregister_link_config_changes_callback(notification_id id) {
+    _cfg_change_notifications.unregister_cb(id);
+}
+
 consumer_groups_router& manager::get_group_router() noexcept {
     return *_group_router;
 }
diff --git a/src/v/cluster_link/manager.h b/src/v/cluster_link/manager.h
index ee3ccdea9ae74..7f635f5bc9c4d 100644
--- a/src/v/cluster_link/manager.h
+++ b/src/v/cluster_link/manager.h
@@ -36,6 +36,9 @@ namespace cluster_link {
  */
 class manager {
 public:
+    using notification_id = named_type;
+    using link_cfg_change_notification_cb
+      = ss::noncopyable_function<void(model::id_t, const model::metadata&)>;
     manager(
       ::model::node_id self,
       std::unique_ptr
@@ -179,6 +182,10 @@ class manager {
         return _scheduling_group;
     }

+    notification_id
+    register_link_config_changes_callback(link_cfg_change_notification_cb cb);
+    void unregister_link_config_changes_callback(notification_id id);
+
     std::unique_ptr& registry() noexcept { return _registry; }

     cl_result<
@@ -215,6 +222,8 @@ class manager {
     chunked_vector> _task_factories;
     absl::flat_hash_map> _links;
+    notification_list<link_cfg_change_notification_cb, notification_id>
+      _cfg_change_notifications;
     ss::lowres_clock::duration _task_reconciler_interval;

     mutex _link_task_reconciler_mutex{

From d658018336f2ad7e71e43939c09f6b8df7f13c19 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?=
Date: Thu, 25 Sep 2025 13:47:22 +0200
Subject: [PATCH 06/10] cl/mux: make mux consumer responsible for
 direct_consumer lifecycle
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Previously a direct consumer instance was injected into the
mux_consumer. This made the mux consumer unaware of the direct consumer
configuration.

Changed the way the mux consumer is initialized: responsibility for
instantiating the direct consumer is delegated to the `mux_consumer`
internals. This way ownership is clear and configuration changes can be
encapsulated.
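A sketch of what construction looks like after this change (the
`cluster` and `quota_mgr` references and all option values are
placeholders, not taken from this patch):

```
// Before: callers built a direct consumer and injected it. After this
// change they only describe it; the mux consumer instantiates and owns
// the direct consumer internally.
auto consumer = std::make_unique<mux_remote_consumer>(
  cluster,   // kafka::client::cluster&
  quota_mgr, // kafka::snc_quota_manager&
  mux_remote_consumer::configuration{
    .client_id = "shadow-link-consumer",
    .direct_consumer_configuration = {}, // direct consumer options
    .partition_max_buffered = 2 * 20 * 1024 * 1024, // 2 x fetch_max_bytes
    .fetch_max_wait = std::chrono::milliseconds(500),
  });
co_await consumer->start();
```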
Signed-off-by: Michał Maślanka --- src/v/cluster_link/replication/BUILD | 1 + .../replication/mux_remote_consumer.cc | 62 +++++++++------ .../replication/mux_remote_consumer.h | 19 +++-- .../tests/mux_consumer_fixture_tests.cc | 16 ++-- src/v/cluster_link/service.cc | 78 +++++++++---------- .../partition_replicator_fixture_tests.cc | 14 +++- 6 files changed, 108 insertions(+), 82 deletions(-) diff --git a/src/v/cluster_link/replication/BUILD b/src/v/cluster_link/replication/BUILD index e536c3631faed..c1c459419bb5f 100644 --- a/src/v/cluster_link/replication/BUILD +++ b/src/v/cluster_link/replication/BUILD @@ -112,6 +112,7 @@ redpanda_cc_library( deps = [ ":partition_data_queue", ":types", + "//src/v/base", "//src/v/cluster_link:logger", "//src/v/config", "//src/v/container:chunked_hash_map", diff --git a/src/v/cluster_link/replication/mux_remote_consumer.cc b/src/v/cluster_link/replication/mux_remote_consumer.cc index b1fefc8ab5d41..31c12f0ac9e83 100644 --- a/src/v/cluster_link/replication/mux_remote_consumer.cc +++ b/src/v/cluster_link/replication/mux_remote_consumer.cc @@ -17,16 +17,16 @@ namespace cluster_link::replication { mux_remote_consumer::mux_remote_consumer( - ss::sstring client_id, - std::unique_ptr consumer, + kafka::client::cluster& cluster, kafka::snc_quota_manager& snc_quota_mgr, - size_t partition_max_buffered, - std::chrono::milliseconds fetch_max_wait) - : _client_id(std::move(client_id)) - , _consumer(std::move(consumer)) + mux_remote_consumer::configuration consumer_configuration) + : _client_id(std::move(consumer_configuration.client_id)) + , _consumer( + std::make_unique( + cluster, consumer_configuration.direct_consumer_configuration)) , _snc_quota_mgr(snc_quota_mgr) - , _partition_max_buffered(partition_max_buffered) - , _fetch_max_wait(fetch_max_wait) + , _partition_max_buffered(consumer_configuration.partition_max_buffered) + , _fetch_max_wait(consumer_configuration.fetch_max_wait) , _kafka_tput_controlled_api_keys( config::shard_local_cfg().kafka_throughput_controlled_api_keys.bind()) , _produce_throttling_enabled(should_throttle_produce()) { @@ -146,9 +146,9 @@ ss::future<> mux_remote_consumer::assign_pending_partitions() { vassert(it != _partitions.end(), "Partition {} not found", tp); const auto& queue = it->second; if (queue->full()) { - // The queue is still full which indicates it was not drained, - // so we do not assign it in this round. This usually indicates - // there is an issue with the data sink. + // The queue is still full which indicates it was not + // drained, so we do not assign it in this round. This + // usually indicates there is an issue with the data sink. vlog( cllog.debug, "[{}] Skipping assignment due to full queue", @@ -196,8 +196,9 @@ bool mux_remote_consumer::can_ignore_partition_data( const model::topic_partition& tp) { auto partition_inactive = _partitions.find(tp) == _partitions.end(); auto it = _pending_assignment.find(tp.topic); - // If a partition is in pending_assignment, it got reseeked to a new offset - // ignore data for now and reassign to the new offset in the next iteration. + // If a partition is in pending_assignment, it got reseeked to a new + // offset ignore data for now and reassign to the new offset in the next + // iteration. 
    auto partition_got_reset = (it != _pending_assignment.end())
                               && it->second.contains(tp.partition);
    return partition_inactive || partition_got_reset;
@@ -237,20 +238,20 @@ ss::future<> mux_remote_consumer::process_fetched_data(
         }
         auto it = _partitions.find(tp);
         vassert(it != _partitions.end(), "Partition {} not found", tp);
-        // Note: enqueue _always_ succeeds and may oversubscribe memory by
-        // a tiny amount since the consumed batches (per partition) are
-        // usually not that large and it is very unlikely that the sink is
-        // not able to catch up. This keeps the design simple. Here we try
-        // to be good citizen and not overwhelm the queue further and
-        // unassign the partition temporarily if the queue is full and then
-        // retry at a later time.
+        // Note: enqueue _always_ succeeds and may oversubscribe memory
+        // by a tiny amount since the consumed batches (per partition)
+        // are usually not that large and it is very unlikely that the
+        // sink is not able to catch up. This keeps the design simple.
+        // Here we try to be a good citizen: do not overwhelm the queue
+        // further, unassign the partition temporarily if the queue is
+        // full, and retry at a later time.
         auto can_enqueue_more = it->second->enqueue(
           std::move(partition.data));
         if (!can_enqueue_more) {
-            // the queue is full, this is usually a rare case indicating the
-            // sink is not able to catchup with the rate of incoming data.
-            // We put it to the pending_assignments map and retry in the
-            // next pass once the queue has some space.
+            // The queue is full; this is usually a rare case indicating
+            // the sink is not able to catch up with the rate of incoming
+            // data. We put it into the pending_assignments map and retry
+            // in the next pass once the queue has some space.
             to_unassign.push_back(tp);
         }
     }
@@ -282,6 +283,19 @@ ss::future<> mux_remote_consumer::fetch_loop() {
         co_await process_fetched_data(std::move(fetch_result.value()));
     }
 }
+
+fmt::iterator
+mux_remote_consumer::configuration::format_to(fmt::iterator it) const {
+    return fmt::format_to(
+      it,
+      "{{ client_id: {}, direct_consumer_configuration: {}, "
+      "partition_max_buffered: {}, "
+      "fetch_max_wait: {}ms }}",
+      client_id,
+      direct_consumer_configuration,
+      partition_max_buffered,
+      fetch_max_wait.count());
+}
 } // namespace cluster_link::replication

 auto fmt::formatter::
diff --git a/src/v/cluster_link/replication/mux_remote_consumer.h b/src/v/cluster_link/replication/mux_remote_consumer.h
index 733bbf563512e..b7264543ead23 100644
--- a/src/v/cluster_link/replication/mux_remote_consumer.h
+++ b/src/v/cluster_link/replication/mux_remote_consumer.h
@@ -10,6 +10,7 @@

 #pragma once

+#include "base/format_to.h"
 #include "cluster_link/replication/partition_data_queue.h"
 #include "container/chunked_hash_map.h"
 #include "kafka/client/direct_consumer/api_types.h"
@@ -39,18 +40,26 @@ namespace cluster_link::replication {
  */
 class mux_remote_consumer {
 public:
+    struct configuration {
+        ss::sstring client_id;
+        kafka::client::direct_consumer::configuration
+          direct_consumer_configuration;
+        size_t partition_max_buffered;
+        std::chrono::milliseconds fetch_max_wait;
+
+        fmt::iterator format_to(fmt::iterator it) const;
+    };
+
     enum class errc : int8_t {
         partition_not_found = 1,
         partition_already_exists = 2,
     };
     using result = std::expected;

-    explicit mux_remote_consumer(
-      ss::sstring client_id,
-      std::unique_ptr consumer,
+    mux_remote_consumer(
+      kafka::client::cluster& cluster,
       kafka::snc_quota_manager& snc_quota_mgr,
-      size_t partition_max_buffered,
-
std::chrono::milliseconds fetch_max_wait); + configuration consumer_configuration); ss::future<> start(); ss::future<> stop() noexcept; diff --git a/src/v/cluster_link/replication/tests/mux_consumer_fixture_tests.cc b/src/v/cluster_link/replication/tests/mux_consumer_fixture_tests.cc index b8add5aab60bb..9e5e038b9def0 100644 --- a/src/v/cluster_link/replication/tests/mux_consumer_fixture_tests.cc +++ b/src/v/cluster_link/replication/tests/mux_consumer_fixture_tests.cc @@ -27,16 +27,20 @@ class MuxConsumerFixture : public BasicConsumerFixture { public: void SetUp() override { basic_consumer_fixture::SetUp(); - auto consumer = make_consumer(); - _raw_consumer = consumer.get(); auto* rp = instance(model::node_id{0}); + _mux_consumer = std::make_unique( - client_id, - std::move(consumer), + *cluster, rp->app.snc_quota_mgr.local(), - partition_max_buffered, - fetch_max_wait); + mux_remote_consumer::configuration{ + .client_id = client_id, + .direct_consumer_configuration = direct_consumer:: + configuration{.with_sessions = fetch_sessions_enabled{GetParam() == kafka::client::tests::session_config::with_sessions}}, + .partition_max_buffered = partition_max_buffered, + .fetch_max_wait = fetch_max_wait, + }); _mux_consumer->start().get(); + _raw_consumer = _mux_consumer->_consumer.get(); } void TearDown() override { if (_mux_consumer) { diff --git a/src/v/cluster_link/service.cc b/src/v/cluster_link/service.cc index 8281ba62cc719..021e8a1d8c93b 100644 --- a/src/v/cluster_link/service.cc +++ b/src/v/cluster_link/service.cc @@ -209,6 +209,36 @@ class link_registry_adapter : public link_registry { frontend* _plf; service* _svc; }; +namespace { +replication::mux_remote_consumer::configuration +make_remote_consumer_configuration(const model::connection_config& conn_cfg) { + const size_t max_buffered_bytes = 2 * conn_cfg.get_fetch_max_bytes(); + kafka::client::direct_consumer::configuration dc_configuration; + const auto max_wait_time = std::chrono::milliseconds( + conn_cfg.get_fetch_wait_max_ms()); + + dc_configuration.min_bytes = conn_cfg.get_fetch_min_bytes(); + dc_configuration.max_fetch_size = conn_cfg.get_fetch_max_bytes(); + dc_configuration.isolation_level = ::model::isolation_level::read_committed; + dc_configuration.max_buffered_bytes = max_buffered_bytes; + // We are not interested in limiting the number of buffered fetches as + // we already set bytes limit + dc_configuration.max_buffered_elements = std::numeric_limits::max(); + dc_configuration.with_sessions = kafka::client::fetch_sessions_enabled::yes; + + dc_configuration.max_wait_time = max_wait_time; + dc_configuration.partition_max_bytes + = conn_cfg.get_fetch_partition_max_bytes(); + + return replication::mux_remote_consumer::configuration{ + .client_id = conn_cfg.client_id, + .direct_consumer_configuration = dc_configuration, + .partition_max_buffered = max_buffered_bytes, + .fetch_max_wait = max_wait_time, + }; +} + +} // namespace class remote_partition_source : public replication::data_source { public: @@ -496,54 +526,16 @@ class default_link_factory : public link_factory { link_reconciler_period, std::move(config), std::move(cluster_connection), - std::make_unique(make_remote_consumer( - std::move(client_id), - *cluster_connection, - _snc_quota_mgr->local(), - config.connection)), + std::make_unique( + std::make_unique( + *cluster_connection, + _snc_quota_mgr->local(), + make_remote_consumer_configuration(config.connection))), std::make_unique( *_partition_manager)); } private: - std::unique_ptr make_remote_consumer( - ss::sstring 
client_id, - kafka::client::cluster& cluster, - kafka::snc_quota_manager& snc_quota_mgr, - const model::connection_config& conn_cfg) { - const auto max_buffered_bytes = 2 * conn_cfg.get_fetch_max_bytes(); - kafka::client::direct_consumer::configuration cfg; - const auto max_wait_time = std::chrono::milliseconds( - conn_cfg.get_fetch_wait_max_ms()); - - cfg.min_bytes = conn_cfg.get_fetch_min_bytes(); - cfg.max_fetch_size = conn_cfg.get_fetch_max_bytes(); - cfg.isolation_level = ::model::isolation_level::read_committed; - cfg.max_buffered_bytes = max_buffered_bytes; - // We are not interested in limiting the number of buffered fetches as - // we already set bytes limit - cfg.max_buffered_elements = std::numeric_limits::max(); - cfg.with_sessions = kafka::client::fetch_sessions_enabled::yes; - - cfg.max_wait_time = max_wait_time; - cfg.partition_max_bytes = conn_cfg.get_fetch_partition_max_bytes(); - - auto direct_consumer = std::make_unique( - cluster, cfg); - // Cache up to double the max fetch size in the consumer - // to allow more than one fetch to be buffered per partition. - vlog( - cllog.debug, - "Creating MUX consumer with {} direct consumer configuration", - cfg); - return std::make_unique( - std::move(client_id), - std::move(direct_consumer), - snc_quota_mgr, - max_buffered_bytes, - max_wait_time); - } - ss::sharded* _partition_manager; ss::sharded* _snc_quota_mgr; }; diff --git a/src/v/cluster_link/tests/partition_replicator_fixture_tests.cc b/src/v/cluster_link/tests/partition_replicator_fixture_tests.cc index abcb4cf8279fc..d63653ac8ed7c 100644 --- a/src/v/cluster_link/tests/partition_replicator_fixture_tests.cc +++ b/src/v/cluster_link/tests/partition_replicator_fixture_tests.cc @@ -28,13 +28,19 @@ class ReplicatorFixture : public BasicConsumerFixture { basic_consumer_fixture::SetUp(); auto consumer = make_consumer(); auto* rp = instance(model::node_id{0}); + direct_consumer::configuration dc_cfg{ + .with_sessions = fetch_sessions_enabled{ + GetParam() == kafka::client::tests::session_config::with_sessions}}; + _mux_consumer = std::make_unique( - client_id, - make_consumer(), + *cluster, rp->app.snc_quota_mgr.local(), - partition_max_buffered, - fetch_max_wait); + cluster_link::replication::mux_remote_consumer::configuration{ + .client_id = client_id, + .direct_consumer_configuration = dc_cfg, + .partition_max_buffered = partition_max_buffered, + .fetch_max_wait = fetch_max_wait}); _mux_consumer->start().get(); // create source and target topics; create_topic(model::topic_namespace_view{_source}).get(); From 011934d85c3c193c55685dd04c5933477f101e71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?= Date: Thu, 25 Sep 2025 13:59:50 +0200 Subject: [PATCH 07/10] cl/mux_consumer: support updating consumer configuration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michał Maślanka --- .../replication/mux_remote_consumer.cc | 10 ++++++++++ .../replication/mux_remote_consumer.h | 6 ++++++ .../replication/partition_data_queue.cc | 20 ++++++++++++++++++- .../replication/partition_data_queue.h | 3 +++ 4 files changed, 38 insertions(+), 1 deletion(-) diff --git a/src/v/cluster_link/replication/mux_remote_consumer.cc b/src/v/cluster_link/replication/mux_remote_consumer.cc index 31c12f0ac9e83..efa2e8169c2a6 100644 --- a/src/v/cluster_link/replication/mux_remote_consumer.cc +++ b/src/v/cluster_link/replication/mux_remote_consumer.cc @@ -284,6 +284,16 @@ ss::future<> mux_remote_consumer::fetch_loop() { } } +void 
mux_remote_consumer::update_configuration(const configuration& cfg) {
+    vlog(cllog.info, "Updating mux consumer configuration: {}", cfg);
+    _consumer->update_configuration(cfg.direct_consumer_configuration);
+    _partition_max_buffered = cfg.partition_max_buffered;
+    _fetch_max_wait = cfg.fetch_max_wait;
+    for (auto& [tp, queue] : _partitions) {
+        queue->update_max_buffered(_partition_max_buffered);
+    }
+}
+
 fmt::iterator
 mux_remote_consumer::configuration::format_to(fmt::iterator it) const {
     return fmt::format_to(
diff --git a/src/v/cluster_link/replication/mux_remote_consumer.h b/src/v/cluster_link/replication/mux_remote_consumer.h
index b7264543ead23..1e164c4ccfdce 100644
--- a/src/v/cluster_link/replication/mux_remote_consumer.h
+++ b/src/v/cluster_link/replication/mux_remote_consumer.h
@@ -83,6 +83,12 @@ class mux_remote_consumer {
      */
     ss::future> fetch(const ::model::topic_partition&, ss::abort_source&);
+    /**
+     * Update the configuration of the consumer.
+     *
+     * The changes in the configuration will be applied to subsequent
+     * fetches.
+     */
+    void update_configuration(const configuration& cfg);

     /**
      * @brief Get the source offsets object
diff --git a/src/v/cluster_link/replication/partition_data_queue.cc b/src/v/cluster_link/replication/partition_data_queue.cc
index ffc8d2878aaea..cc1fc21e0f606 100644
--- a/src/v/cluster_link/replication/partition_data_queue.cc
+++ b/src/v/cluster_link/replication/partition_data_queue.cc
@@ -15,8 +15,26 @@
 namespace cluster_link::replication {

 partition_data_queue::partition_data_queue(size_t max_buffered_bytes)
-  : _sem(max_buffered_bytes, "partition_data_queue") {}
+  : _max_buffered_bytes(max_buffered_bytes)
+  , _sem(max_buffered_bytes, "partition_data_queue") {}

+void partition_data_queue::update_max_buffered(size_t new_value) {
+    // Ignore the update if the gate is closed; the queue is stopping.
+    if (_gate.is_closed()) {
+        return;
+    }
+
+    if (new_value == _max_buffered_bytes) {
+        return;
+    }
+    if (new_value > _max_buffered_bytes) {
+        _sem.signal(new_value - _max_buffered_bytes);
+    } else {
+        _sem.consume(_max_buffered_bytes - new_value);
+    }
+    _max_buffered_bytes = new_value;
+}
 void partition_data_queue::reset(kafka::offset next) {
     _gate.check();
     do_reset(next);
diff --git a/src/v/cluster_link/replication/partition_data_queue.h b/src/v/cluster_link/replication/partition_data_queue.h
index 724f2c25a1880..ef8ddf63d1696 100644
--- a/src/v/cluster_link/replication/partition_data_queue.h
+++ b/src/v/cluster_link/replication/partition_data_queue.h
@@ -56,11 +56,14 @@ class partition_data_queue {
     bool empty() const { return _batches.empty(); }
     bool full() const { return _sem.available_units() <= 0; }

+    void update_max_buffered(size_t max_buffered_bytes);
+
 private:
     void do_reset(kafka::offset next);
     kafka::offset _next{};
     void maybe_notify_waiter();
     std::optional<ss::promise<>> _waiter;
+    size_t _max_buffered_bytes;
     ssx::semaphore _sem;
     ssx::semaphore_units _batch_units;
     chunked_vector<::model::record_batch> _batches;

From 605fa9268bab96c830635a7bc7fbb5f55e6df603 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?=
Date: Thu, 25 Sep 2025 14:35:21 +0200
Subject: [PATCH 08/10] cl/service: update consumer configuration when link
 configuration changes
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When the link configuration changes we must update the configuration of
the consumer used to fetch data from the source cluster.
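A sketch of the intended update path (the free function below is
illustrative; the helper and member function names come from the
previous commits in this series):

```
// Assuming namespace cluster_link: translate the link's new connection
// settings into a mux consumer configuration and apply it in place.
// In-flight fetches complete with the old settings; subsequent fetches
// pick up the new ones.
void on_link_config_changed(
  const model::metadata& md,
  replication::mux_remote_consumer& consumer) {
    consumer.update_configuration(
      make_remote_consumer_configuration(md.connection));
}
```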
This commit registers a manager notification callback that listens for
link configuration changes and updates the mux consumer configuration.

Signed-off-by: Michał Maślanka
---
 src/v/cluster_link/service.cc | 29 ++++++++++++++++++++++++++---
 1 file changed, 26 insertions(+), 3 deletions(-)

diff --git a/src/v/cluster_link/service.cc b/src/v/cluster_link/service.cc
index 021e8a1d8c93b..a24e6be72f3f8 100644
--- a/src/v/cluster_link/service.cc
+++ b/src/v/cluster_link/service.cc
@@ -330,12 +330,30 @@ class remote_partition_source : public replication::data_source {
 class remote_data_source_factory : public replication::data_source_factory {
 public:
     explicit remote_data_source_factory(
+      model::id_t link_id,
+      manager* manager,
       std::unique_ptr<replication::mux_remote_consumer> consumer)
-      : _consumer(std::move(consumer)) {}
+      : _link_id(link_id)
+      , _manager(manager)
+      , _consumer(std::move(consumer)) {}

-    ss::future<> start() final { return _consumer->start(); }
+    ss::future<> start() final {
+        _notification_id = _manager->register_link_config_changes_callback(
+          [this](model::id_t link_id, const model::metadata& md) {
+              // Ignore updates for other links
+              if (link_id != _link_id) {
+                  return;
+              }
+              _consumer->update_configuration(
+                make_remote_consumer_configuration(md.connection));
+          });
+        return _consumer->start();
+    }

-    ss::future<> stop() noexcept final { return _consumer->stop(); }
+    ss::future<> stop() noexcept final {
+        _manager->unregister_link_config_changes_callback(_notification_id);
+        return _consumer->stop();
+    }

     std::unique_ptr<replication::data_source>
     make_source(const ::model::ntp& ntp) final {
@@ -343,6 +361,9 @@ class remote_data_source_factory : public replication::data_source_factory {
     }

 private:
+    model::id_t _link_id;
+    manager* _manager;
+    manager::notification_id _notification_id;
     std::unique_ptr<replication::mux_remote_consumer> _consumer;
 };

@@ -527,6 +548,8 @@ class default_link_factory : public link_factory {
           std::move(config),
           std::move(cluster_connection),
           std::make_unique<remote_data_source_factory>(
+            link_id,
+            manager,
             std::make_unique<replication::mux_remote_consumer>(
               *cluster_connection,
               _snc_quota_mgr->local(),

From 8112578561212feb2ab713c7c68d22a2b4b7e273 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?=
Date: Thu, 25 Sep 2025 15:30:56 +0200
Subject: [PATCH 09/10] cl/tests: include changing client configuration in
 link update test
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Michał Maślanka
---
 tests/rptest/tests/cluster_linking_e2e_test.py | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git a/tests/rptest/tests/cluster_linking_e2e_test.py b/tests/rptest/tests/cluster_linking_e2e_test.py
index 3feb6bc35f997..1ea9113870657 100644
--- a/tests/rptest/tests/cluster_linking_e2e_test.py
+++ b/tests/rptest/tests/cluster_linking_e2e_test.py
@@ -392,10 +392,17 @@ def _any_topics_are_present_in_target_cluster():
                 ),
             ]
         )
-
+        shadow_link.configurations.client_options.fetch_wait_max_ms = 100
+        shadow_link.configurations.client_options.fetch_min_bytes = 10
+        shadow_link.configurations.client_options.fetch_partition_max_bytes = (
+            500 * 1024 * 1024
+        )
         update_mask: google.protobuf.field_mask_pb2.FieldMask = google.protobuf.field_mask_pb2.FieldMask(
             paths=[
-                "configurations.topic_metadata_sync_options.auto_create_shadow_topic_filters"
+                "configurations.topic_metadata_sync_options.auto_create_shadow_topic_filters",
+                "configurations.client_options.fetch_partition_max_bytes",
+                "configurations.client_options.fetch_wait_max_ms",
+                "configurations.client_options.fetch_min_bytes",
             ]
         )

@@ -409,6 +416,12 @@ def _any_topics_are_present_in_target_cluster():
         ), (
             f"Expected updated link to be returned, {updated_link.configurations.topic_metadata_sync_options} != {shadow_link.configurations.topic_metadata_sync_options}"
         )
+        assert (
+            updated_link.configurations.client_options
+            == shadow_link.configurations.client_options
+        ), (
+            f"Expected client options to be updated, {updated_link.configurations.client_options} != {shadow_link.configurations.client_options}"
+        )

         def _all_but_one_topic_are_present_in_target_cluster():
             topics_in_target = {t for t in self.target_cluster_rpk.list_topics()}

From ae8fc3a9e58cabce30292fc0c706e2de8fcc670 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micha=C5=82=20Ma=C5=9Blanka?=
Date: Thu, 2 Oct 2025 14:43:04 +0200
Subject: [PATCH 10/10] protos: added dot at the end of first-line
 descriptions
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Added a dot at the end of the first line of each field description to
separate it from the rest of the comment.

Signed-off-by: Michał Maślanka
---
 proto/redpanda/core/admin/v2/shadow_link.proto   | 14 +++++++-------
 .../redpanda/core/admin/v2/shadow_link_pb2.pyi   | 14 +++++++-------
 2 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/proto/redpanda/core/admin/v2/shadow_link.proto b/proto/redpanda/core/admin/v2/shadow_link.proto
index 1e2f28aca2882..0878005d9e650 100644
--- a/proto/redpanda/core/admin/v2/shadow_link.proto
+++ b/proto/redpanda/core/admin/v2/shadow_link.proto
@@ -250,25 +250,25 @@ message ShadowLinkClientOptions {
   optional TLSSettings tls_settings = 4;
   // Authentication settings
   optional AuthenticationConfiguration authentication_configuration = 5;
-  // Max metadata age
+  // Max metadata age.
   // If 0 is provided, defaults to 10 seconds
   int32 metadata_max_age_ms = 6;
-  // Connection timeout
+  // Connection timeout.
   // If 0 is provided, defaults to 1 second
   int32 connection_timeout_ms = 7;
-  // Retry base backoff
+  // Retry base backoff.
   // If 0 is provided, defaults to 100ms
   int32 retry_backoff_ms = 8;
-  // Fetch request timeout
+  // Fetch request timeout.
  // If 0 is provided, defaults to 500ms
   int32 fetch_wait_max_ms = 9;
-  // Fetch min bytes
+  // Fetch min bytes.
   // If 0 is provided, defaults to 5 MiB
   int32 fetch_min_bytes = 10;
-  // Fetch max bytes
+  // Fetch max bytes.
   // If 0 is provided, defaults to 20 MiB
   int32 fetch_max_bytes = 11;
-  // Fetch partition max bytes
+  // Fetch partition max bytes.
   // If 0 is provided, defaults to 1 MiB
   int32 fetch_partition_max_bytes = 12;
 }
diff --git a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.pyi b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.pyi
index e62e7c4879d09..bd5d77106170a 100644
--- a/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.pyi
+++ b/tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.pyi
@@ -606,19 +606,19 @@ class ShadowLinkClientOptions(google.protobuf.message.Message):
     source_cluster_id: builtins.str
     'If provided, this is the expected ID of the source cluster. If it does\n    not match then the connection will be rejected.
If provided, this value\n must match the `ClusterId` field returned in the Kafka Metadata response\n message\n ' metadata_max_age_ms: builtins.int - 'Max metadata age\n If 0 is provided, defaults to 10 seconds\n ' + 'Max metadata age.\n If 0 is provided, defaults to 10 seconds\n ' connection_timeout_ms: builtins.int - 'Connection timeout\n If 0 is provided, defaults to 1 second\n ' + 'Connection timeout.\n If 0 is provided, defaults to 1 second\n ' retry_backoff_ms: builtins.int - 'Retry base backoff\n If 0 is provided, defaults to 100ms\n ' + 'Retry base backoff.\n If 0 is provided, defaults to 100ms\n ' fetch_wait_max_ms: builtins.int - 'Fetch request timeout\n If 0 is provided, defaults to 500ms\n ' + 'Fetch request timeout.\n If 0 is provided, defaults to 500ms\n ' fetch_min_bytes: builtins.int - 'Fetch min bytes\n If 0 is provided, defaults to 5 MiB\n ' + 'Fetch min bytes.\n If 0 is provided, defaults to 5 MiB\n ' fetch_max_bytes: builtins.int - 'Fetch max bytes\n If 0 is provided, defaults to 20 MiB\n ' + 'Fetch max bytes.\n If 0 is provided, defaults to 20 MiB\n ' fetch_partition_max_bytes: builtins.int - 'Fetch partition max bytes\n If 0 is provided, defaults to 1 MiB\n ' + 'Fetch partition max bytes.\n If 0 is provided, defaults to 1 MiB\n ' @property def bootstrap_servers(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: