Conversation

mmaslankaprv
Member

@mmaslankaprv mmaslankaprv commented Sep 24, 2025

Passes all client configuration options to the Kafka client underlying shadow linking.

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v25.2.x
  • v25.1.x
  • v24.3.x

Release Notes

  • none

@mmaslankaprv mmaslankaprv force-pushed the CORE-13396-pass-client-configuration-to-consumer branch from 25a9f82 to 24d565f Compare September 24, 2025 10:59
@mmaslankaprv mmaslankaprv changed the title from "Core 13396 pass client configuration to consumer" to "CORE-13396 pass client configuration to consumer" Sep 25, 2025
After talking to the perf team, we decided to adjust the default parameters
of the consumer used to fetch data from the source cluster. The parameters
currently in use are:

```
fetch_max_wait: 500ms
fetch_min_bytes: 5 MiB
fetch_max_bytes: 20 MiB
partition_max_bytes: 1 MiB
```

Signed-off-by: Michał Maślanka <[email protected]>
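
A minimal sketch of how the defaults listed above might be expressed, assuming optional per-link overrides that fall back to compile-time defaults. The struct, member, and getter names are hypothetical and are not taken from this PR's code; only the four values come from the commit message.

```cpp
#include <chrono>
#include <cstddef>
#include <optional>

// Hypothetical illustration: names are assumptions, not the actual types
// used by cluster linking. Only the default values come from the commit.
struct fetch_options {
    static constexpr std::chrono::milliseconds fetch_max_wait_default{500};
    static constexpr std::size_t fetch_min_bytes_default = 5 * 1024 * 1024;
    static constexpr std::size_t fetch_max_bytes_default = 20 * 1024 * 1024;
    static constexpr std::size_t partition_max_bytes_default = 1 * 1024 * 1024;

    // An unset field means "use the default".
    std::optional<std::chrono::milliseconds> fetch_max_wait;
    std::optional<std::size_t> fetch_min_bytes;
    std::optional<std::size_t> fetch_max_bytes;
    std::optional<std::size_t> partition_max_bytes;

    std::chrono::milliseconds get_fetch_max_wait() const {
        return fetch_max_wait.value_or(fetch_max_wait_default);
    }
    std::size_t get_fetch_min_bytes() const {
        return fetch_min_bytes.value_or(fetch_min_bytes_default);
    }
    std::size_t get_fetch_max_bytes() const {
        return fetch_max_bytes.value_or(fetch_max_bytes_default);
    }
    std::size_t get_partition_max_bytes() const {
        return partition_max_bytes.value_or(partition_max_bytes_default);
    }
};
```

Keeping the overrides optional lets a later change to the defaults reach every link that never set the value explicitly.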
Added configuration allowing users to set partition max bytes

Signed-off-by: Michał Maślanka <[email protected]>
Previously, the client configuration did not include max partition bytes.
Added the missing value to configure the maximum number of bytes fetched
per partition.

Signed-off-by: Michał Maślanka <[email protected]>
Cleaned up creation of consumer configuration. Now all the relevant
parameters can be configured through APIs.

Signed-off-by: Michał Maślanka <[email protected]>
@mmaslankaprv mmaslankaprv force-pushed the CORE-13396-pass-client-configuration-to-consumer branch from 0caf813 to a2559f6 Compare September 25, 2025 12:54
@mmaslankaprv mmaslankaprv marked this pull request as ready for review September 25, 2025 13:32
@mmaslankaprv mmaslankaprv requested a review from a team as a code owner September 25, 2025 13:32
Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

This PR passes client configuration parameters to the shadow linking Kafka consumer by adding the fetch_partition_max_bytes field and updating default values for fetch configuration. The changes enable better tuning of the underlying Kafka client performance.

Key Changes:

  • Added fetch_partition_max_bytes field to shadow link client configuration
  • Updated default values for fetch timeouts and byte limits to more performant values
  • Refactored implementation classes to be consolidated into service.cc

Reviewed Changes

Copilot reviewed 21 out of 21 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| tests/rptest/tests/cluster_linking_e2e_test.py | Added test coverage for new fetch_partition_max_bytes configuration |
| tests/rptest/clients/admin/proto/redpanda/core/admin/v2/shadow_link_pb2.* | Generated protobuf files with new field and updated defaults |
| src/v/redpanda/admin/services/shadow_link/converter.cc | Added conversion logic for fetch_partition_max_bytes parameter |
| src/v/cluster_link/tests/partition_replicator_fixture_tests.cc | Updated test to use new API structure |
| src/v/cluster_link/service.* | Consolidated implementation classes and added configuration update support |
| src/v/cluster_link/replication/* | Added configuration update methods and removed separate implementation files |
| src/v/cluster_link/model/types.h | Added fetch_partition_max_bytes field and updated default values |
| src/v/cluster_link/manager.* | Added callback system for configuration change notifications |
| proto/redpanda/core/admin/v2/shadow_link.proto | Added fetch_partition_max_bytes field to protobuf definition |

@mmaslankaprv mmaslankaprv force-pushed the CORE-13396-pass-client-configuration-to-consumer branch from 717c1d3 to 4d0faf5 Compare September 25, 2025 14:19
Moved the source and sink implementations out of the replication module.
This way the `replication` sub-module does not have to depend on anything
from the cluster_link module and has no information about the specific
implementation of the actual source and sink used in cluster linking.

Signed-off-by: Michał Maślanka <[email protected]>
Some components may have to react to link configuration changes by
updating their properties. Added a notification mechanism that allows
registering a callback to be notified about link configuration changes.

Signed-off-by: Michał Maślanka <[email protected]>
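
The notification mechanism described above can be sketched roughly as follows. This is a simplified, single-threaded illustration; `link_config`, `link_notifier`, and all method names are hypothetical and do not reflect the manager's actual interface.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical sketch: type and method names are assumptions, not the PR's API.
struct link_config {
    std::string name;
    std::size_t partition_max_bytes;
};

class link_notifier {
public:
    using callback = std::function<void(const link_config&)>;
    using notification_id = std::uint64_t;

    // Stores the callback and returns an id that can be used to remove it.
    notification_id register_callback(callback cb) {
        const notification_id id = _next_id++;
        _callbacks.emplace(id, std::move(cb));
        return id;
    }

    void unregister_callback(notification_id id) { _callbacks.erase(id); }

    // Invokes every registered callback with the updated configuration.
    void notify(const link_config& cfg) const {
        for (const auto& entry : _callbacks) {
            entry.second(cfg);
        }
    }

private:
    notification_id _next_id{0};
    std::unordered_map<notification_id, callback> _callbacks;
};
```

Returning an id from registration gives each component a way to stop listening when it shuts down.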
@mmaslankaprv mmaslankaprv force-pushed the CORE-13396-pass-client-configuration-to-consumer branch from 4d0faf5 to 66fc2b9 Compare September 25, 2025 14:28
Previously, a direct consumer instance was injected into mux_consumer.
This made the mux consumer unaware of the direct consumer configuration.

Changed how the mux consumer is initialized. Delegated responsibility for
instantiating the direct consumer to `mux_consumer` internals. This makes
ownership clear and lets us encapsulate configuration changes.

Signed-off-by: Michał Maślanka <[email protected]>
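
A rough sketch of the ownership change described above, assuming the mux consumer constructs and owns its direct consumer and forwards configuration updates to it. The classes here only borrow the names from the commit message; their members and signatures are hypothetical.

```cpp
#include <cstddef>
#include <memory>

// Hypothetical sketch: these classes mirror names from the commit message
// only; their members and signatures are assumptions.
struct consumer_config {
    std::size_t partition_max_bytes;
};

class direct_consumer {
public:
    explicit direct_consumer(consumer_config cfg)
      : _cfg(cfg) {}

    void update_configuration(consumer_config cfg) { _cfg = cfg; }
    const consumer_config& configuration() const { return _cfg; }

private:
    consumer_config _cfg;
};

class mux_consumer {
public:
    // The mux consumer builds its own direct consumer instead of having
    // one injected, so it always knows the configuration in use.
    explicit mux_consumer(consumer_config cfg)
      : _consumer(std::make_unique<direct_consumer>(cfg)) {}

    // Configuration changes go through the owner and are forwarded.
    void update_configuration(consumer_config cfg) {
        _consumer->update_configuration(cfg);
    }

    std::size_t partition_max_bytes() const {
        return _consumer->configuration().partition_max_bytes;
    }

private:
    std::unique_ptr<direct_consumer> _consumer;
};
```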
When the link configuration changes, we must update the configuration of
the consumer used to fetch data from the source cluster.

This commit registers a manager notification that listens for link
configuration changes and updates the mux consumer configuration.

Signed-off-by: Michał Maślanka <[email protected]>
@mmaslankaprv mmaslankaprv force-pushed the CORE-13396-pass-client-configuration-to-consumer branch from 66fc2b9 to 38f3ecb Compare September 25, 2025 14:59
, _sem(max_buffered_bytes, "partition_data_queue") {}

void partition_data_queue::update_max_buffered(size_t new_value) {
_gate.check();
Contributor

I think callbacks shouldn't throw exceptions? return gracefully maybe?

Member

+1

} else {
_sem.consume(_current_max_buffered - new_value);
}
_current_max_buffered = new_value;
Contributor

nit: I think "current_max_buffered" gives an impression that that much is currently buffered, just call it "_max_buffered_bytes" or something?
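
To illustrate the two review points above (returning gracefully instead of throwing, and a limit name that does not suggest current usage), here is a simplified stand-in. `byte_budget` is a hypothetical class written for this sketch; it is not the PR's `partition_data_queue`, and it replaces the semaphore with a plain signed counter.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical stand-in for a resizable byte limit. A signed counter plays
// the role of the semaphore: it may go negative after the limit shrinks,
// which blocks new reservations until enough bytes are released.
class byte_budget {
public:
    explicit byte_budget(std::size_t max_buffered_bytes)
      : _max_buffered_bytes(max_buffered_bytes)
      , _available(static_cast<std::int64_t>(max_buffered_bytes)) {}

    void close() { _closed = true; }

    // Returns false instead of throwing when the budget is already closed.
    bool update_max_buffered(std::size_t new_value) {
        if (_closed) {
            return false;
        }
        if (new_value > _max_buffered_bytes) {
            // Limit grew: hand the extra units back.
            _available += static_cast<std::int64_t>(
              new_value - _max_buffered_bytes);
        } else {
            // Limit shrank: take the difference away.
            _available -= static_cast<std::int64_t>(
              _max_buffered_bytes - new_value);
        }
        _max_buffered_bytes = new_value;
        return true;
    }

    bool try_reserve(std::size_t bytes) {
        if (_closed || _available < static_cast<std::int64_t>(bytes)) {
            return false;
        }
        _available -= static_cast<std::int64_t>(bytes);
        return true;
    }

    void release(std::size_t bytes) {
        _available += static_cast<std::int64_t>(bytes);
    }

    std::int64_t available() const { return _available; }
    std::size_t max_buffered_bytes() const { return _max_buffered_bytes; }

private:
    std::size_t _max_buffered_bytes;
    std::int64_t _available;
    bool _closed{false};
};
```

Whether a boolean return, a logged no-op, or some other signal fits best depends on how the real callback is invoked; the sketch only shows that the closed case can be handled without an exception.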

@vbotbuildovich
Collaborator

vbotbuildovich commented Sep 25, 2025

Retry command for Build#72959

please wait until all jobs are finished before running the slash command

/ci-repeat 1
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkingReplicationTests.test_replication_basic@{"shuffle_leadership":false,"source_cluster_spec":{"cluster_type":"kafka","kafka_quorum":"COMBINED_KRAFT","kafka_version":"3.8.0"}}
tests/rptest/tests/cluster_linking_e2e_test.py::ShadowLinkingReplicationTests.test_replication_basic@{"shuffle_leadership":true,"source_cluster_spec":{"cluster_type":"kafka","kafka_quorum":"COMBINED_KRAFT","kafka_version":"3.8.0"}}

@vbotbuildovich
Collaborator

CI test results

test results on build#72959
| test_class | test_method | test_arguments | test_kind | job_url | test_status | passed | reason | test_history |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| ShadowLinkConsumeGroupsMirroringTest | test_continuous_group_sync | {"source_cluster_spec": {"cluster_type": "redpanda"}, "with_failures": true} | integration | https://buildkite.com/redpanda/redpanda/builds/72959#01998188-abc7-4fc9-b9a9-39fd1fbd2108 | FLAKY | 19/21 | upstream reliability is '93.0'. current run reliability is '90.47619047619048'. drift is 2.52381 and the allowed drift is set to 50. The test should PASS | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkConsumeGroupsMirroringTest&test_method=test_continuous_group_sync |
| ShadowLinkingReplicationTests | test_replication_basic | {"shuffle_leadership": false, "source_cluster_spec": {"cluster_type": "kafka", "kafka_quorum": "COMBINED_KRAFT", "kafka_version": "3.8.0"}} | integration | https://buildkite.com/redpanda/redpanda/builds/72959#01998188-1757-4301-a062-266ed3ef4632 | FAIL | 0/21 | The test has failed across all retries | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_basic |
| ShadowLinkingReplicationTests | test_replication_basic | {"shuffle_leadership": false, "source_cluster_spec": {"cluster_type": "kafka", "kafka_quorum": "COMBINED_KRAFT", "kafka_version": "3.8.0"}} | integration | https://buildkite.com/redpanda/redpanda/builds/72959#01998188-abc8-43e2-ad40-4bb38cfcc968 | FAIL | 0/21 | The test has failed across all retries | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_basic |
| ShadowLinkingReplicationTests | test_replication_basic | {"shuffle_leadership": true, "source_cluster_spec": {"cluster_type": "kafka", "kafka_quorum": "COMBINED_KRAFT", "kafka_version": "3.8.0"}} | integration | https://buildkite.com/redpanda/redpanda/builds/72959#01998188-1748-44eb-9b22-45c63f27f379 | FAIL | 0/21 | The test has failed across all retries | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_basic |
| ShadowLinkingReplicationTests | test_replication_basic | {"shuffle_leadership": true, "source_cluster_spec": {"cluster_type": "kafka", "kafka_quorum": "COMBINED_KRAFT", "kafka_version": "3.8.0"}} | integration | https://buildkite.com/redpanda/redpanda/builds/72959#01998188-abbf-4594-b63d-387d6a5f02e6 | FAIL | 0/21 | The test has failed across all retries | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_basic |
| ClusterRateQuotaTest | test_client_response_throttle_mechanism | null | integration | https://buildkite.com/redpanda/redpanda/builds/72959#01998188-abc4-46fe-8fc9-ef72ada1d6bb | FLAKY | 16/21 | upstream reliability is '81.81818181818183'. current run reliability is '76.19047619047619'. drift is 5.62771 and the allowed drift is set to 50. The test should PASS | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ClusterRateQuotaTest&test_method=test_client_response_throttle_mechanism |
| ClusterRateQuotaTest | test_client_response_throttle_mechanism_applies_to_next_request | null | integration | https://buildkite.com/redpanda/redpanda/builds/72959#01998188-1752-4657-9709-af5c573799bd | FLAKY | 14/21 | upstream reliability is '82.39999999999999'. current run reliability is '66.66666666666666'. drift is 15.73333 and the allowed drift is set to 50. The test should PASS | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ClusterRateQuotaTest&test_method=test_client_response_throttle_mechanism_applies_to_next_request |
| DisablingPartitionsTest | test_disable | null | integration | https://buildkite.com/redpanda/redpanda/builds/72959#01998188-1757-4301-a062-266ed3ef4632 | FLAKY | 9/21 | upstream reliability is '82.55352894528151'. current run reliability is '42.857142857142854'. drift is 39.69639 and the allowed drift is set to 50. The test should PASS | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=DisablingPartitionsTest&test_method=test_disable |
| TimeQueryTest | test_timequery_with_local_gc | null | integration | https://buildkite.com/redpanda/redpanda/builds/72959#01998188-1756-402b-977d-20913284696c | FLAKY | 20/21 | upstream reliability is '97.13701431492842'. current run reliability is '95.23809523809523'. drift is 1.89892 and the allowed drift is set to 50. The test should PASS | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=TimeQueryTest&test_method=test_timequery_with_local_gc |
| TopicRecoveryTest | test_prevent_recovery | {"cloud_storage_type": 1} | integration | https://buildkite.com/redpanda/redpanda/builds/72959#01998188-1746-4a93-9863-bf5fac9fdd6e | FLAKY | 19/21 | upstream reliability is '96.2233169129721'. current run reliability is '90.47619047619048'. drift is 5.74713 and the allowed drift is set to 50. The test should PASS | https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=TopicRecoveryTest&test_method=test_prevent_recovery |

Member

@oleiman oleiman left a comment

looks good modulo a couple nits

static constexpr auto retry_backoff_ms_default = 100;
// Maximum fetch wait time
std::optional<int32_t> fetch_wait_max_ms;
// Default value for fetch_wait_max_ms (100ms)
Member

nitpick: missed a spot

Suggested change
- // Default value for fetch_wait_max_ms (100ms)
+ // Default value for fetch_wait_max_ms (500ms)

another one below (fetch_max_bytes_default). incidentally, why bother duplicating these values in code comments at all?

Comment on lines +382 to +385
assert (
    updated_link.configurations.client_options
    == shadow_link.configurations.client_options
)
Member

nitpick: assert message

, _sem(max_buffered_bytes, "partition_data_queue") {}

void partition_data_queue::update_max_buffered(size_t new_value) {
_gate.check();
Member

+1

4 participants