Skip to content

Commit

Permalink
Fix: Prevent resetting latest flag of real-time analysis when starting historical analysis
Browse files Browse the repository at this point in the history

This PR addresses a bug where starting a historical analysis after a real-time analysis on the same detector caused the real-time task’s latest flag to be incorrectly reset to false by the historical run.

The fix ensures that only the latest flags of the same analysis type are reset:
* Real-time analysis will only reset the latest flag of previous real-time analyses.
* Historical analysis will only reset the latest flag of previous historical analyses.

This PR also updates recencyEmphasis to have a minimum value of 2, aligning with RCF requirements.

Testing:
- Added an integration test to reproduce the bug and verified the fix.

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Aug 30, 2024
1 parent 2922bbd commit 7010e06
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 47 deletions.
33 changes: 10 additions & 23 deletions src/main/java/org/opensearch/ad/task/ADTaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static org.opensearch.ad.constant.ADCommonName.DETECTION_STATE_INDEX;
import static org.opensearch.ad.indices.ADIndexManagement.ALL_AD_RESULTS_INDEX_PATTERN;
import static org.opensearch.ad.model.ADTask.DETECTOR_ID_FIELD;
import static org.opensearch.ad.model.ADTaskType.ALL_HISTORICAL_TASK_TYPES;
import static org.opensearch.ad.model.ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES;
import static org.opensearch.ad.model.ADTaskType.REALTIME_TASK_TYPES;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_REQUEST_TIMEOUT;
Expand Down Expand Up @@ -1881,21 +1880,6 @@ private void maintainRunningHistoricalTask(ConcurrentLinkedQueue<ADTask> taskQue
}, TimeValue.timeValueSeconds(DEFAULT_MAINTAIN_INTERVAL_IN_SECONDS), AD_BATCH_TASK_THREAD_POOL_NAME);
}

/**
 * Get list of task types.
 * 1. If date range is null, will return all realtime task types.
 * 2. If date range is not null, will return historical task types: all historical
 *    task types (including the HC entity level task type) when
 *    resetLatestTaskStateFlag is true — so every task's latest flag can be reset —
 *    otherwise only historical detector level task types.
 *
 * @param dateRange detection date range; null means a real-time analysis
 * @param resetLatestTaskStateFlag whether the caller intends to reset latest task state
 * @return list of AD task types
 */
protected List<ADTaskType> getTaskTypes(DateRange dateRange, boolean resetLatestTaskStateFlag) {
    // AD does not support run once, so runOnce is always false here
    return getTaskTypes(dateRange, resetLatestTaskStateFlag, false);
}

