diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java b/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java index 655267dcf..aa0ce8075 100644 --- a/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java +++ b/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java @@ -14,21 +14,18 @@ import static org.opensearch.action.DocWriteResponse.Result.CREATED; import static org.opensearch.action.DocWriteResponse.Result.UPDATED; import static org.opensearch.ad.AnomalyDetectorPlugin.AD_THREAD_POOL_NAME; -import static org.opensearch.ad.constant.CommonErrorMessages.CAN_NOT_FIND_LATEST_TASK; import static org.opensearch.ad.util.RestHandlerUtils.XCONTENT_WITH_TYPE; import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; import java.io.IOException; import java.time.Instant; -import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListener; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; @@ -37,15 +34,10 @@ import org.opensearch.ad.common.exception.AnomalyDetectionException; import org.opensearch.ad.common.exception.EndRunException; import org.opensearch.ad.common.exception.InternalFailure; -import org.opensearch.ad.common.exception.ResourceNotFoundException; -import org.opensearch.ad.indices.ADIndex; import org.opensearch.ad.indices.AnomalyDetectionIndices; import org.opensearch.ad.model.ADTaskState; +import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyDetectorJob; -import org.opensearch.ad.model.AnomalyResult; -import org.opensearch.ad.model.DetectorProfileName; -import org.opensearch.ad.model.FeatureData; -import org.opensearch.ad.model.IntervalTimeConfiguration; import org.opensearch.ad.rest.handler.AnomalyDetectorFunction; import org.opensearch.ad.settings.AnomalyDetectorSettings; import org.opensearch.ad.task.ADTaskManager; @@ -53,12 +45,7 @@ import org.opensearch.ad.transport.AnomalyResultRequest; import org.opensearch.ad.transport.AnomalyResultResponse; import org.opensearch.ad.transport.AnomalyResultTransportAction; -import org.opensearch.ad.transport.ProfileAction; -import org.opensearch.ad.transport.ProfileRequest; -import org.opensearch.ad.transport.handler.AnomalyIndexHandler; -import org.opensearch.ad.util.DiscoveryNodeFilterer; import org.opensearch.client.Client; -import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.NamedXContentRegistry; @@ -66,7 +53,6 @@ import org.opensearch.common.xcontent.XContentParser; import org.opensearch.common.xcontent.XContentType; import org.opensearch.commons.InjectSecurity; -import org.opensearch.commons.authuser.User; import org.opensearch.jobscheduler.spi.JobExecutionContext; import org.opensearch.jobscheduler.spi.LockModel; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; @@ -88,11 +74,11 @@ public class AnomalyDetectorJobRunner implements ScheduledJobRunner { private int maxRetryForEndRunException; private Client client; private ThreadPool threadPool; - private AnomalyIndexHandler anomalyResultHandler; private ConcurrentHashMap detectorEndRunExceptionCount; private AnomalyDetectionIndices anomalyDetectionIndices; - private DiscoveryNodeFilterer nodeFilter; private ADTaskManager adTaskManager; + private NodeStateManager nodeStateManager; + private ExecuteADResultResponseRecorder recorder; public static AnomalyDetectorJobRunner getJobRunnerInstance() { if (INSTANCE != null) { @@ -120,10 +106,6 @@ public void setThreadPool(ThreadPool threadPool) { this.threadPool = threadPool; } - public void setAnomalyResultHandler(AnomalyIndexHandler anomalyResultHandler) { - this.anomalyResultHandler = anomalyResultHandler; - } - public void setSettings(Settings settings) { this.settings = settings; this.maxRetryForEndRunException = AnomalyDetectorSettings.MAX_RETRY_FOR_END_RUN_EXCEPTION.get(settings); @@ -137,8 +119,12 @@ public void setAnomalyDetectionIndices(AnomalyDetectionIndices anomalyDetectionI this.anomalyDetectionIndices = anomalyDetectionIndices; } - public void setNodeFilter(DiscoveryNodeFilterer nodeFilter) { - this.nodeFilter = nodeFilter; + public void setNodeStateManager(NodeStateManager nodeStateManager) { + this.nodeStateManager = nodeStateManager; + } + + public void setExecuteADResultResponseRecorder(ExecuteADResultResponseRecorder recorder) { + this.recorder = recorder; } @Override @@ -159,28 +145,49 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont final LockService lockService = context.getLockService(); Runnable runnable = () -> { - if (jobParameter.getLockDurationSeconds() != null) { - lockService - .acquireLock( - jobParameter, - context, - ActionListener - .wrap(lock -> runAdJob(jobParameter, lockService, lock, detectionStartTime, executionStartTime), exception -> { - indexAnomalyResultException( - jobParameter, - lockService, - null, - detectionStartTime, - executionStartTime, - exception, - false - ); - throw new IllegalStateException("Failed to acquire lock for AD job: " + detectorId); - }) - ); - } else { - log.warn("Can't get lock for AD job: " + detectorId); - } + nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> { + if (!detectorOptional.isPresent()) { + log.error(new ParameterizedMessage("fail to get detector [{}]", detectorId)); + return; + } + AnomalyDetector detector = detectorOptional.get(); + + if (jobParameter.getLockDurationSeconds() != null) { + lockService + .acquireLock( + jobParameter, + context, + ActionListener + .wrap( + lock -> runAdJob( + jobParameter, + lockService, + lock, + detectionStartTime, + executionStartTime, + recorder, + detector + ), + exception -> { + indexAnomalyResultException( + jobParameter, + lockService, + null, + detectionStartTime, + executionStartTime, + exception, + false, + recorder, + detector + ); + throw new IllegalStateException("Failed to acquire lock for AD job: " + detectorId); + } + ) + ); + } else { + log.warn("Can't get lock for AD job: " + detectorId); + } + }, e -> log.error(new ParameterizedMessage("fail to get detector [{}]", detectorId), e))); }; ExecutorService executor = threadPool.executor(AD_THREAD_POOL_NAME); @@ -195,13 +202,17 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont * @param lock lock to run job * @param detectionStartTime detection start time * @param executionStartTime detection end time + * @param recorder utility to record job execution result + * @param detector associated detector accessor */ protected void runAdJob( AnomalyDetectorJob jobParameter, LockService lockService, LockModel lock, Instant detectionStartTime, - Instant executionStartTime + Instant executionStartTime, + ExecuteADResultResponseRecorder recorder, + AnomalyDetector detector ) { String detectorId = jobParameter.getName(); if (lock == null) { @@ -212,7 +223,9 @@ protected void runAdJob( detectionStartTime, executionStartTime, "Can't run AD job due to null lock", - false + false, + recorder, + detector ); return; } @@ -242,16 +255,38 @@ protected void runAdJob( } String resultIndex = jobParameter.getResultIndex(); if (resultIndex == null) { - runAnomalyDetectionJob(jobParameter, lockService, lock, detectionStartTime, executionStartTime, detectorId, user, roles); + runAnomalyDetectionJob( + jobParameter, + lockService, + lock, + detectionStartTime, + executionStartTime, + detectorId, + user, + roles, + recorder, + detector + ); return; } ActionListener listener = ActionListener.wrap(r -> { log.debug("Custom index is valid"); }, e -> { Exception exception = new EndRunException(detectorId, e.getMessage(), true); - handleAdException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, exception); + handleAdException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, exception, recorder, detector); }); anomalyDetectionIndices.validateCustomIndexForBackendJob(resultIndex, detectorId, user, roles, () -> { listener.onResponse(true); - runAnomalyDetectionJob(jobParameter, lockService, lock, detectionStartTime, executionStartTime, detectorId, user, roles); + runAnomalyDetectionJob( + jobParameter, + lockService, + lock, + detectionStartTime, + executionStartTime, + detectorId, + user, + roles, + recorder, + detector + ); }, listener); } @@ -263,7 +298,9 @@ private void runAnomalyDetectionJob( Instant executionStartTime, String detectorId, String user, - List roles + List roles, + ExecuteADResultResponseRecorder recorder, + AnomalyDetector detector ) { try (InjectSecurity injectSecurity = new InjectSecurity(detectorId, settings, client.threadPool().getThreadContext())) { @@ -282,15 +319,43 @@ private void runAnomalyDetectionJob( ActionListener .wrap( response -> { - indexAnomalyResult(jobParameter, lockService, lock, detectionStartTime, executionStartTime, response); + indexAnomalyResult( + jobParameter, + lockService, + lock, + detectionStartTime, + executionStartTime, + response, + recorder, + detector + ); }, exception -> { - handleAdException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, exception); + handleAdException( + jobParameter, + lockService, + lock, + detectionStartTime, + executionStartTime, + exception, + recorder, + detector + ); } ) ); } catch (Exception e) { - indexAnomalyResultException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, e, true); + indexAnomalyResultException( + jobParameter, + lockService, + lock, + detectionStartTime, + executionStartTime, + e, + true, + recorder, + detector + ); log.error("Failed to execute AD job " + detectorId, e); } } @@ -331,6 +396,8 @@ private void runAnomalyDetectionJob( * @param detectionStartTime detection start time * @param executionStartTime detection end time * @param exception exception + * @param recorder utility to record job execution result + * @param detector associated detector accessor */ protected void handleAdException( AnomalyDetectorJob jobParameter, @@ -338,7 +405,9 @@ protected void handleAdException( LockModel lock, Instant detectionStartTime, Instant executionStartTime, - Exception exception + Exception exception, + ExecuteADResultResponseRecorder recorder, + AnomalyDetector detector ) { String detectorId = jobParameter.getName(); if (exception instanceof EndRunException) { @@ -353,7 +422,9 @@ protected void handleAdException( lock, detectionStartTime, executionStartTime, - (EndRunException) exception + (EndRunException) exception, + recorder, + detector ); } else { detectorEndRunExceptionCount.compute(detectorId, (k, v) -> { @@ -378,7 +449,9 @@ protected void handleAdException( lock, detectionStartTime, executionStartTime, - (EndRunException) exception + (EndRunException) exception, + recorder, + detector ); return; } @@ -389,7 +462,9 @@ protected void handleAdException( detectionStartTime, executionStartTime, exception.getMessage(), - true + true, + recorder, + detector ); } } else { @@ -399,7 +474,17 @@ protected void handleAdException( } else { log.error("Failed to execute anomaly result action for " + detectorId, exception); } - indexAnomalyResultException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, exception, true); + indexAnomalyResultException( + jobParameter, + lockService, + lock, + detectionStartTime, + executionStartTime, + exception, + true, + recorder, + detector + ); } } @@ -409,7 +494,9 @@ private void stopAdJobForEndRunException( LockModel lock, Instant detectionStartTime, Instant executionStartTime, - EndRunException exception + EndRunException exception, + ExecuteADResultResponseRecorder recorder, + AnomalyDetector detector ) { String detectorId = jobParameter.getName(); detectorEndRunExceptionCount.remove(detectorId); @@ -427,7 +514,9 @@ private void stopAdJobForEndRunException( executionStartTime, error, true, - ADTaskState.STOPPED.name() + ADTaskState.STOPPED.name(), + recorder, + detector ) ); } @@ -491,81 +580,22 @@ private void indexAnomalyResult( LockModel lock, Instant detectionStartTime, Instant executionStartTime, - AnomalyResultResponse response + AnomalyResultResponse response, + ExecuteADResultResponseRecorder recorder, + AnomalyDetector detector ) { String detectorId = jobParameter.getName(); detectorEndRunExceptionCount.remove(detectorId); try { - // skipping writing to the result index if not necessary - // For a single-entity detector, the result is not useful if error is null - // and rcf score (thus anomaly grade/confidence) is null. - // For a HCAD detector, we don't need to save on the detector level. - // We return 0 or Double.NaN rcf score if there is no error. - if ((response.getAnomalyScore() <= 0 || Double.isNaN(response.getAnomalyScore())) && response.getError() == null) { - updateRealtimeTask(response, detectorId); - return; - } - IntervalTimeConfiguration windowDelay = (IntervalTimeConfiguration) jobParameter.getWindowDelay(); - Instant dataStartTime = detectionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit()); - Instant dataEndTime = executionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit()); - User user = jobParameter.getUser(); - - if (response.getError() != null) { - log.info("Anomaly result action run successfully for {} with error {}", detectorId, response.getError()); - } - - AnomalyResult anomalyResult = response - .toAnomalyResult( - detectorId, - dataStartTime, - dataEndTime, - executionStartTime, - Instant.now(), - anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT), - user, - response.getError() - ); - - String resultIndex = jobParameter.getResultIndex(); - anomalyResultHandler.index(anomalyResult, detectorId, resultIndex); - updateRealtimeTask(response, detectorId); + recorder.indexAnomalyResult(detectionStartTime, executionStartTime, response, detector); } catch (EndRunException e) { - handleAdException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, e); + handleAdException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, e, recorder, detector); } catch (Exception e) { log.error("Failed to index anomaly result for " + detectorId, e); } finally { releaseLock(jobParameter, lockService, lock); } - } - private void updateRealtimeTask(AnomalyResultResponse response, String detectorId) { - if (response.isHCDetector() != null - && response.isHCDetector() - && !adTaskManager.skipUpdateHCRealtimeTask(detectorId, response.getError())) { - DiscoveryNode[] dataNodes = nodeFilter.getEligibleDataNodes(); - Set profiles = new HashSet<>(); - profiles.add(DetectorProfileName.INIT_PROGRESS); - ProfileRequest profileRequest = new ProfileRequest(detectorId, profiles, true, dataNodes); - client.execute(ProfileAction.INSTANCE, profileRequest, ActionListener.wrap(r -> { - log.debug("Update latest realtime task for HC detector {}, total updates: {}", detectorId, r.getTotalUpdates()); - updateLatestRealtimeTask( - detectorId, - null, - r.getTotalUpdates(), - response.getDetectorIntervalInMinutes(), - response.getError() - ); - }, e -> { log.error("Failed to update latest realtime task for " + detectorId, e); })); - } else { - log.debug("Update latest realtime task for SINGLE detector {}, total updates: {}", detectorId, response.getRcfTotalUpdates()); - updateLatestRealtimeTask( - detectorId, - null, - response.getRcfTotalUpdates(), - response.getDetectorIntervalInMinutes(), - response.getError() - ); - } } private void indexAnomalyResultException( @@ -575,13 +605,25 @@ private void indexAnomalyResultException( Instant detectionStartTime, Instant executionStartTime, Exception exception, - boolean releaseLock + boolean releaseLock, + ExecuteADResultResponseRecorder recorder, + AnomalyDetector detector ) { try { String errorMessage = exception instanceof AnomalyDetectionException ? exception.getMessage() : Throwables.getStackTraceAsString(exception); - indexAnomalyResultException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, errorMessage, releaseLock); + indexAnomalyResultException( + jobParameter, + lockService, + lock, + detectionStartTime, + executionStartTime, + errorMessage, + releaseLock, + recorder, + detector + ); } catch (Exception e) { log.error("Failed to index anomaly result for " + jobParameter.getName(), e); } @@ -594,7 +636,9 @@ private void indexAnomalyResultException( Instant detectionStartTime, Instant executionStartTime, String errorMessage, - boolean releaseLock + boolean releaseLock, + ExecuteADResultResponseRecorder recorder, + AnomalyDetector detector ) { indexAnomalyResultException( jobParameter, @@ -604,7 +648,9 @@ private void indexAnomalyResultException( executionStartTime, errorMessage, releaseLock, - null + null, + recorder, + detector ); } @@ -616,40 +662,12 @@ private void indexAnomalyResultException( Instant executionStartTime, String errorMessage, boolean releaseLock, - String taskState + String taskState, + ExecuteADResultResponseRecorder recorder, + AnomalyDetector detector ) { - String detectorId = jobParameter.getName(); try { - IntervalTimeConfiguration windowDelay = (IntervalTimeConfiguration) jobParameter.getWindowDelay(); - Instant dataStartTime = detectionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit()); - Instant dataEndTime = executionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit()); - User user = jobParameter.getUser(); - - AnomalyResult anomalyResult = new AnomalyResult( - detectorId, - null, // no task id - new ArrayList(), - dataStartTime, - dataEndTime, - executionStartTime, - Instant.now(), - errorMessage, - null, // single-stream detectors have no entity - user, - anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT), - null // no model id - ); - String resultIndex = jobParameter.getResultIndex(); - if (resultIndex != null && !anomalyDetectionIndices.doesIndexExist(resultIndex)) { - // Set result index as null, will write exception to default result index. - anomalyResultHandler.index(anomalyResult, detectorId, null); - } else { - anomalyResultHandler.index(anomalyResult, detectorId, resultIndex); - } - - updateLatestRealtimeTask(detectorId, taskState, null, null, errorMessage); - } catch (Exception e) { - log.error("Failed to index anomaly result for " + detectorId, e); + recorder.indexAnomalyResultException(detectionStartTime, executionStartTime, errorMessage, taskState, detector); } finally { if (releaseLock) { releaseLock(jobParameter, lockService, lock); @@ -657,37 +675,6 @@ private void indexAnomalyResultException( } } - private void updateLatestRealtimeTask( - String detectorId, - String taskState, - Long rcfTotalUpdates, - Long detectorIntervalInMinutes, - String error - ) { - // Don't need info as this will be printed repeatedly in each interval - adTaskManager - .updateLatestRealtimeTaskOnCoordinatingNode( - detectorId, - taskState, - rcfTotalUpdates, - detectorIntervalInMinutes, - error, - ActionListener.wrap(r -> { - if (r != null) { - log.debug("Updated latest realtime task successfully for detector {}, taskState: {}", detectorId, taskState); - } - }, e -> { - if ((e instanceof ResourceNotFoundException) && e.getMessage().contains(CAN_NOT_FIND_LATEST_TASK)) { - // Clear realtime task cache, will recreate AD task in next run, check AnomalyResultTransportAction. - log.error("Can't find latest realtime task of detector " + detectorId); - adTaskManager.removeRealtimeTaskCache(detectorId); - } else { - log.error("Failed to update latest realtime task for detector " + detectorId, e); - } - }) - ); - } - private void releaseLock(AnomalyDetectorJob jobParameter, LockService lockService, LockModel lock) { lockService .release( diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java b/src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java index b21d11cc5..cace64e3f 100644 --- a/src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java @@ -234,11 +234,12 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip private ClientUtil clientUtil; private DiscoveryNodeFilterer nodeFilter; private IndexUtils indexUtils; - private ADTaskCacheManager adTaskCacheManager; private ADTaskManager adTaskManager; private ADBatchTaskRunner adBatchTaskRunner; // package private for testing GenericObjectPool serializeRCFBufferPool; + private NodeStateManager stateManager; + private ExecuteADResultResponseRecorder adResultResponseRecorder; static { SpecialPermission.check(); @@ -259,25 +260,14 @@ public List getRestHandlers( IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster ) { - AnomalyIndexHandler anomalyResultHandler = new AnomalyIndexHandler( - client, - settings, - threadPool, - CommonName.ANOMALY_RESULT_INDEX_ALIAS, - anomalyDetectionIndices, - this.clientUtil, - this.indexUtils, - clusterService - ); - AnomalyDetectorJobRunner jobRunner = AnomalyDetectorJobRunner.getJobRunnerInstance(); jobRunner.setClient(client); jobRunner.setThreadPool(threadPool); - jobRunner.setAnomalyResultHandler(anomalyResultHandler); jobRunner.setSettings(settings); jobRunner.setAnomalyDetectionIndices(anomalyDetectionIndices); - jobRunner.setNodeFilter(nodeFilter); jobRunner.setAdTaskManager(adTaskManager); + jobRunner.setNodeStateManager(stateManager); + jobRunner.setExecuteADResultResponseRecorder(adResultResponseRecorder); RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(); RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(settings, clusterService); @@ -383,7 +373,7 @@ public Collection createComponents( adCircuitBreakerService ); - NodeStateManager stateManager = new NodeStateManager( + stateManager = new NodeStateManager( client, xContentRegistry, settings, @@ -568,7 +558,8 @@ public PooledObject wrap(LinkedBuffer obj) { AnomalyDetectorSettings.QUEUE_MAINTENANCE, entityColdStarter, AnomalyDetectorSettings.HOURLY_MAINTENANCE, - stateManager + stateManager, + cacheProvider ); ModelManager modelManager = new ModelManager( @@ -714,7 +705,7 @@ public PooledObject wrap(LinkedBuffer obj) { anomalyDetectorRunner = new AnomalyDetectorRunner(modelManager, featureManager, AnomalyDetectorSettings.MAX_PREVIEW_RESULTS); - adTaskCacheManager = new ADTaskCacheManager(settings, clusterService, memoryTracker); + ADTaskCacheManager adTaskCacheManager = new ADTaskCacheManager(settings, clusterService, memoryTracker); adTaskManager = new ADTaskManager( settings, clusterService, @@ -754,6 +745,26 @@ public PooledObject wrap(LinkedBuffer obj) { ADSearchHandler adSearchHandler = new ADSearchHandler(settings, clusterService, client); + AnomalyIndexHandler anomalyResultHandler = new AnomalyIndexHandler( + client, + settings, + threadPool, + CommonName.ANOMALY_RESULT_INDEX_ALIAS, + anomalyDetectionIndices, + this.clientUtil, + this.indexUtils, + clusterService + ); + + adResultResponseRecorder = new ExecuteADResultResponseRecorder( + anomalyDetectionIndices, + anomalyResultHandler, + adTaskManager, + nodeFilter, + threadPool, + client + ); + // return objects used by Guice to inject dependencies for e.g., // transport action handler constructors return ImmutableList @@ -795,7 +806,8 @@ public PooledObject wrap(LinkedBuffer obj) { checkpointWriteQueue, coldEntityQueue, entityColdStarter, - adTaskCacheManager + adTaskCacheManager, + adResultResponseRecorder ); } diff --git a/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java b/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java new file mode 100644 index 000000000..19710f0cb --- /dev/null +++ b/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java @@ -0,0 +1,288 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.ad; + +import static org.opensearch.ad.constant.CommonErrorMessages.CAN_NOT_FIND_LATEST_TASK; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.ad.common.exception.EndRunException; +import org.opensearch.ad.common.exception.ResourceNotFoundException; +import org.opensearch.ad.constant.CommonErrorMessages; +import org.opensearch.ad.indices.ADIndex; +import org.opensearch.ad.indices.AnomalyDetectionIndices; +import org.opensearch.ad.model.AnomalyDetector; +import org.opensearch.ad.model.AnomalyResult; +import org.opensearch.ad.model.DetectorProfileName; +import org.opensearch.ad.model.FeatureData; +import org.opensearch.ad.model.IntervalTimeConfiguration; +import org.opensearch.ad.task.ADTaskManager; +import org.opensearch.ad.transport.AnomalyResultResponse; +import org.opensearch.ad.transport.ProfileAction; +import org.opensearch.ad.transport.ProfileRequest; +import org.opensearch.ad.transport.RCFPollingAction; +import org.opensearch.ad.transport.RCFPollingRequest; +import org.opensearch.ad.transport.handler.AnomalyIndexHandler; +import org.opensearch.ad.util.DiscoveryNodeFilterer; +import org.opensearch.client.Client; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.commons.authuser.User; +import org.opensearch.threadpool.ThreadPool; + +public class ExecuteADResultResponseRecorder { + private static final Logger log = LogManager.getLogger(ExecuteADResultResponseRecorder.class); + + private AnomalyDetectionIndices anomalyDetectionIndices; + private AnomalyIndexHandler anomalyResultHandler; + private ADTaskManager adTaskManager; + private DiscoveryNodeFilterer nodeFilter; + private ThreadPool threadPool; + private Client client; + + public ExecuteADResultResponseRecorder( + AnomalyDetectionIndices anomalyDetectionIndices, + AnomalyIndexHandler anomalyResultHandler, + ADTaskManager adTaskManager, + DiscoveryNodeFilterer nodeFilter, + ThreadPool threadPool, + Client client + ) { + this.anomalyDetectionIndices = anomalyDetectionIndices; + this.anomalyResultHandler = anomalyResultHandler; + this.adTaskManager = adTaskManager; + this.nodeFilter = nodeFilter; + this.threadPool = threadPool; + this.client = client; + } + + public void indexAnomalyResult( + Instant detectionStartTime, + Instant executionStartTime, + AnomalyResultResponse response, + AnomalyDetector detector + ) { + String detectorId = detector.getDetectorId(); + try { + // skipping writing to the result index if not necessary + // For a single-entity detector, the result is not useful if error is null + // and rcf score (thus anomaly grade/confidence) is null. + // For a HCAD detector, we don't need to save on the detector level. + // We return 0 or Double.NaN rcf score if there is no error. + if ((response.getAnomalyScore() <= 0 || Double.isNaN(response.getAnomalyScore())) && response.getError() == null) { + updateRealtimeTask(response, detectorId); + return; + } + IntervalTimeConfiguration windowDelay = (IntervalTimeConfiguration) detector.getWindowDelay(); + Instant dataStartTime = detectionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit()); + Instant dataEndTime = executionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit()); + User user = detector.getUser(); + + if (response.getError() != null) { + log.info("Anomaly result action run successfully for {} with error {}", detectorId, response.getError()); + } + + AnomalyResult anomalyResult = response + .toAnomalyResult( + detectorId, + dataStartTime, + dataEndTime, + executionStartTime, + Instant.now(), + anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT), + user, + response.getError() + ); + + String resultIndex = detector.getResultIndex(); + anomalyResultHandler.index(anomalyResult, detectorId, resultIndex); + updateRealtimeTask(response, detectorId); + } catch (EndRunException e) { + throw e; + } catch (Exception e) { + log.error("Failed to index anomaly result for " + detectorId, e); + } + } + + /** + * Update real time task (one document per detector in state index). If the real-time task has no changes compared with local cache, + * the task won't update. Task only updates when the state changed, or any error happened, or AD job stopped. Task is mainly consumed + * by the front-end to track detector status. For single-stream detectors, we embed model total updates in AnomalyResultResponse and + * update state accordingly. For HCAD, we won't wait for model finishing updating before returning a response to the job scheduler + * since it might be long before all entities finish execution. So we don't embed model total updates in AnomalyResultResponse. + * Instead, we issue a profile request to poll each model node and get the maximum total updates among all models. + * @param response response returned from executing AnomalyResultAction + * @param detectorId Detector Id + */ + private void updateRealtimeTask(AnomalyResultResponse response, String detectorId) { + if (response.isHCDetector() != null && response.isHCDetector()) { + if (adTaskManager.skipUpdateHCRealtimeTask(detectorId, response.getError())) { + return; + } + DiscoveryNode[] dataNodes = nodeFilter.getEligibleDataNodes(); + Set profiles = new HashSet<>(); + profiles.add(DetectorProfileName.INIT_PROGRESS); + ProfileRequest profileRequest = new ProfileRequest(detectorId, profiles, true, dataNodes); + Runnable profileHCInitProgress = () -> { + client.execute(ProfileAction.INSTANCE, profileRequest, ActionListener.wrap(r -> { + log.debug("Update latest realtime task for HC detector {}, total updates: {}", detectorId, r.getTotalUpdates()); + updateLatestRealtimeTask( + detectorId, + null, + r.getTotalUpdates(), + response.getDetectorIntervalInMinutes(), + response.getError() + ); + }, e -> { log.error("Failed to update latest realtime task for " + detectorId, e); })); + }; + if (!adTaskManager.isHCRealtimeTaskStartInitializing(detectorId)) { + // real time init progress is 0 may mean this is a newly started detector + // Delay real time cache update by one minute. If we are in init status, the delay may give the model training time to + // finish. We can change the detector running immediately instead of waiting for the next interval. + threadPool.schedule(profileHCInitProgress, new TimeValue(60, TimeUnit.SECONDS), AnomalyDetectorPlugin.AD_THREAD_POOL_NAME); + } else { + profileHCInitProgress.run(); + } + + } else { + log + .debug( + "Update latest realtime task for single stream detector {}, total updates: {}", + detectorId, + response.getRcfTotalUpdates() + ); + updateLatestRealtimeTask( + detectorId, + null, + response.getRcfTotalUpdates(), + response.getDetectorIntervalInMinutes(), + response.getError() + ); + } + } + + private void updateLatestRealtimeTask( + String detectorId, + String taskState, + Long rcfTotalUpdates, + Long detectorIntervalInMinutes, + String error + ) { + // Don't need info as this will be printed repeatedly in each interval + adTaskManager + .updateLatestRealtimeTaskOnCoordinatingNode( + detectorId, + taskState, + rcfTotalUpdates, + detectorIntervalInMinutes, + error, + ActionListener.wrap(r -> { + if (r != null) { + log.debug("Updated latest realtime task successfully for detector {}, taskState: {}", detectorId, taskState); + } + }, e -> { + if ((e instanceof ResourceNotFoundException) && e.getMessage().contains(CAN_NOT_FIND_LATEST_TASK)) { + // Clear realtime task cache, will recreate AD task in next run, check AnomalyResultTransportAction. + log.error("Can't find latest realtime task of detector " + detectorId); + adTaskManager.removeRealtimeTaskCache(detectorId); + } else { + log.error("Failed to update latest realtime task for detector " + detectorId, e); + } + }) + ); + } + + /** + * The function is not only indexing the result with the exception, but also updating the task state after + * 60s if the exception is related to cold start (index not found exceptions) for a single stream detector. + * + * @param detectionStartTime execution start time + * @param executionStartTime execution end time + * @param errorMessage Error message to record + * @param taskState AD task state (e.g., stopped) + * @param detector Detector config accessor + */ + public void indexAnomalyResultException( + Instant detectionStartTime, + Instant executionStartTime, + String errorMessage, + String taskState, + AnomalyDetector detector + ) { + String detectorId = detector.getDetectorId(); + try { + IntervalTimeConfiguration windowDelay = (IntervalTimeConfiguration) detector.getWindowDelay(); + Instant dataStartTime = detectionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit()); + Instant dataEndTime = executionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit()); + User user = detector.getUser(); + + AnomalyResult anomalyResult = new AnomalyResult( + detectorId, + null, // no task id + new ArrayList(), + dataStartTime, + dataEndTime, + executionStartTime, + Instant.now(), + errorMessage, + null, // single-stream detectors have no entity + user, + anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT), + null // no model id + ); + String resultIndex = detector.getResultIndex(); + if (resultIndex != null && !anomalyDetectionIndices.doesIndexExist(resultIndex)) { + // Set result index as null, will write exception to default result index. + anomalyResultHandler.index(anomalyResult, detectorId, null); + } else { + anomalyResultHandler.index(anomalyResult, detectorId, resultIndex); + } + + if (errorMessage.contains(CommonErrorMessages.NO_CHECKPOINT_ERR_MSG) && !detector.isMultiCategoryDetector()) { + // single stream detector raises ResourceNotFoundException containing CommonErrorMessages.NO_CHECKPOINT_ERR_MSG + // when there is no checkpoint. + // Delay real time cache update by one minute so we will have trained models by then and update the state + // document accordingly. + threadPool.schedule(() -> { + RCFPollingRequest request = new RCFPollingRequest(detectorId); + client.execute(RCFPollingAction.INSTANCE, request, ActionListener.wrap(rcfPollResponse -> { + long totalUpdates = rcfPollResponse.getTotalUpdates(); + // if there are updates, don't record failures + updateLatestRealtimeTask( + detectorId, + taskState, + totalUpdates, + detector.getDetectorIntervalInMinutes(), + totalUpdates > 0 ? "" : errorMessage + ); + }, e -> { + log.error("Fail to execute RCFRollingAction", e); + updateLatestRealtimeTask(detectorId, taskState, null, null, errorMessage); + })); + }, new TimeValue(60, TimeUnit.SECONDS), AnomalyDetectorPlugin.AD_THREAD_POOL_NAME); + } else { + updateLatestRealtimeTask(detectorId, taskState, null, null, errorMessage); + } + + } catch (Exception e) { + log.error("Failed to index anomaly result for " + detectorId, e); + } + } + +} diff --git a/src/main/java/org/opensearch/ad/caching/PriorityCache.java b/src/main/java/org/opensearch/ad/caching/PriorityCache.java index 0e94663d2..dfae6b931 100644 --- a/src/main/java/org/opensearch/ad/caching/PriorityCache.java +++ b/src/main/java/org/opensearch/ad/caching/PriorityCache.java @@ -55,6 +55,7 @@ import org.opensearch.ad.ratelimit.CheckpointMaintainWorker; import org.opensearch.ad.ratelimit.CheckpointWriteWorker; import org.opensearch.ad.settings.AnomalyDetectorSettings; +import org.opensearch.ad.settings.EnabledSetting; import org.opensearch.ad.util.DateUtils; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Strings; @@ -161,29 +162,31 @@ public ModelState get(String modelId, AnomalyDetector detector) { // during maintenance period, stop putting new entries if (!maintenanceLock.isLocked() && modelState == null) { - DoorKeeper doorKeeper = doorKeepers - .computeIfAbsent( - detectorId, - id -> { - // reset every 60 intervals - return new DoorKeeper( - AnomalyDetectorSettings.DOOR_KEEPER_FOR_CACHE_MAX_INSERTION, - AnomalyDetectorSettings.DOOR_KEEPER_FAULSE_POSITIVE_RATE, - detector.getDetectionIntervalDuration().multipliedBy(AnomalyDetectorSettings.DOOR_KEEPER_MAINTENANCE_FREQ), - clock - ); - } - ); - - // first hit, ignore - // since door keeper may get reset during maintenance, it is possible - // the entity is still active even though door keeper has no record of - // this model Id. We have to call isActive method to make sure. Otherwise, - // the entity might miss an anomaly result every 60 intervals due to door keeper - // reset. - if (!doorKeeper.mightContain(modelId) && !isActive(detectorId, modelId)) { - doorKeeper.put(modelId); - return null; + if (EnabledSetting.isDoorKeeperInCacheEnabled()) { + DoorKeeper doorKeeper = doorKeepers + .computeIfAbsent( + detectorId, + id -> { + // reset every 60 intervals + return new DoorKeeper( + AnomalyDetectorSettings.DOOR_KEEPER_FOR_CACHE_MAX_INSERTION, + AnomalyDetectorSettings.DOOR_KEEPER_FAULSE_POSITIVE_RATE, + detector.getDetectionIntervalDuration().multipliedBy(AnomalyDetectorSettings.DOOR_KEEPER_MAINTENANCE_FREQ), + clock + ); + } + ); + + // first hit, ignore + // since door keeper may get reset during maintenance, it is possible + // the entity is still active even though door keeper has no record of + // this model Id. We have to call isActive method to make sure. Otherwise, + // the entity might miss an anomaly result every 60 intervals due to door keeper + // reset. + if (!doorKeeper.mightContain(modelId) && !isActive(detectorId, modelId)) { + doorKeeper.put(modelId); + return null; + } } try { diff --git a/src/main/java/org/opensearch/ad/model/AnomalyDetector.java b/src/main/java/org/opensearch/ad/model/AnomalyDetector.java index 2c069d703..ab92d2f76 100644 --- a/src/main/java/org/opensearch/ad/model/AnomalyDetector.java +++ b/src/main/java/org/opensearch/ad/model/AnomalyDetector.java @@ -123,6 +123,7 @@ public class AnomalyDetector implements Writeable, ToXContentObject { private DetectionDateRange detectionDateRange; public static final int MAX_RESULT_INDEX_NAME_SIZE = 255; + // OS doesn’t allow uppercase: https://tinyurl.com/yse2xdbx public static final String RESULT_INDEX_NAME_PATTERN = "[a-z0-9_-]+"; /** diff --git a/src/main/java/org/opensearch/ad/ratelimit/EntityColdStartWorker.java b/src/main/java/org/opensearch/ad/ratelimit/EntityColdStartWorker.java index c1166d4d9..8702fafcc 100644 --- a/src/main/java/org/opensearch/ad/ratelimit/EntityColdStartWorker.java +++ b/src/main/java/org/opensearch/ad/ratelimit/EntityColdStartWorker.java @@ -22,13 +22,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListener; import org.opensearch.ad.NodeStateManager; import org.opensearch.ad.breaker.ADCircuitBreakerService; +import org.opensearch.ad.caching.CacheProvider; import org.opensearch.ad.ml.EntityColdStarter; import org.opensearch.ad.ml.EntityModel; import org.opensearch.ad.ml.ModelManager.ModelType; import org.opensearch.ad.ml.ModelState; +import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.util.ExceptionUtil; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Setting; @@ -49,6 +52,7 @@ public class EntityColdStartWorker extends SingleRequestWorker { public static final String WORKER_NAME = "cold-start"; private final EntityColdStarter entityColdStarter; + private final CacheProvider cacheProvider; public EntityColdStartWorker( long heapSizeInBytes, @@ -67,7 +71,8 @@ public EntityColdStartWorker( Duration executionTtl, EntityColdStarter entityColdStarter, Duration stateTtl, - NodeStateManager nodeStateManager + NodeStateManager nodeStateManager, + CacheProvider cacheProvider ) { super( WORKER_NAME, @@ -90,6 +95,7 @@ public EntityColdStartWorker( nodeStateManager ); this.entityColdStarter = entityColdStarter; + this.cacheProvider = cacheProvider; } @Override @@ -114,15 +120,42 @@ protected void executeRequest(EntityRequest coldStartRequest, ActionListener failureListener = ActionListener.delegateResponse(listener, (delegateListener, e) -> { - if (ExceptionUtil.isOverloaded(e)) { - LOG.error("OpenSearch is overloaded"); - setCoolDownStart(); + ActionListener coldStartListener = ActionListener.wrap(r -> { + nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> { + try { + if (!detectorOptional.isPresent()) { + LOG + .error( + new ParameterizedMessage( + "fail to load trained model [{}] to cache due to the detector not being found.", + modelState.getModelId() + ) + ); + return; + } + AnomalyDetector detector = detectorOptional.get(); + EntityModel model = modelState.getModel(); + // load to cache if cold start succeeds + if (model != null && model.getTrcf() != null) { + cacheProvider.get().hostIfPossible(detector, modelState); + } + } finally { + listener.onResponse(null); + } + }, listener::onFailure)); + + }, e -> { + try { + if (ExceptionUtil.isOverloaded(e)) { + LOG.error("OpenSearch is overloaded"); + setCoolDownStart(); + } + nodeStateManager.setException(detectorId, e); + } finally { + listener.onFailure(e); } - nodeStateManager.setException(detectorId, e); - delegateListener.onFailure(e); }); - entityColdStarter.trainModel(coldStartRequest.getEntity(), detectorId, modelState, failureListener); + entityColdStarter.trainModel(coldStartRequest.getEntity(), detectorId, modelState, coldStartListener); } } diff --git a/src/main/java/org/opensearch/ad/rest/RestExecuteAnomalyDetectorAction.java b/src/main/java/org/opensearch/ad/rest/RestExecuteAnomalyDetectorAction.java index 6e2ac9131..363199d58 100644 --- a/src/main/java/org/opensearch/ad/rest/RestExecuteAnomalyDetectorAction.java +++ b/src/main/java/org/opensearch/ad/rest/RestExecuteAnomalyDetectorAction.java @@ -70,7 +70,6 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli } AnomalyDetectorExecutionInput input = getAnomalyDetectorExecutionInput(request); return channel -> { - String rawPath = request.rawPath(); String error = validateAdExecutionInput(input); if (StringUtils.isNotBlank(error)) { channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, error)); diff --git a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java index b4c7eba3e..31256f3ed 100644 --- a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java +++ b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java @@ -31,6 +31,7 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.WriteRequest; +import org.opensearch.ad.ExecuteADResultResponseRecorder; import org.opensearch.ad.indices.AnomalyDetectionIndices; import org.opensearch.ad.model.ADTaskState; import org.opensearch.ad.model.AnomalyDetector; @@ -38,6 +39,8 @@ import org.opensearch.ad.model.IntervalTimeConfiguration; import org.opensearch.ad.task.ADTaskManager; import org.opensearch.ad.transport.AnomalyDetectorJobResponse; +import org.opensearch.ad.transport.AnomalyResultAction; +import org.opensearch.ad.transport.AnomalyResultRequest; import org.opensearch.ad.transport.StopDetectorAction; import org.opensearch.ad.transport.StopDetectorRequest; import org.opensearch.ad.transport.StopDetectorResponse; @@ -52,6 +55,8 @@ import org.opensearch.rest.RestStatus; import org.opensearch.transport.TransportService; +import com.google.common.base.Throwables; + /** * Anomaly detector job REST action handler to process POST/PUT request. */ @@ -62,19 +67,18 @@ public class IndexAnomalyDetectorJobActionHandler { private final Long seqNo; private final Long primaryTerm; private final Client client; - private final ActionListener listener; private final NamedXContentRegistry xContentRegistry; private final TransportService transportService; private final ADTaskManager adTaskManager; private final Logger logger = LogManager.getLogger(IndexAnomalyDetectorJobActionHandler.class); private final TimeValue requestTimeout; + private final ExecuteADResultResponseRecorder recorder; /** * Constructor function. * * @param client ES node client that executes actions on the local node - * @param listener Listener to send responses * @param anomalyDetectionIndices anomaly detector index manager * @param detectorId detector identifier * @param seqNo sequence number of last modification @@ -83,10 +87,10 @@ public class IndexAnomalyDetectorJobActionHandler { * @param xContentRegistry Registry which is used for XContentParser * @param transportService transport service * @param adTaskManager AD task manager + * @param recorder Utility to record AnomalyResultAction execution result */ public IndexAnomalyDetectorJobActionHandler( Client client, - ActionListener listener, AnomalyDetectionIndices anomalyDetectionIndices, String detectorId, Long seqNo, @@ -94,10 +98,10 @@ public IndexAnomalyDetectorJobActionHandler( TimeValue requestTimeout, NamedXContentRegistry xContentRegistry, TransportService transportService, - ADTaskManager adTaskManager + ADTaskManager adTaskManager, + ExecuteADResultResponseRecorder recorder ) { this.client = client; - this.listener = listener; this.anomalyDetectionIndices = anomalyDetectionIndices; this.detectorId = detectorId; this.seqNo = seqNo; @@ -106,6 +110,7 @@ public IndexAnomalyDetectorJobActionHandler( this.xContentRegistry = xContentRegistry; this.transportService = transportService; this.adTaskManager = adTaskManager; + this.recorder = recorder; } /** @@ -113,16 +118,56 @@ public IndexAnomalyDetectorJobActionHandler( * 1. If job doesn't exist, create new job. * 2. If job exists: a). if job enabled, return error message; b). if job disabled, enable job. * @param detector anomaly detector + * @param listener Listener to send responses */ - public void startAnomalyDetectorJob(AnomalyDetector detector) { + public void startAnomalyDetectorJob(AnomalyDetector detector, ActionListener listener) { + // this start listener is created & injected throughout the job handler so that whenever the job response is received, + // there's the extra step of trying to index results and update detector state with a 60s delay. + ActionListener startListener = ActionListener.wrap(r -> { + try { + Instant executionEndTime = Instant.now(); + IntervalTimeConfiguration schedule = (IntervalTimeConfiguration) detector.getDetectionInterval(); + Instant executionStartTime = executionEndTime.minus(schedule.getInterval(), schedule.getUnit()); + AnomalyResultRequest getRequest = new AnomalyResultRequest( + detector.getDetectorId(), + executionStartTime.toEpochMilli(), + executionEndTime.toEpochMilli() + ); + client + .execute( + AnomalyResultAction.INSTANCE, + getRequest, + ActionListener + .wrap( + response -> recorder.indexAnomalyResult(executionStartTime, executionEndTime, response, detector), + exception -> { + + recorder + .indexAnomalyResultException( + executionStartTime, + executionEndTime, + Throwables.getStackTraceAsString(exception), + null, + detector + ); + } + ) + ); + } catch (Exception ex) { + listener.onFailure(ex); + return; + } + listener.onResponse(r); + + }, listener::onFailure); if (!anomalyDetectionIndices.doesAnomalyDetectorJobIndexExist()) { anomalyDetectionIndices.initAnomalyDetectorJobIndex(ActionListener.wrap(response -> { if (response.isAcknowledged()) { logger.info("Created {} with mappings.", ANOMALY_DETECTORS_INDEX); - createJob(detector); + createJob(detector, startListener); } else { logger.warn("Created {} with mappings call not acknowledged.", ANOMALY_DETECTORS_INDEX); - listener + startListener .onFailure( new OpenSearchStatusException( "Created " + ANOMALY_DETECTORS_INDEX + " with mappings call not acknowledged.", @@ -130,13 +175,13 @@ public void startAnomalyDetectorJob(AnomalyDetector detector) { ) ); } - }, exception -> listener.onFailure(exception))); + }, exception -> startListener.onFailure(exception))); } else { - createJob(detector); + createJob(detector, startListener); } } - private void createJob(AnomalyDetector detector) { + private void createJob(AnomalyDetector detector, ActionListener listener) { try { IntervalTimeConfiguration interval = (IntervalTimeConfiguration) detector.getDetectionInterval(); Schedule schedule = new IntervalSchedule(Instant.now(), (int) interval.getInterval(), interval.getUnit()); @@ -155,7 +200,7 @@ private void createJob(AnomalyDetector detector) { detector.getResultIndex() ); - getAnomalyDetectorJobForWrite(detector, job); + getAnomalyDetectorJobForWrite(detector, job, listener); } catch (Exception e) { String message = "Failed to parse anomaly detector job " + detectorId; logger.error(message, e); @@ -163,19 +208,30 @@ private void createJob(AnomalyDetector detector) { } } - private void getAnomalyDetectorJobForWrite(AnomalyDetector detector, AnomalyDetectorJob job) { + private void getAnomalyDetectorJobForWrite( + AnomalyDetector detector, + AnomalyDetectorJob job, + ActionListener listener + ) { GetRequest getRequest = new GetRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX).id(detectorId); client .get( getRequest, ActionListener - .wrap(response -> onGetAnomalyDetectorJobForWrite(response, detector, job), exception -> listener.onFailure(exception)) + .wrap( + response -> onGetAnomalyDetectorJobForWrite(response, detector, job, listener), + exception -> listener.onFailure(exception) + ) ); } - private void onGetAnomalyDetectorJobForWrite(GetResponse response, AnomalyDetector detector, AnomalyDetectorJob job) - throws IOException { + private void onGetAnomalyDetectorJobForWrite( + GetResponse response, + AnomalyDetector detector, + AnomalyDetectorJob job, + ActionListener listener + ) throws IOException { if (response.isExists()) { try (XContentParser parser = createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); @@ -207,7 +263,7 @@ private void onGetAnomalyDetectorJobForWrite(GetResponse response, AnomalyDetect transportService, ActionListener .wrap( - r -> { indexAnomalyDetectorJob(newJob, null); }, + r -> { indexAnomalyDetectorJob(newJob, null, listener); }, e -> { // Have logged error message in ADTaskManager#startDetector listener.onFailure(e); @@ -227,12 +283,16 @@ private void onGetAnomalyDetectorJobForWrite(GetResponse response, AnomalyDetect null, job.getUser(), transportService, - ActionListener.wrap(r -> { indexAnomalyDetectorJob(job, null); }, e -> listener.onFailure(e)) + ActionListener.wrap(r -> { indexAnomalyDetectorJob(job, null, listener); }, e -> listener.onFailure(e)) ); } } - private void indexAnomalyDetectorJob(AnomalyDetectorJob job, AnomalyDetectorFunction function) throws IOException { + private void indexAnomalyDetectorJob( + AnomalyDetectorJob job, + AnomalyDetectorFunction function, + ActionListener listener + ) throws IOException { IndexRequest indexRequest = new IndexRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .source(job.toXContent(XContentFactory.jsonBuilder(), RestHandlerUtils.XCONTENT_WITH_TYPE)) @@ -244,11 +304,18 @@ private void indexAnomalyDetectorJob(AnomalyDetectorJob job, AnomalyDetectorFunc .index( indexRequest, ActionListener - .wrap(response -> onIndexAnomalyDetectorJobResponse(response, function), exception -> listener.onFailure(exception)) + .wrap( + response -> onIndexAnomalyDetectorJobResponse(response, function, listener), + exception -> listener.onFailure(exception) + ) ); } - private void onIndexAnomalyDetectorJobResponse(IndexResponse response, AnomalyDetectorFunction function) { + private void onIndexAnomalyDetectorJobResponse( + IndexResponse response, + AnomalyDetectorFunction function, + ActionListener listener + ) { if (response == null || (response.getResult() != CREATED && response.getResult() != UPDATED)) { String errorMsg = getShardsFailure(response); listener.onFailure(new OpenSearchStatusException(errorMsg, response.status())); @@ -274,8 +341,9 @@ private void onIndexAnomalyDetectorJobResponse(IndexResponse response, AnomalyDe * 2.If job exists: a).if job state is disabled, return error message; b).if job state is enabled, disable job. * * @param detectorId detector identifier + * @param listener Listener to send responses */ - public void stopAnomalyDetectorJob(String detectorId) { + public void stopAnomalyDetectorJob(String detectorId, ActionListener listener) { GetRequest getRequest = new GetRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX).id(detectorId); client.get(getRequest, ActionListener.wrap(response -> { @@ -304,8 +372,9 @@ public void stopAnomalyDetectorJob(String detectorId) { .execute( StopDetectorAction.INSTANCE, new StopDetectorRequest(detectorId), - stopAdDetectorListener(detectorId) - ) + stopAdDetectorListener(detectorId, listener) + ), + listener ); } } catch (IOException e) { @@ -319,7 +388,10 @@ public void stopAnomalyDetectorJob(String detectorId) { }, exception -> listener.onFailure(exception))); } - private ActionListener stopAdDetectorListener(String detectorId) { + private ActionListener stopAdDetectorListener( + String detectorId, + ActionListener listener + ) { return new ActionListener() { @Override public void onResponse(StopDetectorResponse stopDetectorResponse) { diff --git a/src/main/java/org/opensearch/ad/settings/EnabledSetting.java b/src/main/java/org/opensearch/ad/settings/EnabledSetting.java index 6a22b769b..797df2a2b 100644 --- a/src/main/java/org/opensearch/ad/settings/EnabledSetting.java +++ b/src/main/java/org/opensearch/ad/settings/EnabledSetting.java @@ -39,8 +39,9 @@ public class EnabledSetting extends AbstractSetting { public static final String LEGACY_OPENDISTRO_AD_BREAKER_ENABLED = "opendistro.anomaly_detection.breaker.enabled"; - public static final String INTERPOLATION_IN_HCAD_COLD_START_ENABLED = - "plugins.anomaly_detection.hcad_cold_start_interpolation.enabled";; + public static final String INTERPOLATION_IN_HCAD_COLD_START_ENABLED = "plugins.anomaly_detection.hcad_cold_start_interpolation.enabled"; + + public static final String DOOR_KEEPER_IN_CACHE_ENABLED = "plugins.anomaly_detection.door_keeper_in_cache.enabled";; public static final Map> settings = unmodifiableMap(new HashMap>() { { @@ -75,6 +76,13 @@ public class EnabledSetting extends AbstractSetting { INTERPOLATION_IN_HCAD_COLD_START_ENABLED, Setting.boolSetting(INTERPOLATION_IN_HCAD_COLD_START_ENABLED, false, NodeScope, Dynamic) ); + + /** + * We have a bloom filter placed in front of inactive entity cache to + * filter out unpopular items that are not likely to appear more + * than once. Whether this bloom filter is enabled or not. + */ + put(DOOR_KEEPER_IN_CACHE_ENABLED, Setting.boolSetting(DOOR_KEEPER_IN_CACHE_ENABLED, false, NodeScope, Dynamic)); } }); @@ -112,4 +120,12 @@ public static boolean isADBreakerEnabled() { public static boolean isInterpolationInColdStartEnabled() { return EnabledSetting.getInstance().getSettingValue(EnabledSetting.INTERPOLATION_IN_HCAD_COLD_START_ENABLED); } + + /** + * If enabled, we filter out unpopular items that are not likely to appear more than once + * @return wWhether door keeper in cache is enabled or not. + */ + public static boolean isDoorKeeperInCacheEnabled() { + return EnabledSetting.getInstance().getSettingValue(EnabledSetting.DOOR_KEEPER_IN_CACHE_ENABLED); + } } diff --git a/src/main/java/org/opensearch/ad/task/ADTaskManager.java b/src/main/java/org/opensearch/ad/task/ADTaskManager.java index 3d8f0ea95..1b3a775c8 100644 --- a/src/main/java/org/opensearch/ad/task/ADTaskManager.java +++ b/src/main/java/org/opensearch/ad/task/ADTaskManager.java @@ -334,7 +334,7 @@ private void startRealtimeOrHistoricalDetection( try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { if (detectionDateRange == null) { // start realtime job - handler.startAnomalyDetectorJob(detector.get()); + handler.startAnomalyDetectorJob(detector.get(), listener); } else { // start historical analysis task forwardApplyForTaskSlotsRequestToLeadNode(detector.get(), detectionDateRange, user, transportService, listener); @@ -852,7 +852,7 @@ public void stopDetector( ); } else { // stop realtime detector job - handler.stopAnomalyDetectorJob(detectorId); + handler.stopAnomalyDetectorJob(detectorId, listener); } }, listener); } @@ -2820,6 +2820,13 @@ public boolean skipUpdateHCRealtimeTask(String detectorId, String error) { && Objects.equals(error, realtimeTaskCache.getError()); } + public boolean isHCRealtimeTaskStartInitializing(String detectorId) { + ADRealtimeTaskCache realtimeTaskCache = adTaskCacheManager.getRealtimeTaskCache(detectorId); + return realtimeTaskCache != null + && realtimeTaskCache.getInitProgress() != null + && realtimeTaskCache.getInitProgress().floatValue() > 0; + } + public String convertEntityToString(ADTask adTask) { if (adTask == null || !adTask.isEntityTask()) { return null; diff --git a/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportAction.java b/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportAction.java index c7f2beddf..181dfc797 100644 --- a/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportAction.java @@ -24,6 +24,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.ad.ExecuteADResultResponseRecorder; import org.opensearch.ad.indices.AnomalyDetectionIndices; import org.opensearch.ad.model.DetectionDateRange; import org.opensearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler; @@ -51,6 +52,7 @@ public class AnomalyDetectorJobTransportAction extends HandledTransportAction filterByEnabled = it); + this.recorder = recorder; } @Override @@ -131,7 +135,6 @@ private void executeDetector( ) { IndexAnomalyDetectorJobActionHandler handler = new IndexAnomalyDetectorJobActionHandler( client, - listener, anomalyDetectionIndices, detectorId, seqNo, @@ -139,7 +142,8 @@ private void executeDetector( requestTimeout, xContentRegistry, transportService, - adTaskManager + adTaskManager, + recorder ); if (rawPath.endsWith(RestHandlerUtils.START_JOB)) { adTaskManager.startDetector(detectorId, detectionDateRange, handler, user, transportService, context, listener); diff --git a/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java b/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java index 0aa359a8f..921a5dc55 100644 --- a/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java @@ -696,7 +696,7 @@ private Exception coldStartIfNoModel(AtomicReference failure, Anomaly } LOG.info("Trigger cold start for {}", detector.getDetectorId()); coldStart(detector); - return previousException.orElse(new InternalFailure(adID, NO_MODEL_ERR_MSG)); + return previousException.orElse(exp); } private void findException(Throwable cause, String adID, AtomicReference failure, String nodeId) { diff --git a/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java b/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java index a56addd72..0c3d35037 100644 --- a/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java +++ b/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.Locale; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; @@ -49,12 +50,14 @@ import org.opensearch.action.index.IndexResponse; import org.opensearch.ad.common.exception.EndRunException; import org.opensearch.ad.indices.AnomalyDetectionIndices; +import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyDetectorJob; import org.opensearch.ad.model.AnomalyResult; import org.opensearch.ad.model.IntervalTimeConfiguration; import org.opensearch.ad.task.ADTaskManager; import org.opensearch.ad.transport.handler.AnomalyIndexHandler; import org.opensearch.ad.util.ClientUtil; +import org.opensearch.ad.util.DiscoveryNodeFilterer; import org.opensearch.ad.util.IndexUtils; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; @@ -114,6 +117,13 @@ public class AnomalyDetectorJobRunnerTests extends AbstractADTest { @Mock private AnomalyDetectionIndices indexUtil; + private ExecuteADResultResponseRecorder recorder; + + @Mock + private DiscoveryNodeFilterer nodeFilter; + + private AnomalyDetector detector; + @BeforeClass public static void setUpBeforeClass() { setUpThreadPool(AnomalyDetectorJobRunnerTests.class.getSimpleName()); @@ -138,7 +148,6 @@ public void setup() throws Exception { Mockito.doReturn(threadContext).when(mockedThreadPool).getThreadContext(); runner.setThreadPool(mockedThreadPool); runner.setClient(client); - runner.setAnomalyResultHandler(anomalyResultHandler); runner.setAdTaskManager(adTaskManager); Settings settings = Settings @@ -154,7 +163,6 @@ public void setup() throws Exception { AnomalyDetectionIndices anomalyDetectionIndices = mock(AnomalyDetectionIndices.class); IndexNameExpressionResolver indexNameResolver = mock(IndexNameExpressionResolver.class); IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService, indexNameResolver); - NodeStateManager stateManager = mock(NodeStateManager.class); runner.setAnomalyDetectionIndices(indexUtil); @@ -195,6 +203,18 @@ public void setup() throws Exception { return null; }).when(client).index(any(), any()); + + recorder = new ExecuteADResultResponseRecorder(indexUtil, anomalyResultHandler, adTaskManager, nodeFilter, threadPool, client); + runner.setExecuteADResultResponseRecorder(recorder); + detector = TestHelpers.randomAnomalyDetectorWithEmptyFeature(); + + NodeStateManager stateManager = mock(NodeStateManager.class); + doAnswer(invocation -> { + ActionListener> listener = invocation.getArgument(1); + listener.onResponse(Optional.of(detector)); + return null; + }).when(stateManager).getAnomalyDetector(any(String.class), any(ActionListener.class)); + runner.setNodeStateManager(stateManager); } @Rule @@ -222,7 +242,7 @@ public void testRunJobWithNullLockDuration() throws InterruptedException { when(jobParameter.getLockDurationSeconds()).thenReturn(null); when(jobParameter.getSchedule()).thenReturn(new IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES)); runner.runJob(jobParameter, context); - Thread.sleep(1000); + Thread.sleep(2000); assertTrue(testAppender.containsMessage("Can't get lock for AD job")); } @@ -239,7 +259,7 @@ public void testRunJobWithLockDuration() throws InterruptedException { @Test public void testRunAdJobWithNullLock() { LockModel lock = null; - runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now()); + runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now(), recorder, detector); verify(client, never()).execute(any(), any(), any()); } @@ -247,7 +267,7 @@ public void testRunAdJobWithNullLock() { public void testRunAdJobWithLock() { LockModel lock = new LockModel("indexName", "jobId", Instant.now(), 10, false); - runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now()); + runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now(), recorder, detector); verify(client, times(1)).execute(any(), any(), any()); } @@ -257,7 +277,7 @@ public void testRunAdJobWithExecuteException() { doThrow(RuntimeException.class).when(client).execute(any(), any(), any()); - runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now()); + runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now(), recorder, detector); verify(client, times(1)).execute(any(), any(), any()); assertTrue(testAppender.containsMessage("Failed to execute AD job")); } @@ -266,7 +286,17 @@ public void testRunAdJobWithExecuteException() { public void testRunAdJobWithEndRunExceptionNow() { LockModel lock = new LockModel("indexName", "jobId", Instant.now(), 10, false); Exception exception = new EndRunException(jobParameter.getName(), randomAlphaOfLength(5), true); - runner.handleAdException(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now(), exception); + runner + .handleAdException( + jobParameter, + lockService, + lock, + Instant.now().minusMillis(1000 * 60), + Instant.now(), + exception, + recorder, + detector + ); verify(anomalyResultHandler).index(any(), any(), any()); } @@ -366,7 +396,17 @@ private void testRunAdJobWithEndRunExceptionNowAndStopAdJob(boolean jobExists, b return null; }).when(client).index(any(IndexRequest.class), any()); - runner.handleAdException(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now(), exception); + runner + .handleAdException( + jobParameter, + lockService, + lock, + Instant.now().minusMillis(1000 * 60), + Instant.now(), + exception, + recorder, + detector + ); } @Test @@ -380,7 +420,17 @@ public void testRunAdJobWithEndRunExceptionNowAndGetJobException() { return null; }).when(client).get(any(GetRequest.class), any()); - runner.handleAdException(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now(), exception); + runner + .handleAdException( + jobParameter, + lockService, + lock, + Instant.now().minusMillis(1000 * 60), + Instant.now(), + exception, + recorder, + detector + ); assertTrue(testAppender.containsMessage("JobRunner will stop AD job due to EndRunException for")); assertTrue(testAppender.containsMessage("JobRunner failed to get detector job")); verify(anomalyResultHandler).index(any(), any(), any()); @@ -404,7 +454,17 @@ public void testRunAdJobWithEndRunExceptionNowAndFailToGetJob() { return null; }).when(client).get(any(), any()); - runner.handleAdException(jobParameter, lockService, lock, Instant.now().minusMillis(1000 * 60), Instant.now(), exception); + runner + .handleAdException( + jobParameter, + lockService, + lock, + Instant.now().minusMillis(1000 * 60), + Instant.now(), + exception, + recorder, + detector + ); verify(anomalyResultHandler).index(any(), any(), any()); assertEquals(1, testAppender.countMessage("JobRunner failed to get detector job")); } @@ -425,10 +485,10 @@ public void testRunAdJobWithEndRunExceptionNotNowAndRetryUntilStop() throws Inte }).when(client).execute(any(), any(), any()); for (int i = 0; i < 3; i++) { - runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime); + runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime, recorder, detector); assertEquals(i + 1, testAppender.countMessage("EndRunException happened for")); } - runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime); + runner.runAdJob(jobParameter, lockService, lock, Instant.now().minusSeconds(60), executionStartTime, recorder, detector); assertEquals(1, testAppender.countMessage("JobRunner will stop AD job due to EndRunException retry exceeds upper limit")); } diff --git a/src/test/java/org/opensearch/ad/ODFERestTestCase.java b/src/test/java/org/opensearch/ad/ODFERestTestCase.java index 44b3e1d2d..f04880532 100644 --- a/src/test/java/org/opensearch/ad/ODFERestTestCase.java +++ b/src/test/java/org/opensearch/ad/ODFERestTestCase.java @@ -30,6 +30,8 @@ import java.util.Optional; import java.util.stream.Collectors; +import javax.net.ssl.SSLEngine; + import org.apache.hc.client5.http.auth.AuthScope; import org.apache.hc.client5.http.auth.UsernamePasswordCredentials; import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; @@ -37,10 +39,12 @@ import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; import org.apache.hc.client5.http.ssl.NoopHostnameVerifier; +import org.apache.hc.core5.function.Factory; import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.message.BasicHeader; import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.reactor.ssl.TlsDetails; import org.apache.hc.core5.ssl.SSLContextBuilder; import org.apache.hc.core5.util.Timeout; import org.junit.After; @@ -195,6 +199,13 @@ protected static void configureHttpsClient(RestClientBuilder builder, Settings s .create() .setHostnameVerifier(NoopHostnameVerifier.INSTANCE) .setSslContext(SSLContextBuilder.create().loadTrustMaterial(null, (chains, authType) -> true).build()) + // See https://issues.apache.org/jira/browse/HTTPCLIENT-2219 + .setTlsDetailsFactory(new Factory() { + @Override + public TlsDetails create(final SSLEngine sslEngine) { + return new TlsDetails(sslEngine.getSession(), sslEngine.getApplicationProtocol()); + } + }) .build(); final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder .create() diff --git a/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java b/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java index a4e0ccc13..d939884bf 100644 --- a/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java +++ b/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java @@ -56,6 +56,7 @@ import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.Entity; import org.opensearch.ad.settings.AnomalyDetectorSettings; +import org.opensearch.ad.settings.EnabledSetting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -650,21 +651,26 @@ public void testSelectEmpty() { // test that detector interval is more than 1 hour that maintenance is called before // the next get method public void testLongDetectorInterval() { - when(clock.instant()).thenReturn(Instant.ofEpochSecond(1000)); - when(detector.getDetectionIntervalDuration()).thenReturn(Duration.ofHours(12)); - String modelId = entity1.getModelId(detectorId).get(); - // record last access time 1000 - entityCache.get(modelId, detector); - assertEquals(-1, entityCache.getLastActiveMs(detectorId, modelId)); - // 2 hour = 7200 seconds have passed - long currentTimeEpoch = 8200; - when(clock.instant()).thenReturn(Instant.ofEpochSecond(currentTimeEpoch)); - // door keeper should not be expired since we reclaim space every 60 intervals - entityCache.maintenance(); - // door keeper still has the record and won't blocks entity state being created - entityCache.get(modelId, detector); - // * 1000 to convert to milliseconds - assertEquals(currentTimeEpoch * 1000, entityCache.getLastActiveMs(detectorId, modelId)); + try { + EnabledSetting.getInstance().setSettingValue(EnabledSetting.DOOR_KEEPER_IN_CACHE_ENABLED, true); + when(clock.instant()).thenReturn(Instant.ofEpochSecond(1000)); + when(detector.getDetectionIntervalDuration()).thenReturn(Duration.ofHours(12)); + String modelId = entity1.getModelId(detectorId).get(); + // record last access time 1000 + assertTrue(null == entityCache.get(modelId, detector)); + assertEquals(-1, entityCache.getLastActiveMs(detectorId, modelId)); + // 2 hour = 7200 seconds have passed + long currentTimeEpoch = 8200; + when(clock.instant()).thenReturn(Instant.ofEpochSecond(currentTimeEpoch)); + // door keeper should not be expired since we reclaim space every 60 intervals + entityCache.maintenance(); + // door keeper still has the record and won't blocks entity state being created + entityCache.get(modelId, detector); + // * 1000 to convert to milliseconds + assertEquals(currentTimeEpoch * 1000, entityCache.getLastActiveMs(detectorId, modelId)); + } finally { + EnabledSetting.getInstance().setSettingValue(EnabledSetting.DOOR_KEEPER_IN_CACHE_ENABLED, false); + } } public void testGetNoPriorityUpdate() { diff --git a/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java b/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java index 0c2cce832..074b15552 100644 --- a/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java +++ b/src/test/java/org/opensearch/ad/e2e/DetectionResultEvalutationIT.java @@ -16,8 +16,6 @@ import java.text.SimpleDateFormat; import java.time.Clock; import java.time.Instant; -import java.time.format.DateTimeFormatter; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Calendar; import java.util.Date; @@ -45,7 +43,7 @@ public class DetectionResultEvalutationIT extends AbstractSyntheticDataTest { protected static final Logger LOG = (Logger) LogManager.getLogger(DetectionResultEvalutationIT.class); /** - * Simulate starting the given HCAD detector. + * Wait for HCAD cold start to finish. * @param detectorId Detector Id * @param data Data in Json format * @param trainTestSplit Training data size @@ -54,7 +52,7 @@ public class DetectionResultEvalutationIT extends AbstractSyntheticDataTest { * @param client OpenSearch Client * @throws Exception when failing to query/indexing from/to OpenSearch */ - private void simulateHCADStartDetector( + private void waitForHCADStartDetector( String detectorId, List data, int trainTestSplit, @@ -63,18 +61,6 @@ private void simulateHCADStartDetector( RestClient client ) throws Exception { - Instant trainTime = Instant.from(DateTimeFormatter.ISO_INSTANT.parse(data.get(trainTestSplit - 1).get("timestamp").getAsString())); - - Instant begin = null; - Instant end = null; - for (int i = 0; i < shingleSize; i++) { - begin = trainTime.minus(intervalMinutes * (shingleSize - 1 - i), ChronoUnit.MINUTES); - end = begin.plus(intervalMinutes, ChronoUnit.MINUTES); - try { - getDetectionResult(detectorId, begin, end, client); - } catch (Exception e) {} - } - // It takes time to wait for model initialization long startTime = System.currentTimeMillis(); long duration = 0; do { @@ -94,7 +80,7 @@ private void simulateHCADStartDetector( break; } try { - getDetectionResult(detectorId, begin, end, client); + profileDetectorInitProgress(detectorId, client); } catch (Exception e) {} duration = System.currentTimeMillis() - startTime; } while (duration <= 60_000); @@ -280,14 +266,15 @@ private void verifyRestart(String datasetName, int intervalMinutes, int shingleS String detectorId = createDetector(datasetName, intervalMinutes, client, categoricalField, 0); // cannot stop without actually starting detector because ad complains no ad job index startDetector(detectorId, client); + profileDetectorInitProgress(detectorId, client); // it would be long if we wait for the job actually run the work periodically; speed it up by using simulateHCADStartDetector - simulateHCADStartDetector(detectorId, data, trainTestSplit, shingleSize, intervalMinutes, client); + waitForHCADStartDetector(detectorId, data, trainTestSplit, shingleSize, intervalMinutes, client); String initProgress = profileDetectorInitProgress(detectorId, client); assertEquals("init progress is " + initProgress, "100%", initProgress); stopDetector(detectorId, client); // restart detector startDetector(detectorId, client); - simulateHCADStartDetector(detectorId, data, trainTestSplit, shingleSize, intervalMinutes, client); + waitForHCADStartDetector(detectorId, data, trainTestSplit, shingleSize, intervalMinutes, client); initProgress = profileDetectorInitProgress(detectorId, client); assertEquals("init progress is " + initProgress, "100%", initProgress); } diff --git a/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java b/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java index 1236c6217..4f4ca09c4 100644 --- a/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java +++ b/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java @@ -114,7 +114,6 @@ public void testTrainUsingSamples() throws InterruptedException { public void testColdStart() throws InterruptedException, IOException { Queue samples = MLUtil.createQueueSamples(1); - double[] savedSample = samples.peek(); EntityModel model = new EntityModel(entity, samples, null); modelState = new ModelState<>(model, modelId, detectorId, ModelType.ENTITY.getName(), clock, priority); diff --git a/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobTransportActionWithUser.java b/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobTransportActionWithUser.java index 4b7b8b00d..caa36cdaa 100644 --- a/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobTransportActionWithUser.java +++ b/src/test/java/org/opensearch/ad/mock/transport/MockAnomalyDetectorJobTransportActionWithUser.java @@ -20,6 +20,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.ad.ExecuteADResultResponseRecorder; import org.opensearch.ad.indices.AnomalyDetectionIndices; import org.opensearch.ad.model.DetectionDateRange; import org.opensearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler; @@ -52,6 +53,7 @@ public class MockAnomalyDetectorJobTransportActionWithUser extends private ThreadContext.StoredContext context; private final ADTaskManager adTaskManager; private final TransportService transportService; + private final ExecuteADResultResponseRecorder recorder; @Inject public MockAnomalyDetectorJobTransportActionWithUser( @@ -62,7 +64,8 @@ public MockAnomalyDetectorJobTransportActionWithUser( Settings settings, AnomalyDetectionIndices anomalyDetectionIndices, NamedXContentRegistry xContentRegistry, - ADTaskManager adTaskManager + ADTaskManager adTaskManager, + ExecuteADResultResponseRecorder recorder ) { super(MockAnomalyDetectorJobAction.NAME, transportService, actionFilters, AnomalyDetectorJobRequest::new); this.transportService = transportService; @@ -77,6 +80,7 @@ public MockAnomalyDetectorJobTransportActionWithUser( ThreadContext threadContext = new ThreadContext(settings); context = threadContext.stashContext(); + this.recorder = recorder; } @Override @@ -131,7 +135,6 @@ private void executeDetector( ) { IndexAnomalyDetectorJobActionHandler handler = new IndexAnomalyDetectorJobActionHandler( client, - listener, anomalyDetectionIndices, detectorId, seqNo, @@ -139,7 +142,8 @@ private void executeDetector( requestTimeout, xContentRegistry, transportService, - adTaskManager + adTaskManager, + recorder ); if (rawPath.endsWith(RestHandlerUtils.START_JOB)) { adTaskManager.startDetector(detectorId, detectionDateRange, handler, user, transportService, context, listener); diff --git a/src/test/java/org/opensearch/ad/ratelimit/EntityColdStartWorkerTests.java b/src/test/java/org/opensearch/ad/ratelimit/EntityColdStartWorkerTests.java index 77dc78015..0bc23d60c 100644 --- a/src/test/java/org/opensearch/ad/ratelimit/EntityColdStartWorkerTests.java +++ b/src/test/java/org/opensearch/ad/ratelimit/EntityColdStartWorkerTests.java @@ -30,7 +30,10 @@ import org.opensearch.OpenSearchStatusException; import org.opensearch.action.ActionListener; import org.opensearch.ad.breaker.ADCircuitBreakerService; +import org.opensearch.ad.caching.CacheProvider; import org.opensearch.ad.ml.EntityColdStarter; +import org.opensearch.ad.ml.EntityModel; +import org.opensearch.ad.ml.ModelState; import org.opensearch.ad.settings.AnomalyDetectorSettings; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; @@ -38,10 +41,13 @@ import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; import org.opensearch.rest.RestStatus; +import test.org.opensearch.ad.util.MLUtil; + public class EntityColdStartWorkerTests extends AbstractRateLimitingTest { ClusterService clusterService; EntityColdStartWorker worker; EntityColdStarter entityColdStarter; + CacheProvider cacheProvider; @Override public void setUp() throws Exception { @@ -64,6 +70,8 @@ public void setUp() throws Exception { entityColdStarter = mock(EntityColdStarter.class); + cacheProvider = mock(CacheProvider.class); + // Integer.MAX_VALUE makes a huge heap worker = new EntityColdStartWorker( Integer.MAX_VALUE, @@ -82,7 +90,8 @@ public void setUp() throws Exception { AnomalyDetectorSettings.QUEUE_MAINTENANCE, entityColdStarter, AnomalyDetectorSettings.HOURLY_MAINTENANCE, - nodeStateManager + nodeStateManager, + cacheProvider ); } @@ -135,4 +144,22 @@ public void testException() { verify(entityColdStarter, times(2)).trainModel(any(), anyString(), any(), any()); verify(nodeStateManager, times(2)).setException(eq(detectorId), any(OpenSearchStatusException.class)); } + + public void testModelHosted() { + EntityRequest request = new EntityRequest(Integer.MAX_VALUE, detectorId, RequestPriority.MEDIUM, entity); + + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(3); + + ModelState state = invocation.getArgument(2); + state.setModel(MLUtil.createNonEmptyModel(detectorId)); + listener.onResponse(null); + + return null; + }).when(entityColdStarter).trainModel(any(), anyString(), any(), any()); + + worker.put(request); + + verify(cacheProvider, times(1)).get(); + } } diff --git a/src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java b/src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java new file mode 100644 index 000000000..c0366e95a --- /dev/null +++ b/src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java @@ -0,0 +1,354 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.ad.rest.handler; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.opensearch.action.DocWriteResponse.Result.CREATED; +import static org.opensearch.ad.constant.CommonErrorMessages.CAN_NOT_FIND_LATEST_TASK; + +import java.io.IOException; +import java.util.Arrays; + +import org.junit.Before; +import org.junit.BeforeClass; +import org.opensearch.action.ActionListener; +import org.opensearch.action.get.GetRequest; +import org.opensearch.action.get.GetResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.update.UpdateResponse; +import org.opensearch.ad.ExecuteADResultResponseRecorder; +import org.opensearch.ad.TestHelpers; +import org.opensearch.ad.common.exception.ResourceNotFoundException; +import org.opensearch.ad.constant.CommonErrorMessages; +import org.opensearch.ad.constant.CommonName; +import org.opensearch.ad.indices.AnomalyDetectionIndices; +import org.opensearch.ad.mock.model.MockSimpleLog; +import org.opensearch.ad.model.AnomalyDetector; +import org.opensearch.ad.model.AnomalyResult; +import org.opensearch.ad.model.Feature; +import org.opensearch.ad.task.ADTaskManager; +import org.opensearch.ad.transport.AnomalyDetectorJobResponse; +import org.opensearch.ad.transport.AnomalyResultAction; +import org.opensearch.ad.transport.AnomalyResultResponse; +import org.opensearch.ad.transport.ProfileAction; +import org.opensearch.ad.transport.ProfileResponse; +import org.opensearch.ad.transport.handler.AnomalyIndexHandler; +import org.opensearch.ad.util.DiscoveryNodeFilterer; +import org.opensearch.client.Client; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.search.aggregations.AggregationBuilder; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import com.google.common.collect.ImmutableList; + +public class IndexAnomalyDetectorJobActionHandlerTests extends OpenSearchTestCase { + + private static AnomalyDetectionIndices anomalyDetectionIndices; + private static String detectorId; + private static Long seqNo; + private static Long primaryTerm; + + private static NamedXContentRegistry xContentRegistry; + private static TransportService transportService; + private static TimeValue requestTimeout; + private static DiscoveryNodeFilterer nodeFilter; + private static AnomalyDetector detector; + + private ADTaskManager adTaskManager; + + private ThreadPool threadPool; + + private ExecuteADResultResponseRecorder recorder; + private Client client; + private IndexAnomalyDetectorJobActionHandler handler; + private AnomalyIndexHandler anomalyResultHandler; + + @BeforeClass + public static void setOnce() throws IOException { + detectorId = "123"; + seqNo = 1L; + primaryTerm = 2L; + anomalyDetectionIndices = mock(AnomalyDetectionIndices.class); + xContentRegistry = NamedXContentRegistry.EMPTY; + transportService = mock(TransportService.class); + + requestTimeout = TimeValue.timeValueMinutes(60); + when(anomalyDetectionIndices.doesAnomalyDetectorJobIndexExist()).thenReturn(true); + + nodeFilter = mock(DiscoveryNodeFilterer.class); + detector = TestHelpers.randomAnomalyDetectorUsingCategoryFields(detectorId, Arrays.asList("a")); + } + + @SuppressWarnings("unchecked") + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + client = mock(Client.class); + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[1]; + + GetResponse response = mock(GetResponse.class); + when(response.isExists()).thenReturn(false); + listener.onResponse(response); + + return null; + }).when(client).get(any(GetRequest.class), any()); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[1]; + + IndexResponse response = mock(IndexResponse.class); + when(response.getResult()).thenReturn(CREATED); + listener.onResponse(response); + + return null; + }).when(client).index(any(IndexRequest.class), any()); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[2]; + + AnomalyResultResponse response = new AnomalyResultResponse(null, "", 0L, 10L, true); + listener.onResponse(response); + + return null; + }).when(client).execute(any(AnomalyResultAction.class), any(), any()); + + adTaskManager = mock(ADTaskManager.class); + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[4]; + + AnomalyDetectorJobResponse response = mock(AnomalyDetectorJobResponse.class); + listener.onResponse(response); + + return null; + }).when(adTaskManager).startDetector(any(), any(), any(), any(), any()); + + threadPool = mock(ThreadPool.class); + + anomalyResultHandler = mock(AnomalyIndexHandler.class); + + recorder = new ExecuteADResultResponseRecorder( + anomalyDetectionIndices, + anomalyResultHandler, + adTaskManager, + nodeFilter, + threadPool, + client + ); + + handler = new IndexAnomalyDetectorJobActionHandler( + client, + anomalyDetectionIndices, + detectorId, + seqNo, + primaryTerm, + requestTimeout, + xContentRegistry, + transportService, + adTaskManager, + recorder + ); + } + + @SuppressWarnings("unchecked") + public void testDelayHCProfile() { + when(adTaskManager.isHCRealtimeTaskStartInitializing(anyString())).thenReturn(false); + + ActionListener listener = mock(ActionListener.class); + + handler.startAnomalyDetectorJob(detector, listener); + + verify(client, times(1)).get(any(), any()); + verify(client, times(1)).execute(any(), any(), any()); + verify(adTaskManager, times(1)).startDetector(any(), any(), any(), any(), any()); + verify(adTaskManager, times(1)).isHCRealtimeTaskStartInitializing(anyString()); + verify(threadPool, times(1)).schedule(any(), any(), any()); + verify(listener, times(1)).onResponse(any()); + } + + @SuppressWarnings("unchecked") + public void testNoDelayHCProfile() { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[2]; + + ProfileResponse response = mock(ProfileResponse.class); + when(response.getTotalUpdates()).thenReturn(3L); + listener.onResponse(response); + + return null; + }).when(client).execute(any(ProfileAction.class), any(), any()); + + when(adTaskManager.isHCRealtimeTaskStartInitializing(anyString())).thenReturn(true); + + ActionListener listener = mock(ActionListener.class); + + handler.startAnomalyDetectorJob(detector, listener); + + verify(client, times(1)).get(any(), any()); + verify(client, times(2)).execute(any(), any(), any()); + verify(adTaskManager, times(1)).startDetector(any(), any(), any(), any(), any()); + verify(adTaskManager, times(1)).isHCRealtimeTaskStartInitializing(anyString()); + verify(adTaskManager, times(1)).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); + verify(threadPool, never()).schedule(any(), any(), any()); + verify(listener, times(1)).onResponse(any()); + } + + @SuppressWarnings("unchecked") + public void testHCProfileException() { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[2]; + + listener.onFailure(new RuntimeException()); + + return null; + }).when(client).execute(any(ProfileAction.class), any(), any()); + + when(adTaskManager.isHCRealtimeTaskStartInitializing(anyString())).thenReturn(true); + + ActionListener listener = mock(ActionListener.class); + + handler.startAnomalyDetectorJob(detector, listener); + + verify(client, times(1)).get(any(), any()); + verify(client, times(2)).execute(any(), any(), any()); + verify(adTaskManager, times(1)).startDetector(any(), any(), any(), any(), any()); + verify(adTaskManager, times(1)).isHCRealtimeTaskStartInitializing(anyString()); + verify(adTaskManager, never()).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); + verify(threadPool, never()).schedule(any(), any(), any()); + verify(listener, times(1)).onResponse(any()); + } + + @SuppressWarnings("unchecked") + public void testUpdateLatestRealtimeTaskOnCoordinatingNodeResourceNotFoundException() { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[2]; + + ProfileResponse response = mock(ProfileResponse.class); + when(response.getTotalUpdates()).thenReturn(3L); + listener.onResponse(response); + + return null; + }).when(client).execute(any(ProfileAction.class), any(), any()); + + when(adTaskManager.isHCRealtimeTaskStartInitializing(anyString())).thenReturn(true); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[5]; + + listener.onFailure(new ResourceNotFoundException(CAN_NOT_FIND_LATEST_TASK)); + + return null; + }).when(adTaskManager).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); + + ActionListener listener = mock(ActionListener.class); + + handler.startAnomalyDetectorJob(detector, listener); + + verify(client, times(1)).get(any(), any()); + verify(client, times(2)).execute(any(), any(), any()); + verify(adTaskManager, times(1)).startDetector(any(), any(), any(), any(), any()); + verify(adTaskManager, times(1)).isHCRealtimeTaskStartInitializing(anyString()); + verify(adTaskManager, times(1)).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); + verify(adTaskManager, times(1)).removeRealtimeTaskCache(anyString()); + verify(threadPool, never()).schedule(any(), any(), any()); + verify(listener, times(1)).onResponse(any()); + } + + @SuppressWarnings("unchecked") + public void testUpdateLatestRealtimeTaskOnCoordinatingException() { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[2]; + + ProfileResponse response = mock(ProfileResponse.class); + when(response.getTotalUpdates()).thenReturn(3L); + listener.onResponse(response); + + return null; + }).when(client).execute(any(ProfileAction.class), any(), any()); + + when(adTaskManager.isHCRealtimeTaskStartInitializing(anyString())).thenReturn(true); + + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[5]; + + listener.onFailure(new RuntimeException()); + + return null; + }).when(adTaskManager).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); + + ActionListener listener = mock(ActionListener.class); + + handler.startAnomalyDetectorJob(detector, listener); + + verify(client, times(1)).get(any(), any()); + verify(client, times(2)).execute(any(), any(), any()); + verify(adTaskManager, times(1)).startDetector(any(), any(), any(), any(), any()); + verify(adTaskManager, times(1)).isHCRealtimeTaskStartInitializing(anyString()); + verify(adTaskManager, times(1)).updateLatestRealtimeTaskOnCoordinatingNode(any(), any(), any(), any(), any(), any()); + verify(adTaskManager, never()).removeRealtimeTaskCache(anyString()); + verify(threadPool, never()).schedule(any(), any(), any()); + verify(listener, times(1)).onResponse(any()); + } + + @SuppressWarnings("unchecked") + public void testIndexException() throws IOException { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[2]; + + listener.onFailure(new ResourceNotFoundException(detectorId, CommonErrorMessages.NO_CHECKPOINT_ERR_MSG)); + + return null; + }).when(client).execute(any(AnomalyResultAction.class), any(), any()); + + ActionListener listener = mock(ActionListener.class); + AggregationBuilder aggregationBuilder = TestHelpers + .parseAggregation("{\"test\":{\"max\":{\"field\":\"" + MockSimpleLog.VALUE_FIELD + "\"}}}"); + Feature feature = new Feature(randomAlphaOfLength(5), randomAlphaOfLength(10), true, aggregationBuilder); + detector = TestHelpers + .randomDetector( + ImmutableList.of(feature), + "test", + 10, + MockSimpleLog.TIME_FIELD, + null, + CommonName.CUSTOM_RESULT_INDEX_PREFIX + "index" + ); + when(anomalyDetectionIndices.doesIndexExist(anyString())).thenReturn(false); + handler.startAnomalyDetectorJob(detector, listener); + verify(anomalyResultHandler, times(1)).index(any(), any(), eq(null)); + verify(threadPool, times(1)).schedule(any(), any(), any()); + } +} diff --git a/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobActionTests.java b/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobActionTests.java index cefb64f05..860347a21 100644 --- a/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobActionTests.java @@ -25,6 +25,7 @@ import org.junit.Test; import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; +import org.opensearch.ad.ExecuteADResultResponseRecorder; import org.opensearch.ad.indices.AnomalyDetectionIndices; import org.opensearch.ad.model.DetectionDateRange; import org.opensearch.ad.settings.AnomalyDetectorSettings; @@ -76,7 +77,8 @@ public void setUp() throws Exception { indexSettings(), mock(AnomalyDetectionIndices.class), xContentRegistry(), - mock(ADTaskManager.class) + mock(ADTaskManager.class), + mock(ExecuteADResultResponseRecorder.class) ); task = mock(Task.class); request = new AnomalyDetectorJobRequest("1234", 4567, 7890, "_start");