From 5c47e01466752023f0952f42e6d29cfced430add Mon Sep 17 00:00:00 2001 From: maxliu Date: Tue, 9 Apr 2024 23:09:01 +0800 Subject: [PATCH] [segment replication] decouple the rateLimiter of segrep and recovery (12939) setting "indices.replication.max_bytes_per_sec" takes effect when not negative Signed-off-by: maxliu --- .../common/settings/ClusterSettings.java | 1 - .../indices/recovery/RecoverySettings.java | 31 +++++-------------- .../RecoverySettingsDynamicUpdateTests.java | 21 ++++--------- 3 files changed, 14 insertions(+), 39 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 741236e8c967f..ef42240f80e29 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -291,7 +291,6 @@ public void apply(Settings value, Settings current, Settings previous) { ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER, ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES, RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, - RecoverySettings.INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING, RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index 92ab40767f3ef..f2f76ed24ab17 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -65,16 +65,12 @@ public class RecoverySettings { Property.NodeScope ); - public static final Setting INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING = Setting.boolSetting( - "indices.replication.use_individual_rate_limiter", - false, - Property.Dynamic, - Property.NodeScope - ); - + /** + * Individual speed setting for segment replication, default -1B to reuse the setting of recovery. + */ public static final Setting INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting( "indices.replication.max_bytes_per_sec", - new ByteSizeValue(200, ByteSizeUnit.MB), + new ByteSizeValue(-1), Property.Dynamic, Property.NodeScope ); @@ -184,7 +180,6 @@ public class RecoverySettings { public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES); private volatile ByteSizeValue recoveryMaxBytesPerSec; - private volatile boolean useReplicationIndividualRateLimiter; private volatile ByteSizeValue replicationMaxBytesPerSec; private volatile int maxConcurrentFileChunks; private volatile int maxConcurrentOperations; @@ -221,7 +216,6 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { } else { recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac()); } - this.useReplicationIndividualRateLimiter = INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING.get(settings); this.replicationMaxBytesPerSec = INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings); updateReplicationRateLimiter(); @@ -229,10 +223,6 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec); - clusterSettings.addSettingsUpdateConsumer( - INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING, - this::setUseReplicationIndividualRateLimiter - ); clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations); @@ -332,12 +322,7 @@ private void setRecoveryMaxBytesPerSec(ByteSizeValue recoveryMaxBytesPerSec) { } else { recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac()); } - if (useReplicationIndividualRateLimiter == false) updateReplicationRateLimiter(); - } - - private void setUseReplicationIndividualRateLimiter(boolean useReplicationIndividualRateLimiter) { - this.useReplicationIndividualRateLimiter = useReplicationIndividualRateLimiter; - updateReplicationRateLimiter(); + if (replicationMaxBytesPerSec.getBytes() < 0) updateReplicationRateLimiter(); } private void setReplicationMaxBytesPerSec(ByteSizeValue replicationMaxBytesPerSec) { @@ -346,15 +331,15 @@ private void setReplicationMaxBytesPerSec(ByteSizeValue replicationMaxBytesPerSe } private void updateReplicationRateLimiter() { - if (useReplicationIndividualRateLimiter == true) { - if (replicationMaxBytesPerSec.getBytes() <= 0) { + if (replicationMaxBytesPerSec.getBytes() >= 0) { + if (replicationMaxBytesPerSec.getBytes() == 0) { replicationRateLimiter = null; } else if (replicationRateLimiter != null) { replicationRateLimiter.setMBPerSec(replicationMaxBytesPerSec.getMbFrac()); } else { replicationRateLimiter = new SimpleRateLimiter(replicationMaxBytesPerSec.getMbFrac()); } - } else { + } else { // when replicationMaxBytesPerSec = -1B, use setting of recovery if (recoveryMaxBytesPerSec.getBytes() <= 0) { replicationRateLimiter = null; } else if (replicationRateLimiter != null) { diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java index d116f7521f92f..5256c9fe58519 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java @@ -51,34 +51,25 @@ public void testZeroBytesPerSecondIsNoRateLimit() { ); assertEquals(null, recoverySettings.recoveryRateLimiter()); clusterSettings.applySettings( - Settings.builder() - .put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), 0) - .put(RecoverySettings.INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING.getKey(), true) - .build() + Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build() ); assertEquals(null, recoverySettings.replicaitonRateLimiter()); } public void testSetReplicationMaxBytesPerSec() { + assertEquals(40, (int) recoverySettings.replicaitonRateLimiter().getMBPerSec()); clusterSettings.applySettings( Settings.builder() - .put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(60, ByteSizeUnit.MB)) - .put(RecoverySettings.INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING.getKey(), false) + .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(60, ByteSizeUnit.MB)) .build() ); - assertEquals(40, (int) recoverySettings.replicaitonRateLimiter().getMBPerSec()); - clusterSettings.applySettings( - Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build() - ); - assertNull(recoverySettings.recoveryRateLimiter()); - assertNull(recoverySettings.replicaitonRateLimiter()); + assertEquals(60, (int) recoverySettings.replicaitonRateLimiter().getMBPerSec()); clusterSettings.applySettings( Settings.builder() - .put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(60, ByteSizeUnit.MB)) - .put(RecoverySettings.INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING.getKey(), true) + .put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(80, ByteSizeUnit.MB)) .build() ); - assertEquals(60, (int) recoverySettings.replicaitonRateLimiter().getMBPerSec()); + assertEquals(80, (int) recoverySettings.replicaitonRateLimiter().getMBPerSec()); } public void testRetryDelayStateSync() {