Skip to content

Commit

Permalink
change disk circuit breaker to cluster settings (opensearch-project#2634
Browse files Browse the repository at this point in the history
)

* change disk circuit breaker to cluster settings

Signed-off-by: zane-neo <[email protected]>

* Fix IT failure

Signed-off-by: zane-neo <[email protected]>

* format code

Signed-off-by: zane-neo <[email protected]>

* fix failure UT

Signed-off-by: zane-neo <[email protected]>

* fix failure IT

Signed-off-by: zane-neo <[email protected]>

* Address comments

Signed-off-by: zane-neo <[email protected]>

---------

Signed-off-by: zane-neo <[email protected]>
  • Loading branch information
zane-neo authored Jul 12, 2024
1 parent e99614c commit b6618b2
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,31 @@

package org.opensearch.ml.breaker;

import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_DISK_FREE_SPACE_THRESHOLD;

import java.io.File;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Optional;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.ml.common.exception.MLException;

/**
* A circuit breaker for disk usage.
*/
public class DiskCircuitBreaker extends ThresholdCircuitBreaker<Long> {
// TODO: make this value configurable as cluster setting
public class DiskCircuitBreaker extends ThresholdCircuitBreaker<ByteSizeValue> {
private static final String ML_DISK_CB = "Disk Circuit Breaker";
public static final long DEFAULT_DISK_SHORTAGE_THRESHOLD = 5L;
private static final long GB = 1024 * 1024 * 1024;
private String diskDir;

public DiskCircuitBreaker(String diskDir) {
super(DEFAULT_DISK_SHORTAGE_THRESHOLD);
this.diskDir = diskDir;
}
public static final ByteSizeValue DEFAULT_DISK_SHORTAGE_THRESHOLD = new ByteSizeValue(5, ByteSizeUnit.GB);
private final File diskDir;

public DiskCircuitBreaker(long threshold, String diskDir) {
super(threshold);
public DiskCircuitBreaker(Settings settings, ClusterService clusterService, File diskDir) {
super(Optional.ofNullable(ML_COMMONS_DISK_FREE_SPACE_THRESHOLD.get(settings)).orElse(DEFAULT_DISK_SHORTAGE_THRESHOLD));
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_DISK_FREE_SPACE_THRESHOLD, super::setThreshold);
this.diskDir = diskDir;
}

Expand All @@ -42,7 +43,7 @@ public String getName() {
public boolean isOpen() {
try {
return AccessController.doPrivileged((PrivilegedExceptionAction<Boolean>) () -> {
return (new File(diskDir).getFreeSpace() / GB) < getThreshold(); // in GB
return new ByteSizeValue(diskDir.getFreeSpace(), ByteSizeUnit.BYTES).compareTo(getThreshold()) < 0; // in GB
});
} catch (PrivilegedActionException e) {
throw new MLException("Failed to run disk circuit breaker");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.ml.breaker;

import java.io.File;
import java.nio.file.Path;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -76,7 +77,7 @@ public MLCircuitBreakerService init(Path path) {
// Register memory circuit breaker
registerBreaker(BreakerName.MEMORY, new MemoryCircuitBreaker(this.settings, this.clusterService, this.jvmService));
log.info("Registered ML memory breaker.");
registerBreaker(BreakerName.DISK, new DiskCircuitBreaker(path.toString()));
registerBreaker(BreakerName.DISK, new DiskCircuitBreaker(this.settings, this.clusterService, new File(path.toString())));
log.info("Registered ML disk breaker.");
// Register native memory circuit breaker, disabling due to unstability.
// registerBreaker(BreakerName.NATIVE_MEMORY, new NativeMemoryCircuitBreaker(this.osService, this.settings, this.clusterService));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_JVM_HEAP_MEM_THRESHOLD;

import java.util.Optional;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.monitor.jvm.JvmService;
Expand All @@ -15,11 +17,9 @@
* A circuit breaker for memory usage.
*/
public class MemoryCircuitBreaker extends ThresholdCircuitBreaker<Short> {
// TODO: make this value configurable as cluster setting
private static final String ML_MEMORY_CB = "Memory Circuit Breaker";
public static final short DEFAULT_JVM_HEAP_USAGE_THRESHOLD = 85;
private final JvmService jvmService;
private volatile Integer jvmHeapMemThreshold = 85;

public MemoryCircuitBreaker(JvmService jvmService) {
super(DEFAULT_JVM_HEAP_USAGE_THRESHOLD);
Expand All @@ -32,22 +32,23 @@ public MemoryCircuitBreaker(short threshold, JvmService jvmService) {
}

public MemoryCircuitBreaker(Settings settings, ClusterService clusterService, JvmService jvmService) {
super(DEFAULT_JVM_HEAP_USAGE_THRESHOLD);
super(
Optional
.ofNullable(ML_COMMONS_JVM_HEAP_MEM_THRESHOLD.get(settings))
.map(Integer::shortValue)
.orElse(DEFAULT_JVM_HEAP_USAGE_THRESHOLD)
);
this.jvmService = jvmService;
this.jvmHeapMemThreshold = ML_COMMONS_JVM_HEAP_MEM_THRESHOLD.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_JVM_HEAP_MEM_THRESHOLD, it -> jvmHeapMemThreshold = it);
clusterService
.getClusterSettings()
.addSettingsUpdateConsumer(ML_COMMONS_JVM_HEAP_MEM_THRESHOLD, it -> super.setThreshold(it.shortValue()));
}

@Override
public String getName() {
return ML_MEMORY_CB;
}

@Override
public Short getThreshold() {
return this.jvmHeapMemThreshold.shortValue();
}

@Override
public boolean isOpen() {
return getThreshold() < 100 && jvmService.stats().getMem().getHeapUsedPercent() > getThreshold();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD;

import java.util.Optional;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.monitor.os.OsService;
Expand All @@ -18,18 +20,22 @@ public class NativeMemoryCircuitBreaker extends ThresholdCircuitBreaker<Short> {
private static final String ML_MEMORY_CB = "Native Memory Circuit Breaker";
public static final short DEFAULT_NATIVE_MEM_USAGE_THRESHOLD = 90;
private final OsService osService;
private volatile Integer nativeMemThreshold = 90;

public NativeMemoryCircuitBreaker(OsService osService, Settings settings, ClusterService clusterService) {
super(DEFAULT_NATIVE_MEM_USAGE_THRESHOLD);
super(
Optional
.ofNullable(ML_COMMONS_NATIVE_MEM_THRESHOLD.get(settings))
.map(Integer::shortValue)
.orElse(DEFAULT_NATIVE_MEM_USAGE_THRESHOLD)
);
this.osService = osService;
this.nativeMemThreshold = ML_COMMONS_NATIVE_MEM_THRESHOLD.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_NATIVE_MEM_THRESHOLD, it -> nativeMemThreshold = it);
clusterService
.getClusterSettings()
.addSettingsUpdateConsumer(ML_COMMONS_NATIVE_MEM_THRESHOLD, it -> super.setThreshold(it.shortValue()));
}

public NativeMemoryCircuitBreaker(Integer threshold, OsService osService) {
super(threshold.shortValue());
this.nativeMemThreshold = threshold;
this.osService = osService;
}

Expand All @@ -38,13 +44,8 @@ public String getName() {
return ML_MEMORY_CB;
}

@Override
public Short getThreshold() {
return this.nativeMemThreshold.shortValue();
}

@Override
public boolean isOpen() {
return osService.stats().getMem().getUsedPercent() > this.nativeMemThreshold.shortValue();
return osService.stats().getMem().getUsedPercent() > getThreshold();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,21 @@

package org.opensearch.ml.breaker;

import lombok.Data;

/**
* An abstract class for all breakers with threshold.
* @param <T> data type of threshold
*/
@Data
public abstract class ThresholdCircuitBreaker<T> implements CircuitBreaker {

private T threshold;
private volatile T threshold;

public ThresholdCircuitBreaker(T threshold) {
this.threshold = threshold;
}

public T getThreshold() {
return threshold;
}

@Override
public abstract boolean isOpen();
}
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,7 @@ public List<Setting<?>> getSettings() {
MLCommonsSettings.ML_COMMONS_MAX_DEPLOY_MODEL_TASKS_PER_NODE,
MLCommonsSettings.ML_COMMONS_TRUSTED_URL_REGEX,
MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD,
MLCommonsSettings.ML_COMMONS_DISK_FREE_SPACE_THRESHOLD,
MLCommonsSettings.ML_COMMONS_JVM_HEAP_MEM_THRESHOLD,
MLCommonsSettings.ML_COMMONS_EXCLUDE_NODE_NAMES,
MLCommonsSettings.ML_COMMONS_ALLOW_CUSTOM_DEPLOYMENT_PLAN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.function.Function;

import org.opensearch.common.settings.Setting;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.ml.common.conversation.ConversationalIndexConstants;
import org.opensearch.searchpipelines.questionanswering.generative.GenerativeQAProcessorConstants;

Expand Down Expand Up @@ -77,6 +79,14 @@ private MLCommonsSettings() {}
public static final Setting<Integer> ML_COMMONS_JVM_HEAP_MEM_THRESHOLD = Setting
.intSetting("plugins.ml_commons.jvm_heap_memory_threshold", 85, 0, 100, Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final Setting<ByteSizeValue> ML_COMMONS_DISK_FREE_SPACE_THRESHOLD = Setting
.byteSizeSetting(
"plugins.ml_commons.disk_free_space_threshold",
new ByteSizeValue(5L, ByteSizeUnit.GB),
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public static final Setting<String> ML_COMMONS_EXCLUDE_NODE_NAMES = Setting
.simpleString("plugins.ml_commons.exclude_nodes._name", Setting.Property.NodeScope, Setting.Property.Dynamic);
public static final Setting<Boolean> ML_COMMONS_ALLOW_CUSTOM_DEPLOYMENT_PLAN = Setting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ml.breaker;

import static org.mockito.Mockito.when;
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_DISK_FREE_SPACE_THRESHOLD;

import java.io.File;
import java.util.HashSet;
import java.util.List;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;

public class DiskCircuitBreakerTests {
@Mock
ClusterService clusterService;

@Mock
File file;

@Before
public void setup() {
MockitoAnnotations.openMocks(this);
when(clusterService.getClusterSettings())
.thenReturn(new ClusterSettings(Settings.EMPTY, new HashSet<>(List.of(ML_COMMONS_DISK_FREE_SPACE_THRESHOLD))));
}

@Test
public void test_isOpen_whenDiskFreeSpaceIsHigherThanMinValue_breakerIsNotOpen() {
CircuitBreaker breaker = new DiskCircuitBreaker(
Settings.builder().put(ML_COMMONS_DISK_FREE_SPACE_THRESHOLD.getKey(), new ByteSizeValue(4L, ByteSizeUnit.GB)).build(),
clusterService,
file
);
when(file.getFreeSpace()).thenReturn(5 * 1024 * 1024 * 1024L);
Assert.assertFalse(breaker.isOpen());
}

@Test
public void test_isOpen_whenDiskFreeSpaceIsLessThanMinValue_breakerIsOpen() {
CircuitBreaker breaker = new DiskCircuitBreaker(
Settings.builder().put(ML_COMMONS_DISK_FREE_SPACE_THRESHOLD.getKey(), new ByteSizeValue(5L, ByteSizeUnit.GB)).build(),
clusterService,
file
);
when(file.getFreeSpace()).thenReturn(4 * 1024 * 1024 * 1024L);
Assert.assertTrue(breaker.isOpen());
}

@Test
public void test_isOpen_whenDiskFreeSpaceConfiguredToZero_breakerIsNotOpen() {
CircuitBreaker breaker = new DiskCircuitBreaker(
Settings.builder().put(ML_COMMONS_DISK_FREE_SPACE_THRESHOLD.getKey(), new ByteSizeValue(0L, ByteSizeUnit.KB)).build(),
clusterService,
file
);
when(file.getFreeSpace()).thenReturn((long) (Math.random() * 1024 * 1024 * 1024 * 1024L));
Assert.assertFalse(breaker.isOpen());
}

@Test
public void test_getName() {
CircuitBreaker breaker = new DiskCircuitBreaker(Settings.EMPTY, clusterService, file);
Assert.assertEquals("Disk Circuit Breaker", breaker.getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.ml.breaker;

import static org.mockito.Mockito.when;
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_DISK_FREE_SPACE_THRESHOLD;
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_JVM_HEAP_MEM_THRESHOLD;
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_NATIVE_MEM_THRESHOLD;

Expand Down Expand Up @@ -103,7 +104,9 @@ public void testInit() {
.build();
ClusterSettings clusterSettings = new ClusterSettings(
settings,
new HashSet<>(Arrays.asList(ML_COMMONS_NATIVE_MEM_THRESHOLD, ML_COMMONS_JVM_HEAP_MEM_THRESHOLD))
new HashSet<>(
Arrays.asList(ML_COMMONS_NATIVE_MEM_THRESHOLD, ML_COMMONS_JVM_HEAP_MEM_THRESHOLD, ML_COMMONS_DISK_FREE_SPACE_THRESHOLD)
)
);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
mlCircuitBreakerService = new MLCircuitBreakerService(jvmService, osService, settings, clusterService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ public void setupSettings() throws IOException {
String jsonEntity = "{\n"
+ " \"persistent\" : {\n"
+ " \"plugins.ml_commons.jvm_heap_memory_threshold\" : 100, \n"
+ " \"plugins.ml_commons.native_memory_threshold\" : 100 \n"
+ " \"plugins.ml_commons.native_memory_threshold\" : 100, \n"
+ " \"plugins.ml_commons.disk_free_space_threshold\" : 0 \n"
+ " }\n"
+ "}";
response = TestHelper
Expand Down

0 comments on commit b6618b2

Please sign in to comment.