From 2f23de0bb419cc6987ae6711b0d6749266a522a9 Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Thu, 15 Dec 2022 16:29:45 -0800 Subject: [PATCH] add comment and improve logging Signed-off-by: Kaituo Li --- .../ad/ExecuteADResultResponseRecorder.java | 12 +++++++++++- .../ad/ratelimit/EntityColdStartWorker.java | 8 +++++++- .../IndexAnomalyDetectorJobActionHandler.java | 2 ++ 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java b/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java index 721eeb898..19710f0cb 100644 --- a/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java +++ b/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java @@ -208,6 +208,16 @@ private void updateLatestRealtimeTask( ); } + /** + * The function is not only indexing the result with the exception, but also updating the task state after + * 60s if the exception is related to cold start (index not found exceptions) for a single stream detector. + * + * @param detectionStartTime execution start time + * @param executionStartTime execution end time + * @param errorMessage Error message to record + * @param taskState AD task state (e.g., stopped) + * @param detector Detector config accessor + */ public void indexAnomalyResultException( Instant detectionStartTime, Instant executionStartTime, @@ -262,7 +272,7 @@ public void indexAnomalyResultException( totalUpdates > 0 ? "" : errorMessage ); }, e -> { - log.error("Fail to eecute RCFRollingAction", e); + log.error("Fail to execute RCFRollingAction", e); updateLatestRealtimeTask(detectorId, taskState, null, null, errorMessage); })); }, new TimeValue(60, TimeUnit.SECONDS), AnomalyDetectorPlugin.AD_THREAD_POOL_NAME); diff --git a/src/main/java/org/opensearch/ad/ratelimit/EntityColdStartWorker.java b/src/main/java/org/opensearch/ad/ratelimit/EntityColdStartWorker.java index b02ad314b..8702fafcc 100644 --- a/src/main/java/org/opensearch/ad/ratelimit/EntityColdStartWorker.java +++ b/src/main/java/org/opensearch/ad/ratelimit/EntityColdStartWorker.java @@ -124,7 +124,13 @@ protected void executeRequest(EntityRequest coldStartRequest, ActionListener { try { if (!detectorOptional.isPresent()) { - LOG.error(new ParameterizedMessage("fail to get detector [{}]", detectorId)); + LOG + .error( + new ParameterizedMessage( + "fail to load trained model [{}] to cache due to the detector not being found.", + modelState.getModelId() + ) + ); return; } AnomalyDetector detector = detectorOptional.get(); diff --git a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java index 2b91a0a06..31256f3ed 100644 --- a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java +++ b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java @@ -121,6 +121,8 @@ public IndexAnomalyDetectorJobActionHandler( * @param listener Listener to send responses */ public void startAnomalyDetectorJob(AnomalyDetector detector, ActionListener listener) { + // this start listener is created & injected throughout the job handler so that whenever the job response is received, + // there's the extra step of trying to index results and update detector state with a 60s delay. ActionListener startListener = ActionListener.wrap(r -> { try { Instant executionEndTime = Instant.now();