Skip to content

Commit

Permalink
Add ForecastResult and Refactor Shared Code (#941)
Browse files Browse the repository at this point in the history
This commit adds the functionality of ForecastResult, while also performing several necessary refactorings.

Firstly, we introduce ForecastResult, which closely mirrors the functionality of AnomalyResult. Any shared code between these two classes has been abstracted out into a new parent class, IndexableResult. This refactoring was undertaken to facilitate code reuse and improve maintainability, as both classes deal with storing fields that need to be saved in result indices.

Secondly, we've also extracted common code from ThresholdingResult into another new class, IntermediateResult. This work is in preparation for a forthcoming PR which will introduce RCFCasterResult. In this change, we moved the 'toAnomalyResult' method to IndexableResult, and renamed it to 'toIndexableResults'. This renaming allows for reuse in the upcoming RCFCasterResult. The updated method now returns a list of results instead of a single result. The shift caters for the RCFCasterResult use case where one RCFCasterResult is stored across multiple ForecastResult documents.

Furthermore, this commit modifies the 'entity' field in several methods to be of the Optional type, indicating that it can be empty. This change provides a clear signal to other developers about the optional nature of these fields.

In addition, this commit updates the mapping for the forecast result index. The prior mapping was a placeholder and this change brings it up to date.

We've also moved to using httpcore5 and httpclient5 versions from OpenSearch core, as necessitated by recent changes in [OpenSearch PR #8434](https://github.com/opensearch-project/OpenSearch/pull/8434/files). This shift resolves potential jar hell issues with httpcore5 and httpclient5.

Lastly, we've made name and package changes:
1. Renamed 'initDetectionStateIndex' in ADIndexManagement to 'initStateIndex' to align with ForecastIndexManagement naming.
2. Moved the 'DataByFeatureId' class to the 'timeseries' package for better organization.

This commit also completes the 'createDummyIndexRequest' method in ForecastIndexManagement by invoking 'ForecastResult.getDummyResult()', which is now possible due to the introduction of the ForecastResult implementation.

Testing done:
1. added tests for new code
2. gralde build passes
3. e2e tests

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo authored Jul 10, 2023
1 parent f24d9e3 commit a120382
Show file tree
Hide file tree
Showing 37 changed files with 1,313 additions and 479 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ configurations.all {
resolutionStrategy {
force "joda-time:joda-time:${versions.joda}"
force "commons-logging:commons-logging:${versions.commonslogging}"
force "org.apache.httpcomponents:httpcore5:${versions.httpcore5}"
force "org.apache.httpcomponents.core5:httpcore5:${versions.httpcore5}"
force "org.apache.httpcomponents.client5:httpclient5:${versions.httpclient5}"
force "commons-codec:commons-codec:${versions.commonscodec}"

force "org.mockito:mockito-core:2.25.0"
Expand Down
15 changes: 9 additions & 6 deletions src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -168,20 +169,23 @@ private List<AnomalyResult> parsePreviewResult(
AnomalyResult result;
if (results != null && results.size() > i) {
ThresholdingResult thresholdingResult = results.get(i);
result = thresholdingResult
.toAnomalyResult(
List<AnomalyResult> resultsToSave = thresholdingResult
.toIndexableResults(
detector,
Instant.ofEpochMilli(timeRange.getKey()),
Instant.ofEpochMilli(timeRange.getValue()),
null,
null,
featureDatas,
entity,
Optional.ofNullable(entity),
CommonValue.NO_SCHEMA_VERSION,
null,
null,
null
);
for (AnomalyResult r : resultsToSave) {
anomalyResults.add(r);
}
} else {
result = new AnomalyResult(
detector.getId(),
Expand All @@ -192,14 +196,13 @@ private List<AnomalyResult> parsePreviewResult(
null,
null,
null,
entity,
Optional.ofNullable(entity),
detector.getUser(),
CommonValue.NO_SCHEMA_VERSION,
null
);
anomalyResults.add(result);
}

anomalyResults.add(result);
}
}
return anomalyResults;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -282,7 +283,7 @@ public void indexAnomalyResultException(
executionStartTime,
Instant.now(),
errorMessage,
null, // single-stream detectors have no entity
Optional.empty(), // single-stream detectors have no entity
user,
anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT),
null // no model id
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/ProfileUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private static SearchRequest createRealtimeInittedEverRequest(String detectorId,
filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.ANOMALY_SCORE_FIELD).gt(0));
// Historical analysis result also stored in result index, which has non-null task_id.
// For realtime detection result, we should filter task_id == null
ExistsQueryBuilder taskIdExistsFilter = QueryBuilders.existsQuery(AnomalyResult.TASK_ID_FIELD);
ExistsQueryBuilder taskIdExistsFilter = QueryBuilders.existsQuery(CommonName.TASK_ID_FIELD);
filterQuery.mustNot(taskIdExistsFilter);

SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void migrateData() {
migrateDetectorInternalStateToRealtimeTask();
} else {
// If detection index doesn't exist, create index and backfill realtime task.
detectionIndices.initDetectionStateIndex(ActionListener.wrap(r -> {
detectionIndices.initStateIndex(ActionListener.wrap(r -> {
if (r.isAcknowledged()) {
logger.info("Created {} with mappings.", ADCommonName.DETECTION_STATE_INDEX);
migrateDetectorInternalStateToRealtimeTask();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ public void initDefaultResultIndexDirectly(ActionListener<CreateIndexResponse> a
*
* @param actionListener action called after create index
*/
public void initDetectionStateIndex(ActionListener<CreateIndexResponse> actionListener) {
@Override
public void initStateIndex(ActionListener<CreateIndexResponse> actionListener) {
try {
CreateIndexRequest request = new CreateIndexRequest(ADCommonName.DETECTION_STATE_INDEX)
.mapping(getStateMappings(), XContentType.JSON)
Expand Down
123 changes: 58 additions & 65 deletions src/main/java/org/opensearch/ad/ml/ThresholdingResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,24 @@

import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.timeseries.ml.IntermediateResult;
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.FeatureData;

/**
* Data object containing thresholding results.
*/
public class ThresholdingResult {
public class ThresholdingResult extends IntermediateResult<AnomalyResult> {

private final double grade;
private final double confidence;
private final double rcfScore;
private long totalUpdates;

/**
* position of the anomaly vis a vis the current time (can be -ve) if anomaly is
* detected late, which can and should happen sometime; for shingle size 1; this
Expand Down Expand Up @@ -135,6 +134,8 @@ public class ThresholdingResult {
// size of the forest
private int forestSize;

protected final double confidence;

/**
* Constructor for default empty value or backward compatibility.
* In terms of bwc, when an old node sends request for threshold results,
Expand Down Expand Up @@ -163,10 +164,10 @@ public ThresholdingResult(
double threshold,
int forestSize
) {
this.grade = grade;
super(totalUpdates, rcfScore);
this.confidence = confidence;
this.rcfScore = rcfScore;
this.totalUpdates = totalUpdates;
this.grade = grade;

this.relativeIndex = relativeIndex;
this.relevantAttribution = relevantAttribution;
this.pastValues = pastValues;
Expand All @@ -177,29 +178,21 @@ public ThresholdingResult(
}

/**
* Returns the anomaly grade.
* Returns the confidence for the result (e.g., anomaly grade in AD).
*
* @return the anoamly grade
* @return confidence for the result
*/
public double getGrade() {
return grade;
public double getConfidence() {
return confidence;
}

/**
* Returns the confidence for the grade.
* Returns the anomaly grade.
*
* @return confidence for the grade
* @return the anoamly grade
*/
public double getConfidence() {
return confidence;
}

public double getRcfScore() {
return rcfScore;
}

public long getTotalUpdates() {
return totalUpdates;
public double getGrade() {
return grade;
}

public int getRelativeIndex() {
Expand Down Expand Up @@ -232,32 +225,29 @@ public int getForestSize() {

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
if (!super.equals(o))
return false;
if (getClass() != o.getClass())
return false;
ThresholdingResult that = (ThresholdingResult) o;
return this.grade == that.grade
&& this.confidence == that.confidence
&& this.rcfScore == that.rcfScore
&& this.totalUpdates == that.totalUpdates
return Double.doubleToLongBits(confidence) == Double.doubleToLongBits(that.confidence)
&& Double.doubleToLongBits(this.grade) == Double.doubleToLongBits(that.grade)
&& this.relativeIndex == that.relativeIndex
&& Arrays.equals(relevantAttribution, that.relevantAttribution)
&& Arrays.equals(pastValues, that.pastValues)
&& Arrays.deepEquals(expectedValuesList, that.expectedValuesList)
&& Arrays.equals(likelihoodOfValues, that.likelihoodOfValues)
&& threshold == that.threshold
&& Double.doubleToLongBits(threshold) == Double.doubleToLongBits(that.threshold)
&& forestSize == that.forestSize;
}

@Override
public int hashCode() {
return Objects
.hash(
grade,
super.hashCode(),
confidence,
rcfScore,
totalUpdates,
grade,
relativeIndex,
Arrays.hashCode(relevantAttribution),
Arrays.hashCode(pastValues),
Expand All @@ -271,10 +261,9 @@ public int hashCode() {
@Override
public String toString() {
return new ToStringBuilder(this)
.append(super.toString())
.append("grade", grade)
.append("confidence", confidence)
.append("rcfScore", rcfScore)
.append("totalUpdates", totalUpdates)
.append("relativeIndex", relativeIndex)
.append("relevantAttribution", Arrays.toString(relevantAttribution))
.append("pastValues", Arrays.toString(pastValues))
Expand Down Expand Up @@ -302,43 +291,47 @@ public String toString() {
* @param error Error
* @return converted AnomalyResult
*/
public AnomalyResult toAnomalyResult(
AnomalyDetector detector,
@Override
public List<AnomalyResult> toIndexableResults(
Config detector,
Instant dataStartInstant,
Instant dataEndInstant,
Instant executionStartInstant,
Instant executionEndInstant,
List<FeatureData> featureData,
Entity entity,
Optional<Entity> entity,
Integer schemaVersion,
String modelId,
String taskId,
String error
) {
return AnomalyResult
.fromRawTRCFResult(
detector.getId(),
detector.getIntervalInMilliseconds(),
taskId,
rcfScore,
grade,
confidence,
featureData,
dataStartInstant,
dataEndInstant,
executionStartInstant,
executionEndInstant,
error,
entity,
detector.getUser(),
schemaVersion,
modelId,
relevantAttribution,
relativeIndex,
pastValues,
expectedValuesList,
likelihoodOfValues,
threshold
return Collections
.singletonList(
AnomalyResult
.fromRawTRCFResult(
detector.getId(),
detector.getIntervalInMilliseconds(),
taskId,
rcfScore,
grade,
confidence,
featureData,
dataStartInstant,
dataEndInstant,
executionStartInstant,
executionEndInstant,
error,
entity,
detector.getUser(),
schemaVersion,
modelId,
relevantAttribution,
relativeIndex,
pastValues,
expectedValuesList,
likelihoodOfValues,
threshold
)
);
}
}
Loading

0 comments on commit a120382

Please sign in to comment.