@Override
protected BiCheckedFunction<XContentParser, String, ADTask, IOException> getTaskParser() {
return ADTask::parse;
Expand All @@ -1912,17 +1896,20 @@ public void createRunOnceTaskAndCleanupStaleTasks(
throw new UnsupportedOperationException("AD has no run once yet");
}

/**
* Get list of task types.
* 1. If date range is null, will return all realtime task types
* 2. If date range is not null, will return all historical detector level tasks types
*
* @param dateRange detection date range
* @return list of AD task types
*/
@Override
public List<ADTaskType> getTaskTypes(DateRange dateRange, boolean resetLatestTaskStateFlag, boolean runOnce) {
public List<ADTaskType> getTaskTypes(DateRange dateRange, boolean runOnce) {
if (dateRange == null) {
return REALTIME_TASK_TYPES;
} else {
if (resetLatestTaskStateFlag) {
// return all task types include HC entity task to make sure we can reset all tasks latest flag
return ALL_HISTORICAL_TASK_TYPES;
} else {
return HISTORICAL_DETECTOR_TASK_TYPES;
}
return HISTORICAL_DETECTOR_TASK_TYPES;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ public void createRunOnceTaskAndCleanupStaleTasks(
}

@Override
public List<ForecastTaskType> getTaskTypes(DateRange dateRange, boolean resetLatestTaskStateFlag, boolean runOnce) {
public List<ForecastTaskType> getTaskTypes(DateRange dateRange, boolean runOnce) {
if (runOnce) {
return ForecastTaskType.RUN_ONCE_TASK_TYPES;
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/timeseries/model/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@ protected Config(
return;
}

if (recencyEmphasis != null && (recencyEmphasis <= 0)) {
if (recencyEmphasis != null && recencyEmphasis <= 1) {
issueType = ValidationIssueType.RECENCY_EMPHASIS;
errorMessage = "recency emphasis has to be a positive integer";
errorMessage = "Recency emphasis must be an integer greater than 1.";
return;
}

Expand Down
9 changes: 6 additions & 3 deletions src/main/java/org/opensearch/timeseries/task/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,10 @@ public <T> void updateLatestFlagOfOldTasksAndCreateNewTask(
query.filter(new TermQueryBuilder(configIdFieldName, config.getId()));
query.filter(new TermQueryBuilder(TimeSeriesTask.IS_LATEST_FIELD, true));
// make sure we reset all latest task as false when user switch from single entity to HC, vice versa.
query.filter(new TermsQueryBuilder(TimeSeriesTask.TASK_TYPE_FIELD, taskTypeToString(getTaskTypes(dateRange, true, runOnce))));
// Ensures that only the latest flags of the same analysis type are reset:
// Real-time analysis will only reset the latest flag of previous real-time analyses.
// Historical analysis will only reset the latest flag of previous historical analyses.
query.filter(new TermsQueryBuilder(TimeSeriesTask.TASK_TYPE_FIELD, taskTypeToString(getTaskTypes(dateRange, runOnce))));
updateByQueryRequest.setQuery(query);
updateByQueryRequest.setRefresh(true);
String script = String.format(Locale.ROOT, "ctx._source.%s=%s;", TimeSeriesTask.IS_LATEST_FIELD, false);
Expand Down Expand Up @@ -432,7 +435,7 @@ public <T> void getAndExecuteOnLatestConfigTask(
}

/**
 * Convenience overload: task types for the given date range in non-run-once mode.
 *
 * @param dateRange analysis date range; null means real-time
 * @return task types for the corresponding analysis mode
 */
public List<TaskTypeEnum> getTaskTypes(DateRange dateRange) {
    // runOnce defaults to false
    return getTaskTypes(dateRange, false);
}

/**
Expand Down Expand Up @@ -1081,5 +1084,5 @@ public abstract void createRunOnceTaskAndCleanupStaleTasks(
ActionListener<TaskClass> listener
);

// Subclasses return the task types for the requested analysis mode:
// real-time when dateRange is null, historical otherwise; runOnce selects
// run-once task types where the implementation supports them.
public abstract List<TaskTypeEnum> getTaskTypes(DateRange dateRange, boolean runOnce);
}
103 changes: 95 additions & 8 deletions src/test/java/org/opensearch/ad/AbstractADSyntheticDataTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import java.nio.charset.Charset;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -56,20 +58,22 @@ protected static class TrainResult {
// last data time in data
public Instant finalDataTime;

/**
 * Captures the state of ingested training data for a detector.
 *
 * @param detectorId id of the detector the data belongs to (may be null before creation)
 * @param data raw ingested documents
 * @param rawDataTrainTestSplit index separating train from test data
 * @param windowDelay detector window delay
 * @param trainTime time of the last training data point
 * @param timeStampField name of the timestamp field in the data documents
 */
public TrainResult(
    String detectorId,
    List<JsonObject> data,
    int rawDataTrainTestSplit,
    Duration windowDelay,
    Instant trainTime,
    String timeStampField
) {
    this.detectorId = detectorId;
    this.data = data;
    this.rawDataTrainTestSplit = rawDataTrainTestSplit;
    this.windowDelay = windowDelay;
    this.trainTime = trainTime;

    // first/last ingested data times, parsed from the configured timestamp field
    // (epoch milliseconds stored as a string)
    this.firstDataTime = getDataTimeOfEpochMillis(timeStampField, data, 0);
    this.finalDataTime = getDataTimeOfEpochMillis(timeStampField, data, data.size() - 1);
}
}

Expand Down Expand Up @@ -689,4 +693,87 @@ public static boolean areDoublesEqual(double d1, double d2) {
public interface ConditionChecker {
boolean checkCondition(JsonArray hits, int expectedSize);
}

/**
 * Reads the timestamp field of the document at {@code index} and interprets it
 * as epoch milliseconds (stored as a string in the data set).
 *
 * @param timestampField name of the timestamp field
 * @param data ingested documents
 * @param index index of the document to read
 * @return the document's timestamp as an Instant
 */
protected static Instant getDataTimeOfEpochMillis(String timestampField, List<JsonObject> data, int index) {
    long epochMillis = Long.parseLong(data.get(index).get(timestampField).getAsString());
    return Instant.ofEpochMilli(epochMillis);
}

/**
 * Reads the timestamp field of the document at {@code index} and parses it as an
 * ISO 8601 date-time string (e.g., "2019-11-01T00:00:00Z").
 *
 * @param timestampField name of the timestamp field
 * @param data ingested documents
 * @param index index of the document to read
 * @return the document's timestamp as an Instant
 * @throws IllegalArgumentException if the value is not a valid ISO 8601 date-time
 */
protected static Instant getDataTimeofISOFormat(String timestampField, List<JsonObject> data, int index) {
    String rawTimestamp = data.get(index).get(timestampField).getAsString();

    try {
        return ZonedDateTime.parse(rawTimestamp, DateTimeFormatter.ISO_DATE_TIME).toInstant();
    } catch (DateTimeParseException ex) {
        throw new IllegalArgumentException("Invalid timestamp format: " + rawTimestamp, ex);
    }
}

/**
 * Searches AD tasks for the given detector and polls until the supplied checker
 * accepts the hits (e.g., at least {@code size} tasks exist).
 *
 * @param detectorId detector whose tasks are searched
 * @param size expected number of tasks, passed to the checker
 * @param checker condition deciding when the hits are acceptable
 * @param client REST client used to issue the search
 * @return task _source objects once the condition is met; empty list on timeout
 * @throws InterruptedException if interrupted while sleeping between polls
 */
protected List<JsonObject> getTasks(String detectorId, int size, ConditionChecker checker, RestClient client)
    throws InterruptedException {
    Request request = new Request("POST", "/_plugins/_anomaly_detection/detectors/tasks/_search");

    String jsonTemplate = "{\n"
        + " \"size\": %d,\n"
        + " \"query\": {\n"
        + " \"bool\": {\n"
        + " \"filter\": [\n"
        + " {\n"
        + " \"term\": {\n"
        + " \"detector_id\": \"%s\"\n"
        + " }\n"
        + " }\n"
        + " ]\n"
        + " }\n"
        + " }\n"
        + "}";

    // try to get size + 10 results if there are that many
    String formattedJson = String.format(Locale.ROOT, jsonTemplate, size + 10, detectorId);

    request.setJsonEntity(formattedJson);

    // wait until results are available:
    // up to 31 polling cycles, sleeping 2 * size seconds between attempts,
    // so the wait budget scales with the expected result size
    int maxWaitCycles = 30;
    do {
        try {
            JsonArray hits = getHits(client, request);
            if (hits != null && checker.checkCondition(hits, size)) {
                List<JsonObject> res = new ArrayList<>();
                for (int i = 0; i < hits.size(); i++) {
                    JsonObject source = hits.get(i).getAsJsonObject().get("_source").getAsJsonObject();
                    res.add(source);
                }

                return res;
            } else {
                // hits can be null here; guard the size argument to avoid an NPE in logging
                LOG.info("wait for result, previous result: {}, size: {}", hits, hits == null ? 0 : hits.size());
            }
            Thread.sleep(2_000 * size);
        } catch (Exception e) {
            LOG.warn("Exception while waiting for result", e);
            Thread.sleep(2_000 * size);
        }
    } while (maxWaitCycles-- >= 0);

    // leave some debug information before returning empty
    try {
        String matchAll = "{\n" + " \"size\": 1000,\n" + " \"query\": {\n" + " \"match_all\": {}\n" + " }\n" + "}";
        request.setJsonEntity(matchAll);
        JsonArray hits = getHits(client, request);
        LOG.info("Query: {}", formattedJson);
        LOG.info("match all result: {}", hits);
    } catch (Exception e) {
        LOG.warn("Exception while waiting for match all result", e);
    }

    return new ArrayList<>();
}

/**
 * Returns the "is_latest" boolean of the task document at {@code index}.
 *
 * @param data task documents
 * @param index index of the document to read
 * @return the document's is_latest value
 */
protected static boolean getLatest(List<JsonObject> data, int index) {
    JsonObject task = data.get(index);
    return task.get("is_latest").getAsBoolean();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ protected TrainResult ingestTrainData(
long windowDelayMinutes = Duration.between(trainTime, Instant.now()).toMinutes();

Duration windowDelay = Duration.ofMinutes(windowDelayMinutes);
return new TrainResult(null, data, rawDataTrainTestSplit, windowDelay, trainTime);
return new TrainResult(null, data, rawDataTrainTestSplit, windowDelay, trainTime, "timestamp");
}

public Map<String, List<Entry<Instant, Instant>>> getAnomalyWindowsMap(String labelFileName) throws Exception {
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/org/opensearch/ad/e2e/MissingIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ protected TrainResult createDetector(
String detectorId = createDetector(client, detector);
LOG.info("Created detector {}", detectorId);

return new TrainResult(detectorId, data, trainTestSplit * numberOfEntities, windowDelay, trainTime);
return new TrainResult(detectorId, data, trainTestSplit * numberOfEntities, windowDelay, trainTime, "timestamp");
}

protected Duration getWindowDelay(long trainTimeMillis) {
Expand Down
74 changes: 74 additions & 0 deletions src/test/java/org/opensearch/ad/e2e/MixedRealtimeHistoricalIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ad.e2e;

import java.util.List;
import java.util.Locale;

import org.opensearch.ad.AbstractADSyntheticDataTest;

import com.google.gson.JsonObject;

/**
 * Tests that when real-time and historical analyses run on the same detector,
 * starting the historical analysis does not reset the real-time task's latest
 * flag: both tasks must end with is_latest == true.
 */
public class MixedRealtimeHistoricalIT extends AbstractADSyntheticDataTest {

    public void testMixed() throws Exception {
        String datasetName = "synthetic";
        String dataFileName = String.format(Locale.ROOT, "data/%s.data", datasetName);
        int intervalsToWait = 3;

        List<JsonObject> data = getData(dataFileName);

        String mapping = "{ \"mappings\": { \"properties\": { \"timestamp\": { \"type\": \"date\"},"
            + " \"Feature1\": { \"type\": \"double\" }, \"Feature2\": { \"type\": \"double\" } } } }";
        int trainTestSplit = 1500;
        // train data plus a few data points for real time inference
        int totalDataToIngest = trainTestSplit + intervalsToWait + 3;
        bulkIndexTrainData(datasetName, data, totalDataToIngest, client(), mapping);

        long windowDelayMinutes = getWindowDelayMinutes(data, trainTestSplit - 1, "timestamp");
        int intervalMinutes = 1;

        // single-stream detector can use window delay 0 here because we give the run api the actual data time
        String detector = String
            .format(
                Locale.ROOT,
                "{ \"name\": \"test\", \"description\": \"test\", \"time_field\": \"timestamp\""
                    + ", \"indices\": [\"%s\"], \"feature_attributes\": [{ \"feature_name\": \"feature 1\", \"feature_enabled\": "
                    + "\"true\", \"aggregation_query\": { \"Feature1\": { \"sum\": { \"field\": \"Feature1\" } } } }, { \"feature_name\""
                    + ": \"feature 2\", \"feature_enabled\": \"true\", \"aggregation_query\": { \"Feature2\": { \"sum\": { \"field\": "
                    + "\"Feature2\" } } } }], \"detection_interval\": { \"period\": { \"interval\": %d, \"unit\": \"Minutes\" } }, "
                    + "\"window_delay\": { \"period\": {\"interval\": %d, \"unit\": \"MINUTES\"}},"
                    + "\"schema_version\": 0 }",
                datasetName,
                intervalMinutes,
                windowDelayMinutes
            );
        String detectorId = createDetector(client(), detector);

        // start real-time analysis first, then a historical analysis over the
        // ingested range; the bug under test was the historical run resetting
        // the real-time task's latest flag
        startDetector(detectorId, client());

        startHistorical(
            detectorId,
            getDataTimeofISOFormat("timestamp", data, 0),
            getDataTimeofISOFormat("timestamp", data, totalDataToIngest),
            client(),
            1
        );

        // expect one real-time task and one historical task, both still latest
        int size = 2;
        List<JsonObject> results = getTasks(detectorId, size, (h, eSize) -> h.size() >= eSize, client());

        assertEquals(String.format(Locale.ROOT, "Expected %d, but got %d", size, results.size()), size, results.size());
        for (int i = 0; i < size; i++) {
            assert (getLatest(results, i));
        }
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ public void testInvalidRecency() {
null
)
);
assertEquals("recency emphasis has to be a positive integer", exception.getMessage());
assertEquals("Recency emphasis must be an integer greater than 1.", exception.getMessage());
}

public void testInvalidDetectionInterval() {
Expand Down
Loading

0 comments on commit 7010e06

Please sign in to comment.