Skip to content

Commit

Permalink
Speed up cold start
Browse files Browse the repository at this point in the history
If historical data is enough, a single stream detector takes 1 interval for cold start to be triggered + 1 interval for the state document to be updated. Similar to single stream detectors, HCAD cold start needs 2 intervals and one more interval to make sure an entity appears more than once. So HCAD needs three intervals to complete cold starts. Long initialization is the single most complained problem of AD. This PR reduces both single stream and HCAD detectors' initialization time to 1 minute by
* delaying real time cache update by one minute when we receive ResourceNotFoundException in single stream detectors or when the init progress of HCAD real time cache is 0. Thus, we can finish the cold start and writing checkpoint one minute later and update the state document accordingly. This optimization saves one interval to wait for the state document update.
* disable the door keeper by default so that we won't have to wait an extra interval in HCAD.
* trigger cold start when starting a real time detector. This optimization saves one interval to wait for the cold start to be triggered.

Testing done:
* verified the cold start time is reduced to 1 minute.
* added tests for new code.

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Dec 9, 2022
1 parent 09b4cc1 commit a5313db
Show file tree
Hide file tree
Showing 20 changed files with 1,161 additions and 328 deletions.
373 changes: 180 additions & 193 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java

Large diffs are not rendered by default.

48 changes: 30 additions & 18 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,12 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private ClientUtil clientUtil;
private DiscoveryNodeFilterer nodeFilter;
private IndexUtils indexUtils;
private ADTaskCacheManager adTaskCacheManager;
private ADTaskManager adTaskManager;
private ADBatchTaskRunner adBatchTaskRunner;
// package private for testing
GenericObjectPool<LinkedBuffer> serializeRCFBufferPool;
private NodeStateManager stateManager;
private ExecuteADResultResponseRecorder adResultResponseRecorder;

