From 63dc6aa795f8ec78597c7f93e732b979b6963d8f Mon Sep 17 00:00:00 2001 From: Stephen Crawford <65832608+scrawfor99@users.noreply.github.com> Date: Thu, 22 Jun 2023 08:27:15 -0400 Subject: [PATCH] Fix #7972 SearchBackpressureIT flaky tests (#8063) * fix thread issue Signed-off-by: Stephen Crawford * fix thread issue Signed-off-by: Stephen Crawford * Fix thresholds Signed-off-by: Stephen Crawford * Swap to object based Signed-off-by: Stephen Crawford * Spotless Signed-off-by: Stephen Crawford * Swap to preserve nulls Signed-off-by: Stephen Crawford * Spotless Signed-off-by: Stephen Crawford * Resolve npe Signed-off-by: Stephen Crawford * remove final declerations Signed-off-by: Stephen Crawford * spotless Signed-off-by: Stephen Crawford * add annotations Signed-off-by: Stephen Crawford * push to rerun tests Signed-off-by: Stephen Crawford * Fix idea Signed-off-by: Stephen Crawford * Fix idea Signed-off-by: Stephen Crawford --------- Signed-off-by: Stephen Crawford --- .../backpressure/SearchBackpressureIT.java | 4 +- .../org/opensearch/tasks/CancellableTask.java | 85 +++++++++++-------- 2 files changed, 50 insertions(+), 39 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java index 1decd69ead7e3..a63c3287ea124 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/backpressure/SearchBackpressureIT.java @@ -151,7 +151,7 @@ public void testSearchShardTaskCancellationWithHighElapsedTime() throws Interrup public void testSearchTaskCancellationWithHighCpu() throws InterruptedException { Settings request = Settings.builder() .put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced") - .put(SearchTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 50) + .put(SearchTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 1000) .build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); @@ -182,7 +182,7 @@ public void testSearchTaskCancellationWithHighCpu() throws InterruptedException public void testSearchShardTaskCancellationWithHighCpu() throws InterruptedException { Settings request = Settings.builder() .put(SearchBackpressureSettings.SETTING_MODE.getKey(), "enforced") - .put(SearchShardTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 50) + .put(SearchShardTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 1000) .build(); assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(request).get()); diff --git a/server/src/main/java/org/opensearch/tasks/CancellableTask.java b/server/src/main/java/org/opensearch/tasks/CancellableTask.java index d32d04f006bbd..dc28c26700e6c 100644 --- a/server/src/main/java/org/opensearch/tasks/CancellableTask.java +++ b/server/src/main/java/org/opensearch/tasks/CancellableTask.java @@ -33,10 +33,10 @@ package org.opensearch.tasks; import org.opensearch.common.Nullable; +import org.opensearch.common.SetOnce; import org.opensearch.common.unit.TimeValue; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import static org.opensearch.search.SearchService.NO_TIMEOUT; @@ -47,17 +47,26 @@ */ public abstract class CancellableTask extends Task { - private volatile String reason; - private final AtomicBoolean cancelled = new AtomicBoolean(false); + private static class CancelledInfo { + String reason; + /** + * The time this task was cancelled as a wall clock time since epoch ({@link System#currentTimeMillis()} style). + */ + Long cancellationStartTime; + /** + * The time this task was cancelled as a relative time ({@link System#nanoTime()} style). + */ + Long cancellationStartTimeNanos; + + public CancelledInfo(String reason) { + this.reason = reason; + this.cancellationStartTime = System.currentTimeMillis(); + this.cancellationStartTimeNanos = System.nanoTime(); + } + } + + private final SetOnce cancelledInfo = new SetOnce<>(); private final TimeValue cancelAfterTimeInterval; - /** - * The time this task was cancelled as a wall clock time since epoch ({@link System#currentTimeMillis()} style). - */ - private Long cancellationStartTime = null; - /** - * The time this task was cancelled as a relative time ({@link System#nanoTime()} style). - */ - private Long cancellationStartTimeNanos = null; public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT); @@ -81,14 +90,29 @@ public CancellableTask( */ public void cancel(String reason) { assert reason != null; - if (cancelled.compareAndSet(false, true)) { - this.cancellationStartTime = System.currentTimeMillis(); - this.cancellationStartTimeNanos = System.nanoTime(); - this.reason = reason; + if (cancelledInfo.trySet(new CancelledInfo(reason))) { onCancelled(); } } + public boolean isCancelled() { + return cancelledInfo.get() != null; + } + + /** + * Returns true if this task can potentially have children that need to be cancelled when it parent is cancelled. + */ + public abstract boolean shouldCancelChildrenOnCancellation(); + + public TimeValue getCancellationTimeout() { + return cancelAfterTimeInterval; + } + + /** + * Called after the task is cancelled so that it can take any actions that it has to take. + */ + protected void onCancelled() {} + /** * Returns true if this task should be automatically cancelled if the coordinating node that * requested this task left the cluster. @@ -97,37 +121,24 @@ public boolean cancelOnParentLeaving() { return true; } + @Nullable public Long getCancellationStartTime() { - return cancellationStartTime; + CancelledInfo info = cancelledInfo.get(); + return (info != null) ? info.cancellationStartTime : null; } + @Nullable public Long getCancellationStartTimeNanos() { - return cancellationStartTimeNanos; - } - - /** - * Returns true if this task can potentially have children that need to be cancelled when it parent is cancelled. - */ - public abstract boolean shouldCancelChildrenOnCancellation(); - - public boolean isCancelled() { - return cancelled.get(); - } - - public TimeValue getCancellationTimeout() { - return cancelAfterTimeInterval; + CancelledInfo info = cancelledInfo.get(); + return (info != null) ? info.cancellationStartTimeNanos : null; } /** * The reason the task was cancelled or null if it hasn't been cancelled. */ @Nullable - public final String getReasonCancelled() { - return reason; + public String getReasonCancelled() { + CancelledInfo info = cancelledInfo.get(); + return (info != null) ? info.reason : null; } - - /** - * Called after the task is cancelled so that it can take any actions that it has to take. - */ - protected void onCancelled() {} }