Skip to content

Commit

Permalink
Speed up cold start (#753) (#763)
Browse files Browse the repository at this point in the history
If historical data is enough, a single stream detector takes 1 interval for cold start to be triggered + 1 interval for the state document to be updated. Similar to single stream detectors, HCAD cold start needs 2 intervals and one more interval to make sure an entity appears more than once. So HCAD needs three intervals to complete cold starts. Long initialization is the single most complained problem of AD. This PR reduces both single stream and HCAD detectors' initialization time to 1 minute by
* delaying real time cache update by one minute when we receive ResourceNotFoundException in single stream detectors or when the init progress of HCAD real time cache is 0. Thus, we can finish the cold start and writing checkpoint one minute later and update the state document accordingly. This optimization saves one interval to wait for the state document update.
* disable the door keeper by default so that we won't have to wait an extra interval in HCAD.
* trigger cold start when starting a real time detector. This optimization saves one interval to wait for the cold start to be triggered.

Testing done:
* verified the cold start time is reduced to 1 minute.
* added tests for new code.

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo authored Dec 16, 2022
1 parent 021f92f commit 8ce4493
Show file tree
Hide file tree
Showing 21 changed files with 1,193 additions and 332 deletions.
8 changes: 4 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ configurations.all {
if (it.state != Configuration.State.UNRESOLVED) return
resolutionStrategy {
force "joda-time:joda-time:${versions.joda}"
force "com.fasterxml.jackson.core:jackson-core:2.14.0"
force "com.fasterxml.jackson.core:jackson-core:2.14.1"
force "commons-logging:commons-logging:${versions.commonslogging}"
force "org.apache.httpcomponents:httpcore:${versions.httpcore}"
force "commons-codec:commons-codec:${versions.commonscodec}"
Expand Down Expand Up @@ -677,9 +677,9 @@ dependencies {
implementation 'software.amazon.randomcutforest:randomcutforest-core:3.0-rc3'

// force Jackson version to avoid version conflict issue
implementation "com.fasterxml.jackson.core:jackson-core:2.14.0"
implementation "com.fasterxml.jackson.core:jackson-databind:2.14.0"
implementation "com.fasterxml.jackson.core:jackson-annotations:2.14.0"
implementation "com.fasterxml.jackson.core:jackson-core:2.14.1"
implementation "com.fasterxml.jackson.core:jackson-databind:2.14.1"
implementation "com.fasterxml.jackson.core:jackson-annotations:2.14.1"

// used for serializing/deserializing rcf models.
implementation group: 'io.protostuff', name: 'protostuff-core', version: '1.8.0'
Expand Down
373 changes: 180 additions & 193 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java

Large diffs are not rendered by default.

48 changes: 30 additions & 18 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,12 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
private ClientUtil clientUtil;
private DiscoveryNodeFilterer nodeFilter;
private IndexUtils indexUtils;
private ADTaskCacheManager adTaskCacheManager;
private ADTaskManager adTaskManager;
private ADBatchTaskRunner adBatchTaskRunner;
// package private for testing
GenericObjectPool<LinkedBuffer> serializeRCFBufferPool;
private NodeStateManager stateManager;
private ExecuteADResultResponseRecorder adResultResponseRecorder;

static {
SpecialPermission.check();
Expand All @@ -259,25 +260,14 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
AnomalyIndexHandler<AnomalyResult> anomalyResultHandler = new AnomalyIndexHandler<AnomalyResult>(
client,
settings,
threadPool,
CommonName.ANOMALY_RESULT_INDEX_ALIAS,
anomalyDetectionIndices,
this.clientUtil,
this.indexUtils,
clusterService
);

AnomalyDetectorJobRunner jobRunner = AnomalyDetectorJobRunner.getJobRunnerInstance();
jobRunner.setClient(client);
jobRunner.setThreadPool(threadPool);
jobRunner.setAnomalyResultHandler(anomalyResultHandler);
jobRunner.setSettings(settings);
jobRunner.setAnomalyDetectionIndices(anomalyDetectionIndices);
jobRunner.setNodeFilter(nodeFilter);
jobRunner.setAdTaskManager(adTaskManager);
jobRunner.setNodeStateManager(stateManager);
jobRunner.setExecuteADResultResponseRecorder(adResultResponseRecorder);

RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction();
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(settings, clusterService);
Expand Down Expand Up @@ -383,7 +373,7 @@ public Collection<Object> createComponents(
adCircuitBreakerService
);

NodeStateManager stateManager = new NodeStateManager(
stateManager = new NodeStateManager(
client,
xContentRegistry,
settings,
Expand Down Expand Up @@ -568,7 +558,8 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
AnomalyDetectorSettings.QUEUE_MAINTENANCE,
entityColdStarter,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
stateManager
stateManager,
cacheProvider
);

ModelManager modelManager = new ModelManager(
Expand Down Expand Up @@ -714,7 +705,7 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {

anomalyDetectorRunner = new AnomalyDetectorRunner(modelManager, featureManager, AnomalyDetectorSettings.MAX_PREVIEW_RESULTS);

adTaskCacheManager = new ADTaskCacheManager(settings, clusterService, memoryTracker);
ADTaskCacheManager adTaskCacheManager = new ADTaskCacheManager(settings, clusterService, memoryTracker);
adTaskManager = new ADTaskManager(
settings,
clusterService,
Expand Down Expand Up @@ -754,6 +745,26 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {

ADSearchHandler adSearchHandler = new ADSearchHandler(settings, clusterService, client);

AnomalyIndexHandler<AnomalyResult> anomalyResultHandler = new AnomalyIndexHandler<AnomalyResult>(
client,
settings,
threadPool,
CommonName.ANOMALY_RESULT_INDEX_ALIAS,
anomalyDetectionIndices,
this.clientUtil,
this.indexUtils,
clusterService
);

adResultResponseRecorder = new ExecuteADResultResponseRecorder(
anomalyDetectionIndices,
anomalyResultHandler,
adTaskManager,
nodeFilter,
threadPool,
client
);

// return objects used by Guice to inject dependencies for e.g.,
// transport action handler constructors
return ImmutableList
Expand Down Expand Up @@ -795,7 +806,8 @@ public PooledObject<LinkedBuffer> wrap(LinkedBuffer obj) {
checkpointWriteQueue,
coldEntityQueue,
entityColdStarter,
adTaskCacheManager
adTaskCacheManager,
adResultResponseRecorder
);
}

Expand Down
Loading

0 comments on commit 8ce4493

Please sign in to comment.