static {
SpecialPermission.check();
Expand All @@ -259,25 +260,14 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
AnomalyIndexHandler<AnomalyResult> anomalyResultHandler = new AnomalyIndexHandler<AnomalyResult>(
client,
settings,
threadPool,
CommonName.ANOMALY_RESULT_INDEX_ALIAS,
anomalyDetectionIndices,
this.clientUtil,
this.indexUtils,
clusterService
);

AnomalyDetectorJobRunner jobRunner = AnomalyDetectorJobRunner.getJobRunnerInstance();
jobRunner.setClient(client);
jobRunner.setThreadPool(threadPool);
jobRunner.setAnomalyResultHandler(anomalyResultHandler);
jobRunner.setSettings(settings);
jobRunner.setAnomalyDetectionIndices(anomalyDetectionIndices);
jobRunner.setNodeFilter(nodeFilter);
jobRunner.setAdTaskManager(adTaskManager);
jobRunner.setNodeStateManager(stateManager);
jobRunner.setExecuteADResultResponseRecorder(adResultResponseRecorder);

RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction();
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(settings, clusterService);
Expand Down Expand Up @@ -383,7 +373,7 @@ public Collection<Object> createComponents(
adCircuitBreakerService
);

NodeStateManager stateManager = new NodeStateManager(
stateManager = new NodeStateManager(
client,
xContentRegistry,
settings,
Expand Down Expand Up @@ -568,7 +558,8 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
AnomalyDetectorSettings.QUEUE_MAINTENANCE,
entityColdStarter,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
stateManager
stateManager,
cacheProvider
);

ModelManager modelManager = new ModelManager(
Expand Down Expand Up @@ -714,7 +705,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {

anomalyDetectorRunner = new AnomalyDetectorRunner(modelManager, featureManager, AnomalyDetectorSettings.MAX_PREVIEW_RESULTS);

adTaskCacheManager = new ADTaskCacheManager(settings, clusterService, memoryTracker);
ADTaskCacheManager adTaskCacheManager = new ADTaskCacheManager(settings, clusterService, memoryTracker);
adTaskManager = new ADTaskManager(
settings,
clusterService,
Expand Down Expand Up @@ -754,6 +745,26 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {

ADSearchHandler adSearchHandler = new ADSearchHandler(settings, clusterService, client);

AnomalyIndexHandler<AnomalyResult> anomalyResultHandler = new AnomalyIndexHandler<AnomalyResult>(
client,
settings,
threadPool,
CommonName.ANOMALY_RESULT_INDEX_ALIAS,
anomalyDetectionIndices,
this.clientUtil,
this.indexUtils,
clusterService
);

adResultResponseRecorder = new ExecuteADResultResponseRecorder(
anomalyDetectionIndices,
anomalyResultHandler,
adTaskManager,
nodeFilter,
threadPool,
client
);

// return objects used by Guice to inject dependencies for e.g.,
// transport action handler constructors
return ImmutableList
Expand Down Expand Up @@ -795,7 +806,8 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
checkpointWriteQueue,
coldEntityQueue,
entityColdStarter,
adTaskCacheManager
adTaskCacheManager,
adResultResponseRecorder
);
}

Expand Down
268 changes: 268 additions & 0 deletions src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.ad;

import static org.opensearch.ad.constant.CommonErrorMessages.CAN_NOT_FIND_LATEST_TASK;

import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.ad.common.exception.EndRunException;
import org.opensearch.ad.common.exception.ResourceNotFoundException;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.DetectorProfileName;
import org.opensearch.ad.model.FeatureData;
import org.opensearch.ad.model.IntervalTimeConfiguration;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.AnomalyResultResponse;
import org.opensearch.ad.transport.ProfileAction;
import org.opensearch.ad.transport.ProfileRequest;
import org.opensearch.ad.transport.RCFPollingAction;
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.commons.authuser.User;
import org.opensearch.threadpool.ThreadPool;

public class ExecuteADResultResponseRecorder {
private static final Logger log = LogManager.getLogger(ExecuteADResultResponseRecorder.class);

private AnomalyDetectionIndices anomalyDetectionIndices;
private AnomalyIndexHandler<AnomalyResult> anomalyResultHandler;
private ADTaskManager adTaskManager;
private DiscoveryNodeFilterer nodeFilter;
private ThreadPool threadPool;
private Client client;

public ExecuteADResultResponseRecorder(
AnomalyDetectionIndices anomalyDetectionIndices,
AnomalyIndexHandler<AnomalyResult> anomalyResultHandler,
ADTaskManager adTaskManager,
DiscoveryNodeFilterer nodeFilter,
ThreadPool threadPool,
Client client
) {
this.anomalyDetectionIndices = anomalyDetectionIndices;
this.anomalyResultHandler = anomalyResultHandler;
this.adTaskManager = adTaskManager;
this.nodeFilter = nodeFilter;
this.threadPool = threadPool;
this.client = client;
}

public void indexAnomalyResult(
Instant detectionStartTime,
Instant executionStartTime,
AnomalyResultResponse response,
AnomalyDetector detector
) {
String detectorId = detector.getDetectorId();
try {
// skipping writing to the result index if not necessary
// For a single-entity detector, the result is not useful if error is null
// and rcf score (thus anomaly grade/confidence) is null.
// For a HCAD detector, we don't need to save on the detector level.
// We return 0 or Double.NaN rcf score if there is no error.
if ((response.getAnomalyScore() <= 0 || Double.isNaN(response.getAnomalyScore())) && response.getError() == null) {
updateRealtimeTask(response, detectorId);
return;
}
IntervalTimeConfiguration windowDelay = (IntervalTimeConfiguration) detector.getWindowDelay();
Instant dataStartTime = detectionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit());
Instant dataEndTime = executionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit());
User user = detector.getUser();

if (response.getError() != null) {
log.info("Anomaly result action run successfully for {} with error {}", detectorId, response.getError());
}

AnomalyResult anomalyResult = response
.toAnomalyResult(
detectorId,
dataStartTime,
dataEndTime,
executionStartTime,
Instant.now(),
anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT),
user,
response.getError()
);

String resultIndex = detector.getResultIndex();
anomalyResultHandler.index(anomalyResult, detectorId, resultIndex);
updateRealtimeTask(response, detectorId);
} catch (EndRunException e) {
throw e;
} catch (Exception e) {
log.error("Failed to index anomaly result for " + detectorId, e);
}
}

private void updateRealtimeTask(AnomalyResultResponse response, String detectorId) {
if (response.isHCDetector() != null && response.isHCDetector()) {
if (adTaskManager.skipUpdateHCRealtimeTask(detectorId, response.getError())) {
return;
}
DiscoveryNode[] dataNodes = nodeFilter.getEligibleDataNodes();
Set<DetectorProfileName> profiles = new HashSet<>();
profiles.add(DetectorProfileName.INIT_PROGRESS);
ProfileRequest profileRequest = new ProfileRequest(detectorId, profiles, true, dataNodes);
Runnable profileHCInitProgress = () -> {
client.execute(ProfileAction.INSTANCE, profileRequest, ActionListener.wrap(r -> {
log.debug("Update latest realtime task for HC detector {}, total updates: {}", detectorId, r.getTotalUpdates());
updateLatestRealtimeTask(
detectorId,
null,
r.getTotalUpdates(),
response.getDetectorIntervalInMinutes(),
response.getError()
);
}, e -> { log.error("Failed to update latest realtime task for " + detectorId, e); }));
};
if (!adTaskManager.isHCRealtimeTaskStartInitializing(detectorId)) {
// real time init progress is 0 may mean this is a newly started detector
// Delay real time cache update by one minute. If we are in init status, the delay may give the model training time to
// finish. We can change the detector running immediately instead of waiting for the next interval.
threadPool.schedule(profileHCInitProgress, new TimeValue(60, TimeUnit.SECONDS), AnomalyDetectorPlugin.AD_THREAD_POOL_NAME);
} else {
profileHCInitProgress.run();
}

} else {
log
.debug(
"Update latest realtime task for single stream detector {}, total updates: {}",
detectorId,
response.getRcfTotalUpdates()
);
updateLatestRealtimeTask(
detectorId,
null,
response.getRcfTotalUpdates(),
response.getDetectorIntervalInMinutes(),
response.getError()
);
}
}

private void updateLatestRealtimeTask(
String detectorId,
String taskState,
Long rcfTotalUpdates,
Long detectorIntervalInMinutes,
String error
) {
// Don't need info as this will be printed repeatedly in each interval
adTaskManager
.updateLatestRealtimeTaskOnCoordinatingNode(
detectorId,
taskState,
rcfTotalUpdates,
detectorIntervalInMinutes,
error,
ActionListener.wrap(r -> {
if (r != null) {
log.debug("Updated latest realtime task successfully for detector {}, taskState: {}", detectorId, taskState);
}
}, e -> {
if ((e instanceof ResourceNotFoundException) && e.getMessage().contains(CAN_NOT_FIND_LATEST_TASK)) {
// Clear realtime task cache, will recreate AD task in next run, check AnomalyResultTransportAction.
log.error("Can't find latest realtime task of detector " + detectorId);
adTaskManager.removeRealtimeTaskCache(detectorId);
} else {
log.error("Failed to update latest realtime task for detector " + detectorId, e);
}
})
);
}

public void indexAnomalyResultException(
Instant detectionStartTime,
Instant executionStartTime,
String errorMessage,
String taskState,
AnomalyDetector detector
) {
String detectorId = detector.getDetectorId();
try {
IntervalTimeConfiguration windowDelay = (IntervalTimeConfiguration) detector.getWindowDelay();
Instant dataStartTime = detectionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit());
Instant dataEndTime = executionStartTime.minus(windowDelay.getInterval(), windowDelay.getUnit());
User user = detector.getUser();

AnomalyResult anomalyResult = new AnomalyResult(
detectorId,
null, // no task id
new ArrayList<FeatureData>(),
dataStartTime,
dataEndTime,
executionStartTime,
Instant.now(),
errorMessage,
null, // single-stream detectors have no entity
user,
anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT),
null // no model id
);
String resultIndex = detector.getResultIndex();
if (resultIndex != null && !anomalyDetectionIndices.doesIndexExist(resultIndex)) {
// Set result index as null, will write exception to default result index.
anomalyResultHandler.index(anomalyResult, detectorId, null);
} else {
anomalyResultHandler.index(anomalyResult, detectorId, resultIndex);
}

if (errorMessage.contains(CommonErrorMessages.NO_CHECKPOINT_ERR_MSG) && !detector.isMultiCategoryDetector()) {
// single stream detector raises ResourceNotFoundException containing CommonErrorMessages.NO_CHECKPOINT_ERR_MSG
// when there is no checkpoint.
// Delay real time cache update by one minute so we will have trained models by then and update the state
// document accordingly.
threadPool.schedule(() -> {
RCFPollingRequest request = new RCFPollingRequest(detectorId);
client.execute(RCFPollingAction.INSTANCE, request, ActionListener.wrap(rcfPollResponse -> {
long totalUpdates = rcfPollResponse.getTotalUpdates();
// if there are updates, don't record failures
updateLatestRealtimeTask(
detectorId,
taskState,
totalUpdates,
detector.getDetectorIntervalInMinutes(),
totalUpdates > 0 ? "" : errorMessage
);
}, e -> {
log.error("Fail to eecute RCFRollingAction", e);
updateLatestRealtimeTask(detectorId, taskState, null, null, errorMessage);
}));
}, new TimeValue(60, TimeUnit.SECONDS), AnomalyDetectorPlugin.AD_THREAD_POOL_NAME);
} else {
updateLatestRealtimeTask(detectorId, taskState, null, null, errorMessage);
}

} catch (Exception e) {
log.error("Failed to index anomaly result for " + detectorId, e);
}
}

}
Loading

0 comments on commit a5313db

Please sign in to comment.