diff --git a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/actions/SearchBackPressureAction.java b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/actions/SearchBackPressureAction.java deleted file mode 100644 index 57381f5b8..000000000 --- a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/actions/SearchBackPressureAction.java +++ /dev/null @@ -1,308 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.decisionmaker.actions; - - -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.annotations.SerializedName; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.performanceanalyzer.AppContext; -import org.opensearch.performanceanalyzer.rca.store.rca.cluster.NodeKey; - -public class SearchBackPressureAction extends SuppressibleAction { - private static final Logger LOG = LogManager.getLogger(SearchBackPressureAction.class); - public static final String NAME = "SearchBackPressureAction"; - private static final ImpactVector NO_IMPACT = new ImpactVector(); - - /* - * Time to wait since last recommendation, before suggesting this action again - */ - private static final long DEFAULT_COOL_OFF_PERIOD_IN_MILLIS = TimeUnit.DAYS.toMillis(1); - - /* From Config Per Diumension Type - * canUpdate: whether the action should be emitted - * coolOffPeriodInMillis: how long the CoolOffPeriod the action should before reemit - * thresholdName: the name of threshold we are tuning (e.g. node_duress.cpu_threshold, search_heap_threshold) - * dimension: indicates whether the resource unit is caused by shard/task level searchbackpressure cancellation stats - * Step Size in percentage: how much should the threshold change in percentage - */ - private boolean canUpdate; - private long coolOffPeriodInMillis; - private String thresholdName; - - private SearchbpDimension dimension; - private SearchbpThresholdActionDirection direction; - private double stepSizeInPercentage; - - public SearchBackPressureAction( - final AppContext appContext, - final boolean canUpdate, - final long coolOffPeriodInMillis, - final String thresholdName, - final SearchbpDimension dimension, - final SearchbpThresholdActionDirection direction, - final double stepSizeInPercentage) { - super(appContext); - this.canUpdate = canUpdate; - this.coolOffPeriodInMillis = coolOffPeriodInMillis; - this.thresholdName = thresholdName; - this.dimension = dimension; - this.direction = direction; - this.stepSizeInPercentage = stepSizeInPercentage; - } - - @Override - public String name() { - return NAME; - } - - @Override - public boolean canUpdate() { - return canUpdate; - } - - @Override - public long coolOffPeriodInMillis() { - return coolOffPeriodInMillis; - } - - @Override - public List impactedNodes() { - // all nodes are impacted by this change - return appContext.getDataNodeInstances().stream() - .map(NodeKey::new) - .collect(Collectors.toList()); - } - - /* Search Back Pressure Decider/Policy only tunes searchbackpressure related thresholds (e.g. search_backpressure.search_task_heap_threshold) - * and it does not correlate directly with any current dimension in the ImpactVector (e.g. CPU/HEAP). - * And the current Searchbp actions only adjust heap related Searchbp Thresholds for now. - * Dimensions in ImpactVector is used by collator to determine which action should be emitted to Publisher, - * eventually which actions should the downstream class execute. So if there are 2 actions emitting in the same time, one increase CPU and one decrease it, the collator cancel out the actions. - * However, since for Searchbp Actions we only tune the searchbp threshold once per time (it's impossible for 2 actions emitting in the same time that increase and decrease searchbackpressure heap usage threshold). - * Therefore, we put no Impact for ImpactVector for Searchbp Actions. - */ - @Override - public Map impact() { - Map impact = new HashMap<>(); - for (NodeKey key : impactedNodes()) { - impact.put(key, NO_IMPACT); - } - return impact; - } - - public String getThresholdName() { - return thresholdName; - } - - public String getDimension() { - return dimension.toString(); - } - - public String getDirection() { - return direction.toString(); - } - - public double getStepSizeInPercentage() { - return stepSizeInPercentage; - } - - @Override - public String summary() { - Summary summary = - new Summary( - thresholdName, - dimension.toString(), - direction.toString(), - stepSizeInPercentage, - DEFAULT_COOL_OFF_PERIOD_IN_MILLIS, - canUpdate); - return summary.toJson(); - } - - public static final class Builder { - public static final boolean DEFAULT_CAN_UPDATE = true; - - private final AppContext appContext; - private final String thresholdName; - private final SearchbpDimension dimension; - private final SearchbpThresholdActionDirection direction; - private boolean canUpdate; - private double stepSizeInPercentage; - private long coolOffPeriodInMillis; - - private Builder( - final AppContext appContext, - final String thresholdName, - final SearchbpDimension dimension, - final SearchbpThresholdActionDirection direction, - final long coolOffPeriodInMillis) { - this.appContext = appContext; - this.thresholdName = thresholdName; - this.dimension = dimension; - this.direction = direction; - this.coolOffPeriodInMillis = coolOffPeriodInMillis; - this.canUpdate = DEFAULT_CAN_UPDATE; - } - - public Builder stepSizeInPercentage(double stepSizeInPercentage) { - this.stepSizeInPercentage = stepSizeInPercentage; - return this; - } - - public Builder coolOffPeriodInMillis(long coolOffPeriodInMillis) { - this.coolOffPeriodInMillis = coolOffPeriodInMillis; - return this; - } - - public SearchBackPressureAction build() { - return new SearchBackPressureAction( - appContext, - canUpdate, - coolOffPeriodInMillis, - thresholdName, - dimension, - direction, - stepSizeInPercentage); - } - } - - /* Write Static Class Summary to conver the Searchbp Action POJO to JSON Object - * Key fields to be included - * 1. ThresholdName: name of the SearchBackPressure threshold to be tuned - * 2. Dimension of the action (Shard/Task) - * 3. Direction of the action (Increase/Decrease) - * 3. StepSizeInPercentage to change the threshold - * 4. CoolOffPeriodInMillis for the action - * 5. canUpdate (whether the action should be emitted) - */ - public static class Summary { - public static final String THRESHOLD_NAME = "thresholdName"; - public static final String SEARCHBP_DIMENSION = "searchbpDimension"; - public static final String DIRECTION = "direction"; - public static final String STEP_SIZE_IN_PERCENTAGE = "stepSizeInPercentage"; - public static final String COOL_OFF_PERIOD = "coolOffPeriodInMillis"; - public static final String CAN_UPDATE = "canUpdate"; - - @SerializedName(value = THRESHOLD_NAME) - private String thresholdName; - - @SerializedName(value = SEARCHBP_DIMENSION) - private String searchbpSettingDimension; - - @SerializedName(value = DIRECTION) - private String direction; - - @SerializedName(value = STEP_SIZE_IN_PERCENTAGE) - private double stepSizeInPercentage; - - @SerializedName(value = COOL_OFF_PERIOD) - private long coolOffPeriodInMillis; - - @SerializedName(value = CAN_UPDATE) - private boolean canUpdate; - - public Summary( - String thresholdName, - String searchbpSettingDimension, - String direction, - double stepSizeInPercentage, - long coolOffPeriodInMillis, - boolean canUpdate) { - this.thresholdName = thresholdName; - this.searchbpSettingDimension = searchbpSettingDimension; - this.direction = direction; - this.stepSizeInPercentage = stepSizeInPercentage; - this.coolOffPeriodInMillis = coolOffPeriodInMillis; - this.canUpdate = canUpdate; - } - - /* - * ThresholdName is the name of the setting to be modified - * e.g. node_duress.cpu_threshold, node_duress.search_heap_threshold - */ - public String getThresholdName() { - return thresholdName; - } - - public String getSearchbpSettingDimension() { - return searchbpSettingDimension; - } - - public String getDirection() { - return direction; - } - - public double getStepSizeInPercentage() { - return stepSizeInPercentage; - } - - public long getCoolOffPeriodInMillis() { - return coolOffPeriodInMillis; - } - - public boolean getCanUpdate() { - return canUpdate; - } - - public String toJson() { - Gson gson = new GsonBuilder().disableHtmlEscaping().create(); - return gson.toJson(this); - } - } - - // enum to indicate to increase/decrease the threshold - public enum SearchbpThresholdActionDirection { - INCREASE(SearchbpThresholdActionDirection.Constants.INCREASE), - DECREASE(SearchbpThresholdActionDirection.Constants.DECREASE); - - private final String value; - - SearchbpThresholdActionDirection(String value) { - this.value = value; - } - - @Override - public String toString() { - return value; - } - - public static class Constants { - public static final String INCREASE = "increase"; - public static final String DECREASE = "decrease"; - } - } - - // enum to indicate to whether the action is caused by shard/task level searchbackpressure - // cancellation - public enum SearchbpDimension { - SHARD(SearchbpDimension.Constants.SHARD), - TASK(SearchbpDimension.Constants.TASK); - - private final String value; - - SearchbpDimension(String value) { - this.value = value; - } - - @Override - public String toString() { - return value; - } - - public static class Constants { - public static final String SHARD = "shard"; - public static final String TASK = "task"; - } - } -} diff --git a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/configs/DeciderConfig.java b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/configs/DeciderConfig.java index 096c45917..c2b3ed444 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/configs/DeciderConfig.java +++ b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/configs/DeciderConfig.java @@ -8,7 +8,6 @@ import org.opensearch.performanceanalyzer.decisionmaker.deciders.configs.jvm.OldGenDecisionPolicyConfig; import org.opensearch.performanceanalyzer.decisionmaker.deciders.configs.jvm.young_gen.JvmGenTuningPolicyConfig; -import org.opensearch.performanceanalyzer.decisionmaker.deciders.configs.searchbackpressure.SearchBackPressurePolicyConfig; import org.opensearch.performanceanalyzer.rca.framework.core.NestedConfig; import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf; @@ -29,14 +28,11 @@ public class DeciderConfig { private static final String OLD_GEN_DECISION_POLICY_CONFIG_NAME = "old-gen-decision-policy-config"; private static final String JVM_GEN_TUNING_POLICY_CONFIG_NAME = "jvm-gen-tuning-policy-config"; - private static final String SEARCH_BACK_PRESSURE_POLICY_CONFIG_NAME = - "search-back-pressure-policy-config"; private final CachePriorityOrderConfig cachePriorityOrderConfig; private final WorkLoadTypeConfig workLoadTypeConfig; private final OldGenDecisionPolicyConfig oldGenDecisionPolicyConfig; private final JvmGenTuningPolicyConfig jvmGenTuningPolicyConfig; - private final SearchBackPressurePolicyConfig searchBackPressurePolicyConfig; public DeciderConfig(final RcaConf rcaConf) { cachePriorityOrderConfig = @@ -55,11 +51,6 @@ public DeciderConfig(final RcaConf rcaConf) { new NestedConfig( JVM_GEN_TUNING_POLICY_CONFIG_NAME, rcaConf.getDeciderConfigSettings())); - searchBackPressurePolicyConfig = - new SearchBackPressurePolicyConfig( - new NestedConfig( - SEARCH_BACK_PRESSURE_POLICY_CONFIG_NAME, - rcaConf.getDeciderConfigSettings())); } public CachePriorityOrderConfig getCachePriorityOrderConfig() { @@ -77,8 +68,4 @@ public OldGenDecisionPolicyConfig getOldGenDecisionPolicyConfig() { public JvmGenTuningPolicyConfig getJvmGenTuningPolicyConfig() { return jvmGenTuningPolicyConfig; } - - public SearchBackPressurePolicyConfig getSearchBackPressurePolicyConfig() { - return searchBackPressurePolicyConfig; - } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/configs/searchbackpressure/SearchBackPressurePolicyConfig.java b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/configs/searchbackpressure/SearchBackPressurePolicyConfig.java deleted file mode 100644 index 11f7593e6..000000000 --- a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/configs/searchbackpressure/SearchBackPressurePolicyConfig.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.decisionmaker.deciders.configs.searchbackpressure; - - -import java.util.concurrent.TimeUnit; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.SearchBackPressurePolicy; -import org.opensearch.performanceanalyzer.rca.framework.core.Config; -import org.opensearch.performanceanalyzer.rca.framework.core.NestedConfig; - -/** - * Configures various thresholds for the {@link SearchBackPressurePolicy} - * - *

The config follows the format below "decider-config-settings": { - * "search-back-pressure-policy-config": { "enabled": true, // whether the - * serch-back-pressure-policy should be enabled "hour-breach-threshold": 30, // threshold for hourly - * received unhealthy cluster level rca flow units, if above, then the below thresholds should be - * modified, "threshold_count": 1, // how many thresholds to be changed, in this case - * search-heap-threshold, "searchbp-heap-stepsize-in-percentage": 5, } } - * "searchbp-heap-stepsize-in-percentage" defines the step size to change heap related threshold (in - * percentage). - */ -public class SearchBackPressurePolicyConfig { - private static final Logger LOG = LogManager.getLogger(SearchBackPressurePolicyConfig.class); - - // Field Names - private static final String ENABLED = "enabled"; - private static final String HOUR_BREACH_THRESHOLD = "hour-breach-threshold"; - private static final String THRESHOLD_COUNT = "threshold_count"; - private static final String SEARCHBP_HEAP_STEPSIZE_IN_PERCENTAGE = - "searchbp-heap-stepsize-in-percentage"; - - // Default values - public static final boolean DEFAULT_ENABLED = true; - - // TO DO: Decide the Default Hour breach threshold - public static final int DEFAULT_HOUR_BREACH_THRESHOLD = 2; - public static final int HOUR_MONITOR_WINDOW_SIZE_MINUTES = (int) TimeUnit.HOURS.toMinutes(1); - public static final int HOUR_MONITOR_BUCKET_SIZE_MINUTES = 1; - public static final double DEFAULT_SEARCHBP_HEAP_STEPSIZE_IN_PERCENTAGE = 5; - - private Config hourBreachThreshold; - private Config enabled; - private Config searchbpHeapStepsizeInPercentage; - - public SearchBackPressurePolicyConfig(NestedConfig config) { - enabled = new Config<>(ENABLED, config.getValue(), DEFAULT_ENABLED, Boolean.class); - hourBreachThreshold = - new Config<>( - HOUR_BREACH_THRESHOLD, - config.getValue(), - DEFAULT_HOUR_BREACH_THRESHOLD, - Integer.class); - LOG.debug( - "SearchBackPressurePolicyConfig hour breach threshold is: {}", - hourBreachThreshold.getValue()); - - searchbpHeapStepsizeInPercentage = - new Config<>( - SEARCHBP_HEAP_STEPSIZE_IN_PERCENTAGE, - config.getValue(), - DEFAULT_SEARCHBP_HEAP_STEPSIZE_IN_PERCENTAGE, - Double.class); - LOG.debug( - "searchbpHeapStepsizeInPercentage is {}", - searchbpHeapStepsizeInPercentage.getValue()); - } - - /** - * Whether or not to enable the policy. A disabled policy will not emit any actions. - * - * @return Whether or not to enable the policy - */ - public boolean isEnabled() { - return enabled.getValue(); - } - - public int getHourBreachThreshold() { - return hourBreachThreshold.getValue(); - } - - public int getHourMonitorWindowSizeMinutes() { - return HOUR_MONITOR_WINDOW_SIZE_MINUTES; - } - - public int getHourMonitorBucketSizeMinutes() { - return HOUR_MONITOR_BUCKET_SIZE_MINUTES; - } - - public double getSearchbpHeapStepsizeInPercentage() { - return searchbpHeapStepsizeInPercentage.getValue(); - } -} diff --git a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressureDecider.java b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressureDecider.java deleted file mode 100644 index 68fbb3370..000000000 --- a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressureDecider.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure; - - -import java.util.List; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.performanceanalyzer.AppContext; -import org.opensearch.performanceanalyzer.decisionmaker.actions.Action; -import org.opensearch.performanceanalyzer.decisionmaker.actions.SearchBackPressureAction; -import org.opensearch.performanceanalyzer.decisionmaker.deciders.Decider; -import org.opensearch.performanceanalyzer.decisionmaker.deciders.Decision; -import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf; -import org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure.SearchBackPressureClusterRCA; - -/** decider to change the dynamic settings of SearchBackPressure In-flight Cancellation */ -public class SearchBackPressureDecider extends Decider { - private static final Logger LOG = LogManager.getLogger(SearchBackPressureDecider.class); - public static final String NAME = "SearchBackPressureDecider"; - - private final SearchBackPressurePolicy searchBackPressurePolicy; - - private int currentIteration = 0; - private SearchBackPressureClusterRCA searchBackPressureClusterRCA; - - public SearchBackPressureDecider( - long evalIntervalSeconds, - int decisionFrequency, - SearchBackPressureClusterRCA searchBackPressureClusterRCA) { - super(evalIntervalSeconds, decisionFrequency); - this.searchBackPressureClusterRCA = searchBackPressureClusterRCA; - this.searchBackPressurePolicy = new SearchBackPressurePolicy(searchBackPressureClusterRCA); - LOG.debug("SearchBackPressureDecider created"); - } - - @Override - public String name() { - return NAME; - } - - @Override - public Decision operate() { - LOG.debug( - "SearchBackPressureDecider#2 operate() with currentIteration: {}", - currentIteration); - - Decision decision = new Decision(System.currentTimeMillis(), NAME); - currentIteration += 1; - if (currentIteration < decisionFrequency) { - return decision; - } - - // reset the currentIteration for next action emitting cycle - currentIteration = 0; - - // SearchBackPressure Policy is always accepted since Searchbp Decider only use the actions - // suggested by Searchbp Policy - List searchBackPressureActions = searchBackPressurePolicy.evaluate(); - - // loop through the actions and print the action threshold name, dimension, - // increase/decrease - searchBackPressureActions.stream() - .forEach( - (action) -> { - LOG.debug( - "searchBackPressureActions details, threshold name: {}, dimension: {}, increase/decrease: {}, stepsize: {}", - ((SearchBackPressureAction) action).getThresholdName(), - ((SearchBackPressureAction) action).getDimension(), - ((SearchBackPressureAction) action).getDirection(), - ((SearchBackPressureAction) action).getStepSizeInPercentage()); - }); - - searchBackPressureActions.forEach(decision::addAction); - - LOG.debug("decision action size is {}", decision.getActions().size()); - return decision; - } - - /* Read RCA Config to fill the dynamic threshold settings for the SearchBackPressure Service */ - @Override - public void readRcaConf(RcaConf conf) { - super.readRcaConf(conf); - searchBackPressurePolicy.setRcaConf(conf); - } - - /* Set AppContext for SearchBackPressurePolicy */ - @Override - public void setAppContext(final AppContext appContext) { - super.setAppContext(appContext); - searchBackPressurePolicy.setAppContext(appContext); - } -} diff --git a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java deleted file mode 100644 index 67bd14db3..000000000 --- a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java +++ /dev/null @@ -1,315 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure; - -import static org.opensearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.SEARCHBACKPRESSURE_SHARD; -import static org.opensearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.SEARCHBACKPRESSURE_TASK; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.TimeUnit; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.performanceanalyzer.AppContext; -import org.opensearch.performanceanalyzer.decisionmaker.actions.Action; -import org.opensearch.performanceanalyzer.decisionmaker.actions.SearchBackPressureAction; -import org.opensearch.performanceanalyzer.decisionmaker.deciders.DecisionPolicy; -import org.opensearch.performanceanalyzer.decisionmaker.deciders.configs.searchbackpressure.SearchBackPressurePolicyConfig; -import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.model.SearchBackPressureIssue; -import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.model.SearchBackPressureSearchTaskIssue; -import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.model.SearchBackPressureSearchTaskIssue.SearchbpTaskAlarmMonitorMapKeys; -import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.model.SearchBackPressureShardIssue; -import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.model.SearchBackPressureShardIssue.SearchbpShardAlarmMonitorMapKeys; -import org.opensearch.performanceanalyzer.grpc.Resource; -import org.opensearch.performanceanalyzer.rca.framework.api.aggregators.BucketizedSlidingWindowConfig; -import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; -import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotClusterSummary; -import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; -import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf; -import org.opensearch.performanceanalyzer.rca.framework.util.RcaConsts; -import org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure.SearchBackPressureClusterRCA; - -/** - * Decides if the SearchBackPressure threshold should be modified suggests actions to take to - * achieve improved performance. - */ -public class SearchBackPressurePolicy implements DecisionPolicy { - private static final Logger LOG = LogManager.getLogger(SearchBackPressurePolicy.class); - - // Default COOLOFF Period for the action (1 DAY) - private static final long DEAFULT_COOLOFF_PERIOD_IN_MILLIS = 24L * 60L * 60L * 1000L; - private static final String HEAP_THRESHOLD_STR = "heap_usage"; - private static final String SHARD_DIMENSION_STR = "SHARD"; - private static final String TASK_DIMENSION_STR = "TASK"; - private static final double DEFAULT_HEAP_CHANGE_IN_PERCENTAGE = 5.0; - - private static final Path SEARCHBP_DATA_FILE_PATH = - Paths.get(RcaConsts.CONFIG_DIR_PATH, "SearchBackPressurePolicy_heap"); - - /* TODO: Specify a path to store SearchBackpressurePolicy_Autotune Stats */ - - private AppContext appContext; - private RcaConf rcaConf; - private SearchBackPressurePolicyConfig policyConfig; - private SearchBackPressureClusterRCA searchBackPressureClusterRCA; - - /* Alarm for heap usage */ - static final List HEAP_SEARCHBP_SHARD_SIGNALS = - Lists.newArrayList(SEARCHBACKPRESSURE_SHARD); - static final List HEAP_SEARCHBP_TASK_SIGNALS = - Lists.newArrayList(SEARCHBACKPRESSURE_TASK); - - SearchBackPressureIssue searchBackPressureIssue; - - /* alarm monitors per threshold */ - // shard-level alarms - @VisibleForTesting SearchBpActionsAlarmMonitor searchBackPressureShardHeapIncreaseAlarm; - @VisibleForTesting SearchBpActionsAlarmMonitor searchBackPressureShardHeapDecreaseAlarm; - HashMap searchBackPressureShardAlarmMonitorMap; - - // task-level alarms - @VisibleForTesting SearchBpActionsAlarmMonitor searchBackPressureTaskHeapIncreaseAlarm; - @VisibleForTesting SearchBpActionsAlarmMonitor searchBackPressureTaskHeapDecreaseAlarm; - HashMap searchBackPressureTaskAlarmMonitorMap; - - public SearchBackPressurePolicy( - SearchBackPressureClusterRCA searchBackPressureClusterRCA, - SearchBpActionsAlarmMonitor searchBackPressureShardHeapIncreaseAlarm, - SearchBpActionsAlarmMonitor searchBackPressureShardHeapDecreaseAlarm, - SearchBpActionsAlarmMonitor searchBackPressureTaskHeapIncreaseAlarm, - SearchBpActionsAlarmMonitor searchBackPressureTaskHeapDecreaseAlarm) { - this.searchBackPressureClusterRCA = searchBackPressureClusterRCA; - this.searchBackPressureShardHeapIncreaseAlarm = searchBackPressureShardHeapIncreaseAlarm; - this.searchBackPressureShardHeapDecreaseAlarm = searchBackPressureShardHeapDecreaseAlarm; - this.searchBackPressureTaskHeapIncreaseAlarm = searchBackPressureTaskHeapIncreaseAlarm; - this.searchBackPressureTaskHeapDecreaseAlarm = searchBackPressureTaskHeapDecreaseAlarm; - } - - public SearchBackPressurePolicy(SearchBackPressureClusterRCA searchBackPressureClusterRCA) { - this(searchBackPressureClusterRCA, null, null, null, null); - } - - /** - * records issues which the policy cares about and discards others - * - * @param issue an issue with the application - */ - private void record(HotResourceSummary summary) { - if (HEAP_SEARCHBP_SHARD_SIGNALS.contains(summary.getResource())) { - searchBackPressureIssue = - new SearchBackPressureShardIssue( - summary, searchBackPressureShardAlarmMonitorMap); - searchBackPressureIssue.recordIssueBySummaryType(summary); - } - - if (HEAP_SEARCHBP_TASK_SIGNALS.contains(summary.getResource())) { - searchBackPressureIssue = - new SearchBackPressureSearchTaskIssue( - summary, searchBackPressureTaskAlarmMonitorMap); - searchBackPressureIssue.recordIssueBySummaryType(summary); - } - } - - /** gathers and records all issues observed in the application */ - private void recordIssues() { - LOG.debug("SearchBackPressurePolicy#recordIssues()"); - - if (searchBackPressureClusterRCA.getFlowUnits().isEmpty()) { - LOG.debug("No flow units in searchBackPressureClusterRCA"); - return; - } - - for (ResourceFlowUnit flowUnit : - searchBackPressureClusterRCA.getFlowUnits()) { - if (!flowUnit.hasResourceSummary()) { - continue; - } - - HotClusterSummary clusterSummary = flowUnit.getSummary(); - clusterSummary.getHotNodeSummaryList().stream() - .flatMap((nodeSummary) -> nodeSummary.getHotResourceSummaryList().stream()) - .forEach((resourceSummary) -> record(resourceSummary)); - } - } - - public boolean isShardHeapThresholdTooSmall() { - return !searchBackPressureShardHeapIncreaseAlarm.isHealthy(); - } - - public boolean isShardHeapThresholdTooLarge() { - return !searchBackPressureShardHeapDecreaseAlarm.isHealthy(); - } - - public boolean isTaskHeapThresholdTooSmall() { - return !searchBackPressureTaskHeapIncreaseAlarm.isHealthy(); - } - - public boolean isTaskHeapThresholdTooLarge() { - return !searchBackPressureTaskHeapDecreaseAlarm.isHealthy(); - } - - // create alarm monitor from config - public SearchBpActionsAlarmMonitor createAlarmMonitor(Path persistenceBasePath) { - LOG.debug( - "createAlarmMonitor with hour window: {}, bucket size: {}, hour threshold: {}, stepsize: {}", - policyConfig.getHourMonitorWindowSizeMinutes(), - policyConfig.getHourMonitorBucketSizeMinutes(), - policyConfig.getHourBreachThreshold(), - policyConfig.getSearchbpHeapStepsizeInPercentage()); - BucketizedSlidingWindowConfig hourMonitorConfig = - new BucketizedSlidingWindowConfig( - policyConfig.getHourMonitorWindowSizeMinutes(), - policyConfig.getHourMonitorBucketSizeMinutes(), - TimeUnit.MINUTES, - persistenceBasePath); - - // TODO: Check whether we need a persistence path to write our data - return new SearchBpActionsAlarmMonitor( - policyConfig.getHourBreachThreshold(), null, hourMonitorConfig); - } - - // initalize all alarm monitors - public void initialize() { - // initialize shard level alarm for resounce unit that suggests to increase jvm threshold - searchBackPressureShardHeapIncreaseAlarm = - initializeAlarmMonitor(searchBackPressureShardHeapIncreaseAlarm); - - // initialize shard level alarm for resounce unit that suggests to decrease jvm threshold - searchBackPressureShardHeapDecreaseAlarm = - initializeAlarmMonitor(searchBackPressureShardHeapDecreaseAlarm); - - // initialize task level alarm for resounce unit that suggests to increase jvm threshold - searchBackPressureTaskHeapIncreaseAlarm = - initializeAlarmMonitor(searchBackPressureTaskHeapIncreaseAlarm); - - // initialize task level alarm for resounce unit that suggests to decrease jvm threhsold - searchBackPressureTaskHeapDecreaseAlarm = - initializeAlarmMonitor(searchBackPressureTaskHeapDecreaseAlarm); - - initializeAlarmMonitorMap(); - } - - private SearchBpActionsAlarmMonitor initializeAlarmMonitor( - SearchBpActionsAlarmMonitor alarmMonitor) { - if (alarmMonitor == null) { - return createAlarmMonitor(SEARCHBP_DATA_FILE_PATH); - } else { - return alarmMonitor; - } - } - - private void initializeAlarmMonitorMap() { - // add shard level monitors to shardAlarmMonitorMap - searchBackPressureShardAlarmMonitorMap = new HashMap(); - searchBackPressureShardAlarmMonitorMap.put( - SearchbpShardAlarmMonitorMapKeys.SHARD_HEAP_INCREASE_ALARM.toString(), - searchBackPressureShardHeapIncreaseAlarm); - searchBackPressureShardAlarmMonitorMap.put( - SearchbpShardAlarmMonitorMapKeys.SHARD_HEAP_DECREASE_ALARM.toString(), - searchBackPressureShardHeapDecreaseAlarm); - - // add task level monitors to taskAlarmMonitorMap - searchBackPressureTaskAlarmMonitorMap = new HashMap(); - searchBackPressureTaskAlarmMonitorMap.put( - SearchbpTaskAlarmMonitorMapKeys.TASK_HEAP_INCREASE_ALARM.toString(), - searchBackPressureTaskHeapIncreaseAlarm); - searchBackPressureTaskAlarmMonitorMap.put( - SearchbpTaskAlarmMonitorMapKeys.TASK_HEAP_DECREASE_ALARM.toString(), - searchBackPressureTaskHeapDecreaseAlarm); - } - - @Override - public List evaluate() { - List actions = new ArrayList<>(); - if (rcaConf == null || appContext == null) { - LOG.error("rca conf/app context is null, return empty action list"); - return actions; - } - - policyConfig = rcaConf.getDeciderConfig().getSearchBackPressurePolicyConfig(); - if (!policyConfig.isEnabled()) { - LOG.debug("SearchBackPressurePolicy is disabled"); - return actions; - } - - initialize(); - - recordIssues(); - - checkShardAlarms(actions); - checkTaskAlarms(actions); - - // print current size of the actions - LOG.debug("SearchBackPressurePolicy#evaluate() action size: {}", actions.size()); - - return actions; - } - - private void checkShardAlarms(List actions) { - if (isShardHeapThresholdTooSmall()) { - LOG.debug("isShardHeapThresholdTooSmall action Added"); - actions.add( - new SearchBackPressureAction( - appContext, - true, - DEAFULT_COOLOFF_PERIOD_IN_MILLIS, - HEAP_THRESHOLD_STR, - SearchBackPressureAction.SearchbpDimension.SHARD, - SearchBackPressureAction.SearchbpThresholdActionDirection.INCREASE, - policyConfig.getSearchbpHeapStepsizeInPercentage())); - } else if (isShardHeapThresholdTooLarge()) { - LOG.debug("isShardHeapThresholdTooLarge action Added"); - actions.add( - new SearchBackPressureAction( - appContext, - true, - DEAFULT_COOLOFF_PERIOD_IN_MILLIS, - HEAP_THRESHOLD_STR, - SearchBackPressureAction.SearchbpDimension.SHARD, - SearchBackPressureAction.SearchbpThresholdActionDirection.DECREASE, - policyConfig.getSearchbpHeapStepsizeInPercentage())); - } - } - - private void checkTaskAlarms(List actions) { - if (isTaskHeapThresholdTooSmall()) { - LOG.debug("isTaskHeapThresholdTooSmall action Added"); - actions.add( - new SearchBackPressureAction( - appContext, - true, - DEAFULT_COOLOFF_PERIOD_IN_MILLIS, - HEAP_THRESHOLD_STR, - SearchBackPressureAction.SearchbpDimension.TASK, - SearchBackPressureAction.SearchbpThresholdActionDirection.INCREASE, - policyConfig.getSearchbpHeapStepsizeInPercentage())); - } else if (isTaskHeapThresholdTooLarge()) { - LOG.debug("isTaskHeapThresholdTooLarge action Added"); - actions.add( - new SearchBackPressureAction( - appContext, - true, - DEAFULT_COOLOFF_PERIOD_IN_MILLIS, - HEAP_THRESHOLD_STR, - SearchBackPressureAction.SearchbpDimension.TASK, - SearchBackPressureAction.SearchbpThresholdActionDirection.DECREASE, - policyConfig.getSearchbpHeapStepsizeInPercentage())); - } - } - - public void setAppContext(AppContext appContext) { - this.appContext = appContext; - } - - public void setRcaConf(final RcaConf rcaConf) { - this.rcaConf = rcaConf; - } -} diff --git a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBpActionsAlarmMonitor.java b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBpActionsAlarmMonitor.java deleted file mode 100644 index 666bb3a02..000000000 --- a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBpActionsAlarmMonitor.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure; - - -import com.google.common.annotations.VisibleForTesting; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.performanceanalyzer.decisionmaker.deciders.AlarmMonitor; -import org.opensearch.performanceanalyzer.rca.framework.api.aggregators.BucketizedSlidingWindow; -import org.opensearch.performanceanalyzer.rca.framework.api.aggregators.BucketizedSlidingWindowConfig; -import org.opensearch.performanceanalyzer.rca.framework.api.aggregators.SlidingWindowData; - -public class SearchBpActionsAlarmMonitor implements AlarmMonitor { - private static final Logger LOG = LogManager.getLogger(SearchBpActionsAlarmMonitor.class); - /* Current design uses hour monitor to evaluate the health of the searchbackpressure service - * if there are more than 30 bad resournce units in one hour, then the alarm shows a Unhealthy Signal - */ - - private static final int DEFAULT_HOUR_BREACH_THRESHOLD = 30; - private static final int DEFAULT_BUCKET_WINDOW_SIZE = 1; - private static final String HOUR_PREFIX = "hour-"; - - public static final int HOUR_MONITOR_BUCKET_WINDOW_MINUTES = 5; - - private BucketizedSlidingWindow hourMonitor; - private int hourBreachThreshold; - - private boolean alarmHealthy = true; - - @Override - public boolean isHealthy() { - evaluateAlarm(); - return alarmHealthy; - } - - public SearchBpActionsAlarmMonitor( - int hourBreachThreshold, - @Nullable Path persistencePath, - @Nullable BucketizedSlidingWindowConfig hourMonitorConfig) { - Path hourMonitorPath = null; - if (persistencePath != null) { - Path persistenceBase = persistencePath.getParent(); - Path persistenceFile = persistencePath.getFileName(); - if (persistenceBase != null && persistenceFile != null) { - hourMonitorPath = - Paths.get( - persistenceBase.toString(), - HOUR_PREFIX + persistenceFile.toString()); - } - } - // initialize hourly alarm monitor - if (hourMonitorConfig == null) { - /* - * Bucket Window Size means the number of issues can exist in a bucket - * when you consider about the size of the BucketizedSlidingWindow, the size is the - * number of buckets, not issues - */ - hourMonitor = - new BucketizedSlidingWindow( - (int) TimeUnit.HOURS.toMinutes(1), - DEFAULT_BUCKET_WINDOW_SIZE, - TimeUnit.MINUTES, - hourMonitorPath); - } else { - hourMonitor = new BucketizedSlidingWindow(hourMonitorConfig); - } - - this.hourBreachThreshold = hourBreachThreshold; - } - - public SearchBpActionsAlarmMonitor(int hourBreachThreshold, @Nullable Path persistencePath) { - this(hourBreachThreshold, persistencePath, null); - } - - public SearchBpActionsAlarmMonitor(int hourBreachThreshold) { - this(hourBreachThreshold, null, null); - } - - public SearchBpActionsAlarmMonitor(@Nullable Path persistencePath) { - this(DEFAULT_HOUR_BREACH_THRESHOLD, persistencePath); - } - - public SearchBpActionsAlarmMonitor() { - this(DEFAULT_HOUR_BREACH_THRESHOLD); - } - - @Override - public void recordIssue(long timeStamp, double value) { - SlidingWindowData dataPoint = new SlidingWindowData(timeStamp, value); - LOG.debug("Search Backpressure Actions Alarm is recording a new issue at {}", timeStamp); - hourMonitor.next(dataPoint); - } - - private void evaluateAlarm() { - if (alarmHealthy) { - if (hourMonitor.size() >= hourBreachThreshold) { - LOG.debug( - "Search Backpressure Actions Alarm is Unhealthy because hourMonitor.size() is {}, and threshold is {}", - hourMonitor.size(), - hourBreachThreshold); - alarmHealthy = false; - } - } else { - if (hourMonitor.size() == 0) { - LOG.debug("SearchBackpressure Hour Monitor is now healthy for zero capacity"); - alarmHealthy = true; - } - } - } - - public int getHourBreachThreshold() { - return hourBreachThreshold; - } - - @VisibleForTesting - BucketizedSlidingWindow getHourMonitor() { - return hourMonitor; - } - - @VisibleForTesting - void setAlarmHealth(boolean isHealthy) { - this.alarmHealthy = isHealthy; - } -} diff --git a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureIssue.java b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureIssue.java deleted file mode 100644 index 423ad429f..000000000 --- a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureIssue.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.model; - - -import java.util.HashMap; -import java.util.Map; -import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.SearchBpActionsAlarmMonitor; -import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; - -/* - * SearchBackPressureIssue is the interface for all types of SearchBackPressure Issue (e.g. issue caused by overflow of shard-level heap usage) - */ -public abstract class SearchBackPressureIssue { - public HotResourceSummary hotResourceSummary; - public Map actionsAlarmMonitorMap; - - // constructor - SearchBackPressureIssue( - HotResourceSummary hotResourceSummary, - HashMap actionsAlarmMonitorMap) { - this.hotResourceSummary = hotResourceSummary; - this.actionsAlarmMonitorMap = actionsAlarmMonitorMap; - } - - public abstract void recordIssueBySummaryType(HotResourceSummary hotResourceSummary); -} diff --git a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureSearchTaskIssue.java b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureSearchTaskIssue.java deleted file mode 100644 index 8169cc024..000000000 --- a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureSearchTaskIssue.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.model; - - -import java.util.HashMap; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.SearchBpActionsAlarmMonitor; -import org.opensearch.performanceanalyzer.rca.configs.SearchBackPressureRcaConfig; -import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; - -public class SearchBackPressureSearchTaskIssue extends SearchBackPressureIssue { - private static final Logger LOG = LogManager.getLogger(SearchBackPressureSearchTaskIssue.class); - - public SearchBackPressureSearchTaskIssue( - HotResourceSummary hotResourceSummary, - HashMap actionsAlarmMonitorMap) { - super(hotResourceSummary, actionsAlarmMonitorMap); - } - - @Override - public void recordIssueBySummaryType(HotResourceSummary summary) { - - if (summary.getMetaData() == SearchBackPressureRcaConfig.INCREASE_THRESHOLD_BY_JVM_STR) { - LOG.debug("recording increase-level issue for task"); - actionsAlarmMonitorMap - .get(SearchbpTaskAlarmMonitorMapKeys.TASK_HEAP_INCREASE_ALARM.toString()) - .recordIssue(); - } - - // decrease alarm for heap-related threshold - if (summary.getMetaData() == SearchBackPressureRcaConfig.DECREASE_THRESHOLD_BY_JVM_STR) { - LOG.debug("recording decrease-level issue for task"); - actionsAlarmMonitorMap - .get(SearchbpTaskAlarmMonitorMapKeys.TASK_HEAP_DECREASE_ALARM.toString()) - .recordIssue(); - } - } - - public enum SearchbpTaskAlarmMonitorMapKeys { - TASK_HEAP_INCREASE_ALARM( - SearchbpTaskAlarmMonitorMapKeys.Constants.TASK_HEAP_INCREASE_ALARM), - TASK_HEAP_DECREASE_ALARM( - SearchbpTaskAlarmMonitorMapKeys.Constants.TASK_HEAP_DECREASE_ALARM); - - private final String value; - - SearchbpTaskAlarmMonitorMapKeys(String value) { - this.value = value; - } - - @Override - public String toString() { - return value; - } - - public static class Constants { - public static final String TASK_HEAP_INCREASE_ALARM = - "searchBackPressureTaskHeapIncreaseAlarm"; - public static final String TASK_HEAP_DECREASE_ALARM = - "searchBackPressureTaskHeapDecreaseAlarm"; - } - } -} diff --git a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureShardIssue.java b/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureShardIssue.java deleted file mode 100644 index bb13e6b31..000000000 --- a/src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureShardIssue.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.model; - - -import java.util.HashMap; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.SearchBpActionsAlarmMonitor; -import org.opensearch.performanceanalyzer.rca.configs.SearchBackPressureRcaConfig; -import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; - -public class SearchBackPressureShardIssue extends SearchBackPressureIssue { - private static final Logger LOG = LogManager.getLogger(SearchBackPressureShardIssue.class); - - public SearchBackPressureShardIssue( - HotResourceSummary hotResourceSummary, - HashMap actionsAlarmMonitorMap) { - super(hotResourceSummary, actionsAlarmMonitorMap); - } - - @Override - public void recordIssueBySummaryType(HotResourceSummary summary) { - if (summary.getMetaData() == SearchBackPressureRcaConfig.INCREASE_THRESHOLD_BY_JVM_STR) { - LOG.debug("recording increase-level issue for shard"); - LOG.debug("size of the HashMap: {}", actionsAlarmMonitorMap.size()); - actionsAlarmMonitorMap - .get(SearchbpShardAlarmMonitorMapKeys.SHARD_HEAP_INCREASE_ALARM.toString()) - .recordIssue(); - } - - // decrease alarm for heap-related threshold - if (summary.getMetaData() == SearchBackPressureRcaConfig.DECREASE_THRESHOLD_BY_JVM_STR) { - LOG.debug("recording decrease-level issue for shard"); - actionsAlarmMonitorMap - .get(SearchbpShardAlarmMonitorMapKeys.SHARD_HEAP_DECREASE_ALARM.toString()) - .recordIssue(); - } - } - - public enum SearchbpShardAlarmMonitorMapKeys { - SHARD_HEAP_INCREASE_ALARM( - SearchbpShardAlarmMonitorMapKeys.Constants.SHARD_HEAP_DECREASE_ALARM), - SHARD_HEAP_DECREASE_ALARM( - SearchbpShardAlarmMonitorMapKeys.Constants.SHARD_HEAP_DECREASE_ALARM); - - private final String value; - - SearchbpShardAlarmMonitorMapKeys(String value) { - this.value = value; - } - - @Override - public String toString() { - return value; - } - - public static class Constants { - public static final String SHARD_HEAP_INCREASE_ALARM = - "searchBackPressureShardHeapIncreaseAlarm"; - public static final String SHARD_HEAP_DECREASE_ALARM = - "searchBackPressureShardHeapDecreaseAlarm"; - } - } -} diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java index a9423ccd0..1b11c014e 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java @@ -19,7 +19,6 @@ import org.opensearch.performanceanalyzer.decisionmaker.deciders.QueueHealthDecider; import org.opensearch.performanceanalyzer.decisionmaker.deciders.collator.Collator; import org.opensearch.performanceanalyzer.decisionmaker.deciders.jvm.HeapHealthDecider; -import org.opensearch.performanceanalyzer.decisionmaker.deciders.searchbackpressure.SearchBackPressureDecider; import org.opensearch.performanceanalyzer.metricsdb.MetricsDB; import org.opensearch.performanceanalyzer.plugins.PluginController; import org.opensearch.performanceanalyzer.plugins.PluginControllerConfig; @@ -444,11 +443,28 @@ public void construct() { shardRequestCacheClusterRca, highHeapUsageClusterRca)); - // SearchBackPressure RCA Decider - SearchBackPressureDecider searchBackPressureDecider = - buildSearchBackPressureDecider(heapMax, heapUsed, searchbp_Stats); + // Search Back Pressure Service RCA enabled + SearchBackPressureRCA searchBackPressureRCA = + new SearchBackPressureRCA(RCA_PERIOD, heapMax, heapUsed, searchbp_Stats); + searchBackPressureRCA.addTag( + RcaConsts.RcaTagConstants.TAG_LOCUS, + RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE); + searchBackPressureRCA.addAllUpstreams(Arrays.asList(heapMax, heapUsed, searchbp_Stats)); + + // Search Back Pressure Service Cluster RCA enabled + SearchBackPressureClusterRCA searchBackPressureClusterRCA = + new SearchBackPressureClusterRCA(RCA_PERIOD, searchBackPressureRCA); + searchBackPressureClusterRCA.addTag( + RcaConsts.RcaTagConstants.TAG_LOCUS, + RcaConsts.RcaTagConstants.LOCUS_CLUSTER_MANAGER_NODE); + searchBackPressureClusterRCA.addAllUpstreams( + Collections.singletonList(searchBackPressureRCA)); + searchBackPressureClusterRCA.addTag( + RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM, + RcaConsts.RcaTagConstants.LOCUS_DATA_NODE); + + // TODO: Add SearchBackPressure Decider - // AdmissionControl RCA Decider AdmissionControlDecider admissionControlDecider = buildAdmissionControlDecider(heapUsed, heapMax); @@ -462,8 +478,7 @@ public void construct() { queueHealthDecider, cacheHealthDecider, heapHealthDecider, - admissionControlDecider, - searchBackPressureDecider); + admissionControlDecider); collator.addTag( RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_CLUSTER_MANAGER_NODE); @@ -472,8 +487,7 @@ public void construct() { queueHealthDecider, cacheHealthDecider, heapHealthDecider, - admissionControlDecider, - searchBackPressureDecider)); + admissionControlDecider)); // Publisher - Executes decisions output from collator Publisher publisher = new Publisher(EVALUATION_INTERVAL_SECONDS, collator); @@ -488,40 +502,6 @@ public void construct() { pluginController.initPlugins(); } - private SearchBackPressureDecider buildSearchBackPressureDecider( - Metric heapMax, Metric heapUsed, Metric searchbp_Stats) { - // Enbale SearchBackPressure node-level RCA - SearchBackPressureRCA searchBackPressureRCA = - new SearchBackPressureRCA(RCA_PERIOD, heapMax, heapUsed, searchbp_Stats); - searchBackPressureRCA.addTag( - RcaConsts.RcaTagConstants.TAG_LOCUS, - RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE); - searchBackPressureRCA.addAllUpstreams(Arrays.asList(heapMax, heapUsed, searchbp_Stats)); - - // Enable SearchBackPressure cluster-level RCA - SearchBackPressureClusterRCA searchBackPressureClusterRCA = - new SearchBackPressureClusterRCA(RCA_PERIOD, searchBackPressureRCA); - searchBackPressureClusterRCA.addTag( - RcaConsts.RcaTagConstants.TAG_LOCUS, - RcaConsts.RcaTagConstants.LOCUS_CLUSTER_MANAGER_NODE); - searchBackPressureClusterRCA.addAllUpstreams( - Collections.singletonList(searchBackPressureRCA)); - searchBackPressureClusterRCA.addTag( - RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM, - RcaConsts.RcaTagConstants.LOCUS_DATA_NODE); - - // Enabel SearchBackPressureDecider - SearchBackPressureDecider searchBackPressureDecider = - new SearchBackPressureDecider( - EVALUATION_INTERVAL_SECONDS, RCA_PERIOD, searchBackPressureClusterRCA); - searchBackPressureDecider.addTag( - RcaConsts.RcaTagConstants.TAG_LOCUS, - RcaConsts.RcaTagConstants.LOCUS_CLUSTER_MANAGER_NODE); - searchBackPressureDecider.addAllUpstreams( - Collections.singletonList(searchBackPressureClusterRCA)); - return searchBackPressureDecider; - } - private AdmissionControlDecider buildAdmissionControlDecider(Metric heapUsed, Metric heapMax) { AdmissionControlRca admissionControlRca = new AdmissionControlRca(RCA_PERIOD, heapUsed, heapMax); diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/OldGenRca.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/OldGenRca.java index a7b2c25ac..e2056eec8 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/OldGenRca.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/OldGenRca.java @@ -6,10 +6,8 @@ package org.opensearch.performanceanalyzer.rca.store.rca; -import java.util.Deque; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jooq.Field; @@ -261,24 +259,33 @@ public double readMin() { * be implemented as minSlidingWindow or maxSlidingWindow depending on the need. */ public static class MinMaxSlidingWindow extends SlidingWindow { - BiConsumer, SlidingWindowData> nextElementFunc; + boolean isMinSlidingWindow; public MinMaxSlidingWindow( int SLIDING_WINDOW_SIZE_IN_TIMESTAMP, TimeUnit timeUnit, - BiConsumer, SlidingWindowData> nextElementFunc) { + boolean isMinSlidingWindow) { super(SLIDING_WINDOW_SIZE_IN_TIMESTAMP, timeUnit); - - // get the Biconsumer lambda function passed in - this.nextElementFunc = nextElementFunc; + this.isMinSlidingWindow = isMinSlidingWindow; } @Override public void next(SlidingWindowData e) { - // use the passed in lambda function to accept next element - nextElementFunc.accept(windowDeque, e); + if (isMinSlidingWindow) { + // monotonically decreasing sliding window + while (!windowDeque.isEmpty() + && windowDeque.peekFirst().getValue() >= e.getValue()) { + windowDeque.pollFirst(); + } + } else { + // monotonically increasing sliding window + while (!windowDeque.isEmpty() + && windowDeque.peekFirst().getValue() < e.getValue()) { + windowDeque.pollFirst(); + } + } - // evict elements in sliding window outside the sliding window size + windowDeque.addFirst(e); while (!windowDeque.isEmpty() && TimeUnit.MILLISECONDS.toSeconds( e.getTimeStamp() - windowDeque.peekLast().getTimeStamp()) diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureClusterRCA.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureClusterRCA.java index 2f2ea88a5..a7b95bada 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureClusterRCA.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureClusterRCA.java @@ -21,6 +21,5 @@ public class SearchBackPressureClusterRCA extends BaseClusterRca { public >> SearchBackPressureClusterRCA( final int rcaPeriod, final R SearchBackPressureRCA) { super(rcaPeriod, SearchBackPressureRCA); - LOG.info("SearchBackPressureClusterRCA enabeld."); } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java index 0457ccad1..d274e6f52 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java @@ -11,10 +11,8 @@ import java.time.Clock; import java.util.ArrayList; -import java.util.Deque; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jooq.Field; @@ -100,28 +98,6 @@ public class SearchBackPressureRCA extends Rca> // Current time protected Clock clock; - // lambda function to add nextElement to monotonically decreasing sliding window - BiConsumer, SlidingWindowData> minSlidingWindowNextElement = - (windowDeque, nextElement) -> { - while (!windowDeque.isEmpty() - && windowDeque.peekFirst().getValue() >= nextElement.getValue()) { - windowDeque.pollFirst(); - } - - windowDeque.addFirst(nextElement); - }; - - // lambda function to add nextElement to monotonically increasing sliding window - BiConsumer, SlidingWindowData> maxSlidingWindowNextElement = - (windowDeque, nextElement) -> { - while (!windowDeque.isEmpty() - && windowDeque.peekFirst().getValue() < nextElement.getValue()) { - windowDeque.pollFirst(); - } - - windowDeque.addFirst(nextElement); - }; - public SearchBackPressureRCA( final int rcaPeriod, final M heapMax, final M heapUsed, M searchbp_Stats) { super(EVAL_INTERVAL_IN_S); @@ -153,11 +129,9 @@ public SearchBackPressureRCA( // sliding window for heap usage this.minHeapUsageSlidingWindow = - new MinMaxSlidingWindow( - SLIDING_WINDOW_SIZE_IN_MINS, TimeUnit.MINUTES, minSlidingWindowNextElement); + new MinMaxSlidingWindow(SLIDING_WINDOW_SIZE_IN_MINS, TimeUnit.MINUTES, true); this.maxHeapUsageSlidingWindow = - new MinMaxSlidingWindow( - SLIDING_WINDOW_SIZE_IN_MINS, TimeUnit.MINUTES, maxSlidingWindowNextElement); + new MinMaxSlidingWindow(SLIDING_WINDOW_SIZE_IN_MINS, TimeUnit.MINUTES, false); // sliding window for JVM this.shardJVMCancellationSlidingWindow =