Skip to content

Commit

Permalink
add comment and improve logging
Browse files Browse the repository at this point in the history
Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Dec 16, 2022
1 parent 9753663 commit 2f23de0
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,16 @@ private void updateLatestRealtimeTask(
);
}

/**
* The function is not only indexing the result with the exception, but also updating the task state after
* 60s if the exception is related to cold start (index not found exceptions) for a single stream detector.
*
* @param detectionStartTime execution start time
* @param executionStartTime execution end time
* @param errorMessage Error message to record
* @param taskState AD task state (e.g., stopped)
* @param detector Detector config accessor
*/
public void indexAnomalyResultException(
Instant detectionStartTime,
Instant executionStartTime,
Expand Down Expand Up @@ -262,7 +272,7 @@ public void indexAnomalyResultException(
totalUpdates > 0 ? "" : errorMessage
);
}, e -> {
log.error("Fail to eecute RCFRollingAction", e);
log.error("Fail to execute RCFRollingAction", e);
updateLatestRealtimeTask(detectorId, taskState, null, null, errorMessage);
}));
}, new TimeValue(60, TimeUnit.SECONDS), AnomalyDetectorPlugin.AD_THREAD_POOL_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,13 @@ protected void executeRequest(EntityRequest coldStartRequest, ActionListener<Voi
nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> {
try {
if (!detectorOptional.isPresent()) {
LOG.error(new ParameterizedMessage("fail to get detector [{}]", detectorId));
LOG
.error(
new ParameterizedMessage(
"fail to load trained model [{}] to cache due to the detector not being found.",
modelState.getModelId()
)
);
return;
}
AnomalyDetector detector = detectorOptional.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public IndexAnomalyDetectorJobActionHandler(
* @param listener Listener to send responses
*/
public void startAnomalyDetectorJob(AnomalyDetector detector, ActionListener<AnomalyDetectorJobResponse> listener) {
// this start listener is created & injected throughout the job handler so that whenever the job response is received,
// there's the extra step of trying to index results and update detector state with a 60s delay.
ActionListener<AnomalyDetectorJobResponse> startListener = ActionListener.wrap(r -> {
try {
Instant executionEndTime = Instant.now();
Expand Down

0 comments on commit 2f23de0

Please sign in to comment.