Skip to content

Commit

Permalink
[segment replication] decouple the rateLimiter of segrep and recovery…
Browse files Browse the repository at this point in the history
… (12939)

setting "indices.replication.max_bytes_per_sec" takes effect when not negative

Signed-off-by: maxliu <[email protected]>
  • Loading branch information
Ferrari248 committed Apr 9, 2024
1 parent 93fb77e commit 5c47e01
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER,
ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING,
RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,12 @@ public class RecoverySettings {
Property.NodeScope
);

public static final Setting<Boolean> INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING = Setting.boolSetting(
"indices.replication.use_individual_rate_limiter",
false,
Property.Dynamic,
Property.NodeScope
);

/**
* Individual speed setting for segment replication, default -1B to reuse the setting of recovery.
*/
public static final Setting<ByteSizeValue> INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting(
"indices.replication.max_bytes_per_sec",
new ByteSizeValue(200, ByteSizeUnit.MB),
new ByteSizeValue(-1),
Property.Dynamic,
Property.NodeScope
);
Expand Down Expand Up @@ -184,7 +180,6 @@ public class RecoverySettings {
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);

private volatile ByteSizeValue recoveryMaxBytesPerSec;
private volatile boolean useReplicationIndividualRateLimiter;
private volatile ByteSizeValue replicationMaxBytesPerSec;
private volatile int maxConcurrentFileChunks;
private volatile int maxConcurrentOperations;
Expand Down Expand Up @@ -221,18 +216,13 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
} else {
recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
}
this.useReplicationIndividualRateLimiter = INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING.get(settings);
this.replicationMaxBytesPerSec = INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings);
updateReplicationRateLimiter();

logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec);
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(
INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING,
this::setUseReplicationIndividualRateLimiter
);
clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations);
Expand Down Expand Up @@ -332,12 +322,7 @@ private void setRecoveryMaxBytesPerSec(ByteSizeValue recoveryMaxBytesPerSec) {
} else {
recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
}
if (useReplicationIndividualRateLimiter == false) updateReplicationRateLimiter();
}

private void setUseReplicationIndividualRateLimiter(boolean useReplicationIndividualRateLimiter) {
this.useReplicationIndividualRateLimiter = useReplicationIndividualRateLimiter;
updateReplicationRateLimiter();
if (replicationMaxBytesPerSec.getBytes() < 0) updateReplicationRateLimiter();
}

private void setReplicationMaxBytesPerSec(ByteSizeValue replicationMaxBytesPerSec) {
Expand All @@ -346,15 +331,15 @@ private void setReplicationMaxBytesPerSec(ByteSizeValue replicationMaxBytesPerSe
}

private void updateReplicationRateLimiter() {
if (useReplicationIndividualRateLimiter == true) {
if (replicationMaxBytesPerSec.getBytes() <= 0) {
if (replicationMaxBytesPerSec.getBytes() >= 0) {
if (replicationMaxBytesPerSec.getBytes() == 0) {
replicationRateLimiter = null;
} else if (replicationRateLimiter != null) {
replicationRateLimiter.setMBPerSec(replicationMaxBytesPerSec.getMbFrac());
} else {
replicationRateLimiter = new SimpleRateLimiter(replicationMaxBytesPerSec.getMbFrac());
}
} else {
} else { // when replicationMaxBytesPerSec = -1B, use setting of recovery
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
replicationRateLimiter = null;
} else if (replicationRateLimiter != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,34 +51,25 @@ public void testZeroBytesPerSecondIsNoRateLimit() {
);
assertEquals(null, recoverySettings.recoveryRateLimiter());
clusterSettings.applySettings(
Settings.builder()
.put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), 0)
.put(RecoverySettings.INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING.getKey(), true)
.build()
Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
);
assertEquals(null, recoverySettings.replicaitonRateLimiter());
}

public void testSetReplicationMaxBytesPerSec() {
assertEquals(40, (int) recoverySettings.replicaitonRateLimiter().getMBPerSec());
clusterSettings.applySettings(
Settings.builder()
.put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(60, ByteSizeUnit.MB))
.put(RecoverySettings.INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING.getKey(), false)
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(60, ByteSizeUnit.MB))
.build()
);
assertEquals(40, (int) recoverySettings.replicaitonRateLimiter().getMBPerSec());
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
);
assertNull(recoverySettings.recoveryRateLimiter());
assertNull(recoverySettings.replicaitonRateLimiter());
assertEquals(60, (int) recoverySettings.replicaitonRateLimiter().getMBPerSec());
clusterSettings.applySettings(
Settings.builder()
.put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(60, ByteSizeUnit.MB))
.put(RecoverySettings.INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING.getKey(), true)
.put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(80, ByteSizeUnit.MB))
.build()
);
assertEquals(60, (int) recoverySettings.replicaitonRateLimiter().getMBPerSec());
assertEquals(80, (int) recoverySettings.replicaitonRateLimiter().getMBPerSec());
}

public void testRetryDelayStateSync() {
Expand Down

0 comments on commit 5c47e01

Please sign in to comment.