diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/TransportCreateQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/TransportCreateQueryGroupAction.java index 670fb870e44ce..aa3d724c20c2d 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/TransportCreateQueryGroupAction.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/TransportCreateQueryGroupAction.java @@ -13,7 +13,7 @@ import org.opensearch.cluster.metadata.QueryGroup; import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; -import org.opensearch.plugin.wlm.service.Persistable; +import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -28,7 +28,7 @@ public class TransportCreateQueryGroupAction extends HandledTransportAction { private final ThreadPool threadPool; - private final Persistable queryGroupPersistenceService; + private final QueryGroupPersistenceService queryGroupPersistenceService; /** * Constructor for TransportCreateQueryGroupAction @@ -37,7 +37,7 @@ public class TransportCreateQueryGroupAction extends HandledTransportAction queryGroupPersistenceService + QueryGroupPersistenceService queryGroupPersistenceService ) { super(CreateQueryGroupAction.NAME, transportService, actionFilters, CreateQueryGroupRequest::new); this.threadPool = threadPool; @@ -60,6 +60,7 @@ protected void doExecute(Task task, CreateQueryGroupRequest request, ActionListe .resourceLimits(request.getResourceLimits()) .updatedAt(request.getUpdatedAtInMillis()) .build(); - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> queryGroupPersistenceService.persist(queryGroup, listener)); + threadPool.executor(ThreadPool.Names.GENERIC) + .execute(() -> queryGroupPersistenceService.persistInClusterStateMetadata(queryGroup, listener)); } } diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPluginModule.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPluginModule.java index 58208af293f14..87109d06259bb 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPluginModule.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/WorkloadManagementPluginModule.java @@ -8,11 +8,7 @@ package org.opensearch.plugin.wlm; -import org.opensearch.cluster.metadata.QueryGroup; import org.opensearch.common.inject.AbstractModule; -import org.opensearch.common.inject.TypeLiteral; -import org.opensearch.plugin.wlm.service.Persistable; -import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService; /** * Guice Module to manage WorkloadManagement related objects @@ -25,8 +21,5 @@ public class WorkloadManagementPluginModule extends AbstractModule { public WorkloadManagementPluginModule() {} @Override - protected void configure() { - bind(new TypeLiteral>() { - }).to(QueryGroupPersistenceService.class).asEagerSingleton(); - } + protected void configure() {} } diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestCreateQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestCreateQueryGroupAction.java index 0f488be15bd11..45d190fdd7a78 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestCreateQueryGroupAction.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/rest/RestCreateQueryGroupAction.java @@ -50,7 +50,7 @@ public String getName() { */ @Override public List routes() { - return List.of(new Route(POST, "_wlm/_query_group/"), new Route(PUT, "_wlm/_query_group/")); + return List.of(new Route(POST, "_wlm/query_group/"), new Route(PUT, "_wlm/query_group/")); } @Override diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/Persistable.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/Persistable.java deleted file mode 100644 index 9683a5b0ecc42..0000000000000 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/Persistable.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.plugin.wlm.service; - -import org.opensearch.core.action.ActionListener; -import org.opensearch.plugin.wlm.CreateQueryGroupResponse; - -/** - * This interface defines the key APIs for implementing QueruGroup persistence - */ -public interface Persistable { - - /** - * persists the QueryGroup in a durable storage - * @param queryGroup - queryGroup to be persisted - * @param listener - ActionListener for CreateQueryGroupResponse - */ - void persist(T queryGroup, ActionListener listener); -} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java index cb833f02db569..e903b947ad929 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/service/QueryGroupPersistenceService.java @@ -34,7 +34,7 @@ /** * This class defines the functions for QueryGroup persistence */ -public class QueryGroupPersistenceService implements Persistable { +public class QueryGroupPersistenceService { private static final Logger logger = LogManager.getLogger(QueryGroupPersistenceService.class); private final ClusterService clusterService; private static final String SOURCE = "query-group-persistence-service"; @@ -78,16 +78,12 @@ public void setMaxQueryGroupCount(int newMaxQueryGroupCount) { this.maxQueryGroupCount = newMaxQueryGroupCount; } - @Override - public void persist(QueryGroup queryGroup, ActionListener listener) { - persistInClusterStateMetadata(queryGroup, listener); - } - /** * Update cluster state to include the new QueryGroup * @param queryGroup {@link QueryGroup} - the QueryGroup we're currently creating + * @param listener - ActionListener for CreateQueryGroupResponse */ - void persistInClusterStateMetadata(QueryGroup queryGroup, ActionListener listener) { + public void persistInClusterStateMetadata(QueryGroup queryGroup, ActionListener listener) { clusterService.submitStateUpdateTask(SOURCE, new ClusterStateUpdateTask(Priority.NORMAL) { @Override public ClusterState execute(ClusterState currentState) throws Exception { @@ -184,11 +180,4 @@ private double calculateExistingUsage(String resourceName, Map NODE_LEVEL_REJECTION_THRESHOLD = Setting.doubleSetting( - NODE_REJECTION_THRESHOLD_SETTING_NAME, - DEFAULT_NODE_LEVEL_REJECTION_THRESHOLD, + public static final Setting NODE_LEVEL_MEMORY_REJECTION_THRESHOLD = Setting.doubleSetting( + NODE_MEMORY_REJECTION_THRESHOLD_SETTING_NAME, + DEFAULT_NODE_LEVEL_MEMORY_REJECTION_THRESHOLD, Setting.Property.Dynamic, Setting.Property.NodeScope ); /** - * Setting name for node level cancellation threshold + * Setting name for node level cpu rejection threshold for QSB */ - public static final String NODE_CANCELLATION_THRESHOLD_SETTING_NAME = "query_group.node.cancellation_threshold"; + public static final String NODE_CPU_REJECTION_THRESHOLD_SETTING_NAME = "query_group.node.cpu_rejection_threshold"; /** - * Setting name for node level cancellation threshold + * Setting to control the cpu rejection threshold */ - public static final Setting NODE_LEVEL_CANCELLATION_THRESHOLD = Setting.doubleSetting( - NODE_CANCELLATION_THRESHOLD_SETTING_NAME, - DEFAULT_NODE_LEVEL_CANCELLATION_THRESHOLD, + public static final Setting NODE_LEVEL_CPU_REJECTION_THRESHOLD = Setting.doubleSetting( + NODE_CPU_REJECTION_THRESHOLD_SETTING_NAME, + DEFAULT_NODE_LEVEL_CPU_REJECTION_THRESHOLD, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + /** + * Setting name for node level memory cancellation threshold + */ + public static final String NODE_MEMORY_CANCELLATION_THRESHOLD_SETTING_NAME = "query_group.node.memory_cancellation_threshold"; + /** + * Setting name for node level memory cancellation threshold + */ + public static final Setting NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD = Setting.doubleSetting( + NODE_MEMORY_CANCELLATION_THRESHOLD_SETTING_NAME, + DEFAULT_NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + /** + * Setting name for node level cpu cancellation threshold + */ + public static final String NODE_CPU_CANCELLATION_THRESHOLD_SETTING_NAME = "query_group.node.cpu_cancellation_threshold"; + /** + * Setting name for node level cpu cancellation threshold + */ + public static final Setting NODE_LEVEL_CPU_CANCELLATION_THRESHOLD = Setting.doubleSetting( + NODE_CPU_CANCELLATION_THRESHOLD_SETTING_NAME, + DEFAULT_NODE_LEVEL_CPU_CANCELLATION_THRESHOLD, Setting.Property.Dynamic, Setting.Property.NodeScope ); @@ -97,15 +129,20 @@ public class QueryGroupServiceSettings { */ public QueryGroupServiceSettings(Settings settings, ClusterSettings clusterSettings) { runIntervalMillis = new TimeValue(QUERY_GROUP_RUN_INTERVAL_SETTING.get(settings)); - nodeLevelMemoryCancellationThreshold = NODE_LEVEL_CANCELLATION_THRESHOLD.get(settings); - nodeLevelMemoryRejectionThreshold = NODE_LEVEL_REJECTION_THRESHOLD.get(settings); + nodeLevelMemoryCancellationThreshold = NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD.get(settings); + nodeLevelMemoryRejectionThreshold = NODE_LEVEL_MEMORY_REJECTION_THRESHOLD.get(settings); + nodeLevelCpuCancellationThreshold = NODE_LEVEL_CPU_CANCELLATION_THRESHOLD.get(settings); + nodeLevelCpuRejectionThreshold = NODE_LEVEL_CPU_REJECTION_THRESHOLD.get(settings); maxQueryGroupCount = MAX_QUERY_GROUP_COUNT.get(settings); - ensureRejectionThresholdIsLessThanCancellation(nodeLevelMemoryRejectionThreshold, nodeLevelMemoryCancellationThreshold); + ensureMemoryRejectionThresholdIsLessThanCancellation(nodeLevelMemoryRejectionThreshold, nodeLevelMemoryCancellationThreshold); + ensureCpuRejectionThresholdIsLessThanCancellation(nodeLevelCpuRejectionThreshold, nodeLevelCpuCancellationThreshold); clusterSettings.addSettingsUpdateConsumer(MAX_QUERY_GROUP_COUNT, this::setMaxQueryGroupCount); - clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_CANCELLATION_THRESHOLD, this::setNodeLevelMemoryCancellationThreshold); - clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_REJECTION_THRESHOLD, this::setNodeLevelMemoryRejectionThreshold); + clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD, this::setNodeLevelMemoryCancellationThreshold); + clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_MEMORY_REJECTION_THRESHOLD, this::setNodeLevelMemoryRejectionThreshold); + clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_CPU_CANCELLATION_THRESHOLD, this::setNodeLevelCpuCancellationThreshold); + clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_CPU_REJECTION_THRESHOLD, this::setNodeLevelCpuRejectionThreshold); } /** @@ -128,62 +165,125 @@ public void setMaxQueryGroupCount(int newMaxQueryGroupCount) { } /** - * Method to get the node level cancellation threshold - * @return current node level cancellation threshold + * Method to get the node level memory cancellation threshold + * @return current node level memory cancellation threshold */ public Double getNodeLevelMemoryCancellationThreshold() { return nodeLevelMemoryCancellationThreshold; } /** - * Method to set the node level cancellation threshold - * @param nodeLevelMemoryCancellationThreshold sets the new node level cancellation threshold + * Method to set the node level memory cancellation threshold + * @param nodeLevelMemoryCancellationThreshold sets the new node level memory cancellation threshold * @throws IllegalArgumentException if the value is > 0.95 and cancellation < rejection threshold */ public void setNodeLevelMemoryCancellationThreshold(Double nodeLevelMemoryCancellationThreshold) { - if (Double.compare(nodeLevelMemoryCancellationThreshold, NODE_LEVEL_CANCELLATION_THRESHOLD_MAX_VALUE) > 0) { + if (Double.compare(nodeLevelMemoryCancellationThreshold, NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD_MAX_VALUE) > 0) { throw new IllegalArgumentException( - NODE_CANCELLATION_THRESHOLD_SETTING_NAME + " value should not be greater than 0.95 as it pose a threat of node drop" + NODE_MEMORY_CANCELLATION_THRESHOLD_SETTING_NAME + " value should not be greater than 0.95 as it pose a threat of node drop" ); } - ensureRejectionThresholdIsLessThanCancellation(nodeLevelMemoryRejectionThreshold, nodeLevelMemoryCancellationThreshold); + ensureMemoryRejectionThresholdIsLessThanCancellation(nodeLevelMemoryRejectionThreshold, nodeLevelMemoryCancellationThreshold); this.nodeLevelMemoryCancellationThreshold = nodeLevelMemoryCancellationThreshold; } /** - * Method to get the node level rejection threshold - * @return the current node level rejection threshold + * Method to get the node level cpu cancellation threshold + * @return current node level cpu cancellation threshold + */ + public Double getNodeLevelCpuCancellationThreshold() { + return nodeLevelCpuCancellationThreshold; + } + + /** + * Method to set the node level cpu cancellation threshold + * @param nodeLevelCpuCancellationThreshold sets the new node level cpu cancellation threshold + * @throws IllegalArgumentException if the value is > 0.95 and cancellation < rejection threshold + */ + public void setNodeLevelCpuCancellationThreshold(Double nodeLevelCpuCancellationThreshold) { + if (Double.compare(nodeLevelCpuCancellationThreshold, NODE_LEVEL_CPU_CANCELLATION_THRESHOLD_MAX_VALUE) > 0) { + throw new IllegalArgumentException( + NODE_CPU_CANCELLATION_THRESHOLD_SETTING_NAME + " value should not be greater than 0.95 as it pose a threat of node drop" + ); + } + + ensureCpuRejectionThresholdIsLessThanCancellation(nodeLevelCpuRejectionThreshold, nodeLevelCpuCancellationThreshold); + + this.nodeLevelCpuCancellationThreshold = nodeLevelCpuCancellationThreshold; + } + + /** + * Method to get the memory node level rejection threshold + * @return the current memory node level rejection threshold */ public Double getNodeLevelMemoryRejectionThreshold() { return nodeLevelMemoryRejectionThreshold; } /** - * Method to set the node level rejection threshold - * @param nodeLevelMemoryRejectionThreshold sets the new rejection threshold + * Method to set the node level memory rejection threshold + * @param nodeLevelMemoryRejectionThreshold sets the new memory rejection threshold * @throws IllegalArgumentException if rejection > 0.90 and rejection < cancellation threshold */ public void setNodeLevelMemoryRejectionThreshold(Double nodeLevelMemoryRejectionThreshold) { - if (Double.compare(nodeLevelMemoryRejectionThreshold, NODE_LEVEL_REJECTION_THRESHOLD_MAX_VALUE) > 0) { + if (Double.compare(nodeLevelMemoryRejectionThreshold, NODE_LEVEL_MEMORY_REJECTION_THRESHOLD_MAX_VALUE) > 0) { throw new IllegalArgumentException( - NODE_REJECTION_THRESHOLD_SETTING_NAME + " value not be greater than 0.90 as it pose a threat of node drop" + NODE_MEMORY_REJECTION_THRESHOLD_SETTING_NAME + " value not be greater than 0.90 as it pose a threat of node drop" ); } - ensureRejectionThresholdIsLessThanCancellation(nodeLevelMemoryRejectionThreshold, nodeLevelMemoryCancellationThreshold); + ensureMemoryRejectionThresholdIsLessThanCancellation(nodeLevelMemoryRejectionThreshold, nodeLevelMemoryCancellationThreshold); this.nodeLevelMemoryRejectionThreshold = nodeLevelMemoryRejectionThreshold; } - private void ensureRejectionThresholdIsLessThanCancellation( + /** + * Method to get the cpu node level rejection threshold + * @return the current cpu node level rejection threshold + */ + public Double getNodeLevelCpuRejectionThreshold() { + return nodeLevelCpuRejectionThreshold; + } + + /** + * Method to set the node level cpu rejection threshold + * @param nodeLevelCpuRejectionThreshold sets the new cpu rejection threshold + * @throws IllegalArgumentException if rejection > 0.90 and rejection < cancellation threshold + */ + public void setNodeLevelCpuRejectionThreshold(Double nodeLevelCpuRejectionThreshold) { + if (Double.compare(nodeLevelCpuRejectionThreshold, NODE_LEVEL_CPU_REJECTION_THRESHOLD_MAX_VALUE) > 0) { + throw new IllegalArgumentException( + NODE_CPU_REJECTION_THRESHOLD_SETTING_NAME + " value not be greater than 0.90 as it pose a threat of node drop" + ); + } + + ensureCpuRejectionThresholdIsLessThanCancellation(nodeLevelCpuRejectionThreshold, nodeLevelCpuCancellationThreshold); + + this.nodeLevelCpuRejectionThreshold = nodeLevelCpuRejectionThreshold; + } + + private void ensureMemoryRejectionThresholdIsLessThanCancellation( Double nodeLevelMemoryRejectionThreshold, Double nodeLevelMemoryCancellationThreshold ) { if (Double.compare(nodeLevelMemoryCancellationThreshold, nodeLevelMemoryRejectionThreshold) < 0) { throw new IllegalArgumentException( - NODE_CANCELLATION_THRESHOLD_SETTING_NAME + " value should not be less than " + NODE_REJECTION_THRESHOLD_SETTING_NAME + NODE_MEMORY_CANCELLATION_THRESHOLD_SETTING_NAME + + " value should not be less than " + + NODE_MEMORY_REJECTION_THRESHOLD_SETTING_NAME + ); + } + } + + private void ensureCpuRejectionThresholdIsLessThanCancellation( + Double nodeLevelCpuRejectionThreshold, + Double nodeLevelCpuCancellationThreshold + ) { + if (Double.compare(nodeLevelCpuCancellationThreshold, nodeLevelCpuRejectionThreshold) < 0) { + throw new IllegalArgumentException( + NODE_CPU_CANCELLATION_THRESHOLD_SETTING_NAME + " value should not be less than " + NODE_CPU_REJECTION_THRESHOLD_SETTING_NAME ); } }