diff --git a/build.gradle b/build.gradle index 7d467888f..b9c539f56 100644 --- a/build.gradle +++ b/build.gradle @@ -211,7 +211,8 @@ configurations.all { resolutionStrategy { force "joda-time:joda-time:${versions.joda}" force "commons-logging:commons-logging:${versions.commonslogging}" - force "org.apache.httpcomponents:httpcore5:${versions.httpcore5}" + force "org.apache.httpcomponents.core5:httpcore5:${versions.httpcore5}" + force "org.apache.httpcomponents.client5:httpclient5:${versions.httpclient5}" force "commons-codec:commons-codec:${versions.commonscodec}" force "org.mockito:mockito-core:2.25.0" diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java b/src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java index ba9b4ed08..90b7d350f 100644 --- a/src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java +++ b/src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; @@ -168,20 +169,23 @@ private List parsePreviewResult( AnomalyResult result; if (results != null && results.size() > i) { ThresholdingResult thresholdingResult = results.get(i); - result = thresholdingResult - .toAnomalyResult( + List resultsToSave = thresholdingResult + .toIndexableResults( detector, Instant.ofEpochMilli(timeRange.getKey()), Instant.ofEpochMilli(timeRange.getValue()), null, null, featureDatas, - entity, + Optional.ofNullable(entity), CommonValue.NO_SCHEMA_VERSION, null, null, null ); + for (AnomalyResult r : resultsToSave) { + anomalyResults.add(r); + } } else { result = new AnomalyResult( detector.getId(), @@ -192,14 +196,13 @@ private List parsePreviewResult( null, null, null, - entity, + Optional.ofNullable(entity), detector.getUser(), CommonValue.NO_SCHEMA_VERSION, null ); + anomalyResults.add(result); } - - anomalyResults.add(result); } } return anomalyResults; 
diff --git a/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java b/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java index d63343bdb..3d0f58ac7 100644 --- a/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java +++ b/src/main/java/org/opensearch/ad/ExecuteADResultResponseRecorder.java @@ -16,6 +16,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.HashSet; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -282,7 +283,7 @@ public void indexAnomalyResultException( executionStartTime, Instant.now(), errorMessage, - null, // single-stream detectors have no entity + Optional.empty(), // single-stream detectors have no entity user, anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT), null // no model id diff --git a/src/main/java/org/opensearch/ad/ProfileUtil.java b/src/main/java/org/opensearch/ad/ProfileUtil.java index 4d7563890..8afd98dc3 100644 --- a/src/main/java/org/opensearch/ad/ProfileUtil.java +++ b/src/main/java/org/opensearch/ad/ProfileUtil.java @@ -40,7 +40,7 @@ private static SearchRequest createRealtimeInittedEverRequest(String detectorId, filterQuery.filter(QueryBuilders.rangeQuery(AnomalyResult.ANOMALY_SCORE_FIELD).gt(0)); // Historical analysis result also stored in result index, which has non-null task_id. 
// For realtime detection result, we should filter task_id == null - ExistsQueryBuilder taskIdExistsFilter = QueryBuilders.existsQuery(AnomalyResult.TASK_ID_FIELD); + ExistsQueryBuilder taskIdExistsFilter = QueryBuilders.existsQuery(CommonName.TASK_ID_FIELD); filterQuery.mustNot(taskIdExistsFilter); SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1); diff --git a/src/main/java/org/opensearch/ad/cluster/ADDataMigrator.java b/src/main/java/org/opensearch/ad/cluster/ADDataMigrator.java index d5e0ace05..1340a8f8b 100644 --- a/src/main/java/org/opensearch/ad/cluster/ADDataMigrator.java +++ b/src/main/java/org/opensearch/ad/cluster/ADDataMigrator.java @@ -103,7 +103,7 @@ public void migrateData() { migrateDetectorInternalStateToRealtimeTask(); } else { // If detection index doesn't exist, create index and backfill realtime task. - detectionIndices.initDetectionStateIndex(ActionListener.wrap(r -> { + detectionIndices.initStateIndex(ActionListener.wrap(r -> { if (r.isAcknowledged()) { logger.info("Created {} with mappings.", ADCommonName.DETECTION_STATE_INDEX); migrateDetectorInternalStateToRealtimeTask(); diff --git a/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java b/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java index fb0d9d4d5..918107073 100644 --- a/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java +++ b/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java @@ -192,7 +192,8 @@ public void initDefaultResultIndexDirectly(ActionListener a * * @param actionListener action called after create index */ - public void initDetectionStateIndex(ActionListener actionListener) { + @Override + public void initStateIndex(ActionListener actionListener) { try { CreateIndexRequest request = new CreateIndexRequest(ADCommonName.DETECTION_STATE_INDEX) .mapping(getStateMappings(), XContentType.JSON) diff --git a/src/main/java/org/opensearch/ad/ml/ThresholdingResult.java 
b/src/main/java/org/opensearch/ad/ml/ThresholdingResult.java index ecef940b3..a2da03f51 100644 --- a/src/main/java/org/opensearch/ad/ml/ThresholdingResult.java +++ b/src/main/java/org/opensearch/ad/ml/ThresholdingResult.java @@ -13,25 +13,24 @@ import java.time.Instant; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Optional; import org.apache.commons.lang.builder.ToStringBuilder; -import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyResult; +import org.opensearch.timeseries.ml.IntermediateResult; +import org.opensearch.timeseries.model.Config; import org.opensearch.timeseries.model.Entity; import org.opensearch.timeseries.model.FeatureData; /** * Data object containing thresholding results. */ -public class ThresholdingResult { +public class ThresholdingResult extends IntermediateResult { private final double grade; - private final double confidence; - private final double rcfScore; - private long totalUpdates; - /** * position of the anomaly vis a vis the current time (can be -ve) if anomaly is * detected late, which can and should happen sometime; for shingle size 1; this @@ -135,6 +134,8 @@ public class ThresholdingResult { // size of the forest private int forestSize; + protected final double confidence; + /** * Constructor for default empty value or backward compatibility. * In terms of bwc, when an old node sends request for threshold results, @@ -163,10 +164,10 @@ public ThresholdingResult( double threshold, int forestSize ) { - this.grade = grade; + super(totalUpdates, rcfScore); this.confidence = confidence; - this.rcfScore = rcfScore; - this.totalUpdates = totalUpdates; + this.grade = grade; + this.relativeIndex = relativeIndex; this.relevantAttribution = relevantAttribution; this.pastValues = pastValues; @@ -177,29 +178,21 @@ public ThresholdingResult( } /** - * Returns the anomaly grade. 
+ * Returns the confidence for the result (e.g., anomaly grade in AD). * - * @return the anoamly grade + * @return confidence for the result */ - public double getGrade() { - return grade; + public double getConfidence() { + return confidence; } /** - * Returns the confidence for the grade. + * Returns the anomaly grade. * - * @return confidence for the grade + * @return the anomaly grade */ - public double getConfidence() { - return confidence; - } - - public double getRcfScore() { - return rcfScore; - } - - public long getTotalUpdates() { - return totalUpdates; + public double getGrade() { + return grade; } public int getRelativeIndex() { @@ -232,21 +225,19 @@ public int getForestSize() { @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) + if (!super.equals(o)) + return false; + if (getClass() != o.getClass()) return false; ThresholdingResult that = (ThresholdingResult) o; - return this.grade == that.grade - && this.confidence == that.confidence - && this.rcfScore == that.rcfScore - && this.totalUpdates == that.totalUpdates + return Double.doubleToLongBits(confidence) == Double.doubleToLongBits(that.confidence) + && Double.doubleToLongBits(this.grade) == Double.doubleToLongBits(that.grade) && this.relativeIndex == that.relativeIndex && Arrays.equals(relevantAttribution, that.relevantAttribution) && Arrays.equals(pastValues, that.pastValues) && Arrays.deepEquals(expectedValuesList, that.expectedValuesList) && Arrays.equals(likelihoodOfValues, that.likelihoodOfValues) - && threshold == that.threshold + && Double.doubleToLongBits(threshold) == Double.doubleToLongBits(that.threshold) && forestSize == that.forestSize; } @@ -254,10 +245,9 @@ public boolean equals(Object o) { public int hashCode() { return Objects .hash( - grade, + super.hashCode(), confidence, - rcfScore, - totalUpdates, + grade, relativeIndex, Arrays.hashCode(relevantAttribution), Arrays.hashCode(pastValues), @@ -271,10 +261,9 @@ 
public int hashCode() { @Override public String toString() { return new ToStringBuilder(this) + .append(super.toString()) .append("grade", grade) .append("confidence", confidence) - .append("rcfScore", rcfScore) - .append("totalUpdates", totalUpdates) .append("relativeIndex", relativeIndex) .append("relevantAttribution", Arrays.toString(relevantAttribution)) .append("pastValues", Arrays.toString(pastValues)) @@ -302,43 +291,47 @@ public String toString() { * @param error Error * @return converted AnomalyResult */ - public AnomalyResult toAnomalyResult( - AnomalyDetector detector, + @Override + public List toIndexableResults( + Config detector, Instant dataStartInstant, Instant dataEndInstant, Instant executionStartInstant, Instant executionEndInstant, List featureData, - Entity entity, + Optional entity, Integer schemaVersion, String modelId, String taskId, String error ) { - return AnomalyResult - .fromRawTRCFResult( - detector.getId(), - detector.getIntervalInMilliseconds(), - taskId, - rcfScore, - grade, - confidence, - featureData, - dataStartInstant, - dataEndInstant, - executionStartInstant, - executionEndInstant, - error, - entity, - detector.getUser(), - schemaVersion, - modelId, - relevantAttribution, - relativeIndex, - pastValues, - expectedValuesList, - likelihoodOfValues, - threshold + return Collections + .singletonList( + AnomalyResult + .fromRawTRCFResult( + detector.getId(), + detector.getIntervalInMilliseconds(), + taskId, + rcfScore, + grade, + confidence, + featureData, + dataStartInstant, + dataEndInstant, + executionStartInstant, + executionEndInstant, + error, + entity, + detector.getUser(), + schemaVersion, + modelId, + relevantAttribution, + relativeIndex, + pastValues, + expectedValuesList, + likelihoodOfValues, + threshold + ) ); } } diff --git a/src/main/java/org/opensearch/ad/model/AnomalyResult.java b/src/main/java/org/opensearch/ad/model/AnomalyResult.java index 3d0cfdb1a..f8222651b 100644 --- 
a/src/main/java/org/opensearch/ad/model/AnomalyResult.java +++ b/src/main/java/org/opensearch/ad/model/AnomalyResult.java @@ -18,27 +18,28 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.ad.constant.CommonValue; import org.opensearch.ad.ml.ThresholdingResult; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.io.stream.Writeable; import org.opensearch.commons.authuser.User; import org.opensearch.core.ParseField; import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.timeseries.annotation.Generated; import org.opensearch.timeseries.constant.CommonName; +import org.opensearch.timeseries.constant.CommonValue; +import org.opensearch.timeseries.model.DataByFeatureId; import org.opensearch.timeseries.model.Entity; import org.opensearch.timeseries.model.FeatureData; +import org.opensearch.timeseries.model.IndexableResult; import org.opensearch.timeseries.util.ParseUtils; import com.google.common.base.Objects; @@ -46,7 +47,7 @@ /** * Include result returned from RCF model and feature data. 
*/ -public class AnomalyResult implements ToXContentObject, Writeable { +public class AnomalyResult extends IndexableResult { private static final Logger LOG = LogManager.getLogger(ThresholdingResult.class); public static final String PARSE_FIELD_NAME = "AnomalyResult"; public static final NamedXContentRegistry.Entry XCONTENT_REGISTRY = new NamedXContentRegistry.Entry( @@ -58,7 +59,6 @@ public class AnomalyResult implements ToXContentObject, Writeable { public static final String DETECTOR_ID_FIELD = "detector_id"; public static final String ANOMALY_SCORE_FIELD = "anomaly_score"; public static final String ANOMALY_GRADE_FIELD = "anomaly_grade"; - public static final String TASK_ID_FIELD = "task_id"; public static final String APPROX_ANOMALY_START_FIELD = "approx_anomaly_start_time"; public static final String RELEVANT_ATTRIBUTION_FIELD = "relevant_attribution"; public static final String PAST_VALUES_FIELD = "past_values"; @@ -67,31 +67,8 @@ public class AnomalyResult implements ToXContentObject, Writeable { // unused currently. added since odfe 1.4 public static final String IS_ANOMALY_FIELD = "is_anomaly"; - private final String detectorId; - private final String taskId; private final Double anomalyScore; private final Double anomalyGrade; - private final Double confidence; - private final List featureData; - private final Instant dataStartTime; - private final Instant dataEndTime; - private final Instant executionStartTime; - private final Instant executionEndTime; - private final String error; - private final Entity entity; - private User user; - private final Integer schemaVersion; - /* - * model id for easy aggregations of entities. The front end needs to query - * for entities ordered by the descending order of anomaly grades and the - * number of anomalies. After supporting multi-category fields, it is hard - * to write such queries since the entity information is stored in a nested - * object array. 
Also, the front end has all code/queries/ helper functions - * in place to rely on a single key per entity combo. This PR adds model id - * to anomaly result to help the transition to multi-categorical field less - * painful. - */ - private final String modelId; /** * the approximate time of current anomaly. We might detect anomaly late. This field @@ -204,6 +181,7 @@ So if we detect anomaly late, we get the baseDimension values from the past (cur // rcf score threshold at the time of writing a result private final Double threshold; + protected final Double confidence; // used when indexing exception or error or an empty result public AnomalyResult( @@ -215,7 +193,7 @@ public AnomalyResult( Instant executionStartTime, Instant executionEndTime, String error, - Entity entity, + Optional entity, User user, Integer schemaVersion, String modelId @@ -245,7 +223,7 @@ public AnomalyResult( } public AnomalyResult( - String detectorId, + String configId, String taskId, Double anomalyScore, Double anomalyGrade, @@ -256,7 +234,7 @@ public AnomalyResult( Instant executionStartTime, Instant executionEndTime, String error, - Entity entity, + Optional entity, User user, Integer schemaVersion, String modelId, @@ -266,21 +244,23 @@ public AnomalyResult( List expectedValuesList, Double threshold ) { - this.detectorId = detectorId; - this.taskId = taskId; + super( + configId, + featureData, + dataStartTime, + dataEndTime, + executionStartTime, + executionEndTime, + error, + entity, + user, + schemaVersion, + modelId, + taskId + ); + this.confidence = confidence; this.anomalyScore = anomalyScore; this.anomalyGrade = anomalyGrade; - this.confidence = confidence; - this.featureData = featureData; - this.dataStartTime = dataStartTime; - this.dataEndTime = dataEndTime; - this.executionStartTime = executionStartTime; - this.executionEndTime = executionEndTime; - this.error = error; - this.entity = entity; - this.user = user; - this.schemaVersion = schemaVersion; - this.modelId = modelId; 
this.approxAnomalyStartTime = approxAnomalyStartTime; this.relevantAttribution = relevantAttribution; this.pastValues = pastValues; @@ -327,7 +307,7 @@ public static AnomalyResult fromRawTRCFResult( Instant executionStartTime, Instant executionEndTime, String error, - Entity entity, + Optional entity, User user, Integer schemaVersion, String modelId, @@ -441,34 +421,10 @@ public static AnomalyResult fromRawTRCFResult( } public AnomalyResult(StreamInput input) throws IOException { - this.detectorId = input.readString(); + super(input); + this.confidence = input.readDouble(); this.anomalyScore = input.readDouble(); this.anomalyGrade = input.readDouble(); - this.confidence = input.readDouble(); - int featureSize = input.readVInt(); - this.featureData = new ArrayList<>(featureSize); - for (int i = 0; i < featureSize; i++) { - featureData.add(new FeatureData(input)); - } - this.dataStartTime = input.readInstant(); - this.dataEndTime = input.readInstant(); - this.executionStartTime = input.readInstant(); - this.executionEndTime = input.readInstant(); - this.error = input.readOptionalString(); - if (input.readBoolean()) { - this.entity = new Entity(input); - } else { - this.entity = null; - } - if (input.readBoolean()) { - this.user = new User(input); - } else { - user = null; - } - this.schemaVersion = input.readInt(); - this.taskId = input.readOptionalString(); - this.modelId = input.readOptionalString(); - // if anomaly is caused by current input, we don't show approximate time this.approxAnomalyStartTime = input.readOptionalInstant(); @@ -509,7 +465,7 @@ public AnomalyResult(StreamInput input) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { XContentBuilder xContentBuilder = builder .startObject() - .field(DETECTOR_ID_FIELD, detectorId) + .field(DETECTOR_ID_FIELD, configId) .field(CommonName.SCHEMA_VERSION_FIELD, schemaVersion); // In normal AD result, we always pass data start/end times. 
In custom result index, // we need to write/delete a dummy AD result to verify if user has write permission @@ -545,14 +501,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (error != null) { xContentBuilder.field(CommonName.ERROR_FIELD, error); } - if (entity != null) { - xContentBuilder.field(CommonName.ENTITY_FIELD, entity); + if (optionalEntity.isPresent()) { + xContentBuilder.field(CommonName.ENTITY_FIELD, optionalEntity.get()); } if (user != null) { xContentBuilder.field(CommonName.USER_FIELD, user); } if (taskId != null) { - xContentBuilder.field(TASK_ID_FIELD, taskId); + xContentBuilder.field(CommonName.TASK_ID_FIELD, taskId); } if (modelId != null) { xContentBuilder.field(CommonName.MODEL_ID_FIELD, modelId); @@ -651,7 +607,7 @@ public static AnomalyResult parse(XContentParser parser) throws IOException { case CommonName.SCHEMA_VERSION_FIELD: schemaVersion = parser.intValue(); break; - case TASK_ID_FIELD: + case CommonName.TASK_ID_FIELD: taskId = parser.text(); break; case CommonName.MODEL_ID_FIELD: @@ -699,7 +655,7 @@ public static AnomalyResult parse(XContentParser parser) throws IOException { executionStartTime, executionEndTime, error, - entity, + Optional.ofNullable(entity), user, schemaVersion, modelId, @@ -714,24 +670,14 @@ public static AnomalyResult parse(XContentParser parser) throws IOException { @Generated @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) + if (!super.equals(o)) + return false; + if (getClass() != o.getClass()) return false; AnomalyResult that = (AnomalyResult) o; - return Objects.equal(detectorId, that.detectorId) - && Objects.equal(taskId, that.taskId) + return Objects.equal(confidence, that.confidence) && Objects.equal(anomalyScore, that.anomalyScore) && Objects.equal(anomalyGrade, that.anomalyGrade) - && Objects.equal(confidence, that.confidence) - && Objects.equal(featureData, that.featureData) - && 
Objects.equal(dataStartTime, that.dataStartTime) - && Objects.equal(dataEndTime, that.dataEndTime) - && Objects.equal(executionStartTime, that.executionStartTime) - && Objects.equal(executionEndTime, that.executionEndTime) - && Objects.equal(error, that.error) - && Objects.equal(entity, that.entity) - && Objects.equal(modelId, that.modelId) && Objects.equal(approxAnomalyStartTime, that.approxAnomalyStartTime) && Objects.equal(relevantAttribution, that.relevantAttribution) && Objects.equal(pastValues, that.pastValues) @@ -742,60 +688,45 @@ public boolean equals(Object o) { @Generated @Override public int hashCode() { - return Objects + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects .hashCode( - detectorId, - taskId, + confidence, anomalyScore, anomalyGrade, - confidence, - featureData, - dataStartTime, - dataEndTime, - executionStartTime, - executionEndTime, - error, - entity, - modelId, approxAnomalyStartTime, relevantAttribution, pastValues, expectedValuesList, threshold ); + return result; } @Generated @Override public String toString() { - return new ToStringBuilder(this) - .append("detectorId", detectorId) - .append("taskId", taskId) - .append("anomalyScore", anomalyScore) - .append("anomalyGrade", anomalyGrade) - .append("confidence", confidence) - .append("featureData", featureData) - .append("dataStartTime", dataStartTime) - .append("dataEndTime", dataEndTime) - .append("executionStartTime", executionStartTime) - .append("executionEndTime", executionEndTime) - .append("error", error) - .append("entity", entity) - .append("modelId", modelId) - .append("approAnomalyStartTime", approxAnomalyStartTime) - .append("relavantAttribution", relevantAttribution) - .append("pastValues", pastValues) - .append("expectedValuesList", StringUtils.join(expectedValuesList, "|")) - .append("threshold", threshold) - .toString(); + return super.toString() + + ", " + + new ToStringBuilder(this) + .append("confidence", confidence) + 
.append("anomalyScore", anomalyScore) + .append("anomalyGrade", anomalyGrade) + .append("approAnomalyStartTime", approxAnomalyStartTime) + .append("relavantAttribution", relevantAttribution) + .append("pastValues", pastValues) + .append("expectedValuesList", StringUtils.join(expectedValuesList, "|")) + .append("threshold", threshold) + .toString(); } - public String getId() { - return detectorId; + public Double getConfidence() { + return confidence; } - public String getTaskId() { - return taskId; + public String getDetectorId() { + return configId; } public Double getAnomalyScore() { @@ -806,42 +737,6 @@ public Double getAnomalyGrade() { return anomalyGrade; } - public Double getConfidence() { - return confidence; - } - - public List getFeatureData() { - return featureData; - } - - public Instant getDataStartTime() { - return dataStartTime; - } - - public Instant getDataEndTime() { - return dataEndTime; - } - - public Instant getExecutionStartTime() { - return executionStartTime; - } - - public Instant getExecutionEndTime() { - return executionEndTime; - } - - public String getError() { - return error; - } - - public Entity getEntity() { - return entity; - } - - public String getModelId() { - return modelId; - } - public Instant getApproAnomalyStartTime() { return approxAnomalyStartTime; } @@ -868,6 +763,7 @@ public Double getThreshold() { * @return whether the anomaly result is important when the anomaly grade is not 0 * or error is there. 
*/ + @Override public boolean isHighPriority() { // AnomalyResult.toXContent won't record Double.NaN and thus make it null return (getAnomalyGrade() != null && getAnomalyGrade() > 0) || getError() != null; @@ -875,34 +771,10 @@ public boolean isHighPriority() { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeString(detectorId); + super.writeTo(out); + out.writeDouble(confidence); out.writeDouble(anomalyScore); out.writeDouble(anomalyGrade); - out.writeDouble(confidence); - out.writeVInt(featureData.size()); - for (FeatureData feature : featureData) { - feature.writeTo(out); - } - out.writeInstant(dataStartTime); - out.writeInstant(dataEndTime); - out.writeInstant(executionStartTime); - out.writeInstant(executionEndTime); - out.writeOptionalString(error); - if (entity != null) { - out.writeBoolean(true); - entity.writeTo(out); - } else { - out.writeBoolean(false); - } - if (user != null) { - out.writeBoolean(true); // user exists - user.writeTo(out); - } else { - out.writeBoolean(false); // user does not exist - } - out.writeInt(schemaVersion); - out.writeOptionalString(taskId); - out.writeOptionalString(modelId); out.writeOptionalInstant(approxAnomalyStartTime); @@ -946,7 +818,7 @@ public static AnomalyResult getDummyResult() { null, null, null, - null, + Optional.empty(), null, CommonValue.NO_SCHEMA_VERSION, null diff --git a/src/main/java/org/opensearch/ad/model/DataByFeatureId.java b/src/main/java/org/opensearch/ad/model/DataByFeatureId.java deleted file mode 100644 index ee21c7ea2..000000000 --- a/src/main/java/org/opensearch/ad/model/DataByFeatureId.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - * - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. 
- */ - -package org.opensearch.ad.model; - -import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; - -import java.io.IOException; - -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.io.stream.Writeable; -import org.opensearch.core.xcontent.ToXContent.Params; -import org.opensearch.core.xcontent.ToXContentObject; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; - -import com.google.common.base.Objects; - -/** - * Data and its Id - * - */ -public class DataByFeatureId implements ToXContentObject, Writeable { - - public static final String FEATURE_ID_FIELD = "feature_id"; - public static final String DATA_FIELD = "data"; - - protected String featureId; - protected Double data; - - public DataByFeatureId(String featureId, Double data) { - this.featureId = featureId; - this.data = data; - } - - /* - * Used by the subclass that has its own way of initializing data like - * reading from StreamInput - */ - protected DataByFeatureId() {} - - public DataByFeatureId(StreamInput input) throws IOException { - this.featureId = input.readString(); - this.data = input.readDouble(); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - XContentBuilder xContentBuilder = builder.startObject().field(FEATURE_ID_FIELD, featureId).field(DATA_FIELD, data); - return xContentBuilder.endObject(); - } - - public static DataByFeatureId parse(XContentParser parser) throws IOException { - String featureId = null; - Double data = null; - - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); - while (parser.nextToken() != XContentParser.Token.END_OBJECT) { - String fieldName = parser.currentName(); - parser.nextToken(); - - switch (fieldName) { - case FEATURE_ID_FIELD: - featureId = parser.text(); - break; - case DATA_FIELD: - 
data = parser.doubleValue(); - break; - default: - // the unknown field and it's children should be ignored - parser.skipChildren(); - break; - } - } - return new DataByFeatureId(featureId, data); - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - DataByFeatureId that = (DataByFeatureId) o; - return Objects.equal(getFeatureId(), that.getFeatureId()) && Objects.equal(getData(), that.getData()); - } - - @Override - public int hashCode() { - return Objects.hashCode(getFeatureId(), getData()); - } - - public String getFeatureId() { - return featureId; - } - - public Double getData() { - return data; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(featureId); - out.writeDouble(data); - } - -} diff --git a/src/main/java/org/opensearch/ad/model/ExpectedValueList.java b/src/main/java/org/opensearch/ad/model/ExpectedValueList.java index 25fdf2e1d..bad7e956b 100644 --- a/src/main/java/org/opensearch/ad/model/ExpectedValueList.java +++ b/src/main/java/org/opensearch/ad/model/ExpectedValueList.java @@ -26,6 +26,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.timeseries.constant.CommonName; +import org.opensearch.timeseries.model.DataByFeatureId; import com.google.common.base.Objects; diff --git a/src/main/java/org/opensearch/ad/ratelimit/CheckpointReadWorker.java b/src/main/java/org/opensearch/ad/ratelimit/CheckpointReadWorker.java index dca44e173..e06d3e08e 100644 --- a/src/main/java/org/opensearch/ad/ratelimit/CheckpointReadWorker.java +++ b/src/main/java/org/opensearch/ad/ratelimit/CheckpointReadWorker.java @@ -386,31 +386,35 @@ private ActionListener> onGetDetector( } if (result != null && result.getRcfScore() > 0) { - AnomalyResult resultToSave = result - .toAnomalyResult( + RequestPriority requestPriority = result.getGrade() > 0 ? 
RequestPriority.HIGH : RequestPriority.MEDIUM; + + List resultsToSave = result + .toIndexableResults( detector, Instant.ofEpochMilli(origRequest.getDataStartTimeMillis()), Instant.ofEpochMilli(origRequest.getDataStartTimeMillis() + detector.getIntervalInMilliseconds()), Instant.now(), Instant.now(), ParseUtils.getFeatureData(origRequest.getCurrentFeature(), detector), - entity, + Optional.ofNullable(entity), indexUtil.getSchemaVersion(ADIndex.RESULT), modelId, null, null ); - resultWriteQueue - .put( - new ResultWriteRequest( - origRequest.getExpirationEpochMs(), - detectorId, - result.getGrade() > 0 ? RequestPriority.HIGH : RequestPriority.MEDIUM, - resultToSave, - detector.getCustomResultIndex() - ) - ); + for (AnomalyResult r : resultsToSave) { + resultWriteQueue + .put( + new ResultWriteRequest( + origRequest.getExpirationEpochMs(), + detectorId, + requestPriority, + r, + detector.getCustomResultIndex() + ) + ); + } } // try to load to cache diff --git a/src/main/java/org/opensearch/ad/ratelimit/ResultWriteWorker.java b/src/main/java/org/opensearch/ad/ratelimit/ResultWriteWorker.java index 0aa94c35a..dad1b409b 100644 --- a/src/main/java/org/opensearch/ad/ratelimit/ResultWriteWorker.java +++ b/src/main/java/org/opensearch/ad/ratelimit/ResultWriteWorker.java @@ -154,7 +154,7 @@ private void enqueueRetryRequestIteration(List requestToRetry, int return; } AnomalyResult result = resultToRetry.get(); - String detectorId = result.getId(); + String detectorId = result.getConfigId(); nodeStateManager.getAnomalyDetector(detectorId, onGetDetector(requestToRetry, index, detectorId, result)); } diff --git a/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java b/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java index d944f67ab..a30e3f14d 100644 --- a/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java +++ b/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java @@ -1104,7 +1104,7 @@ private void detectAnomaly( executeStartTime, Instant.now(), 
error, - adTask.getEntity(), + Optional.ofNullable(adTask.getEntity()), adTask.getDetector().getUser(), anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT), adTask.getEntityModelId() @@ -1134,7 +1134,7 @@ private void detectAnomaly( executeStartTime, Instant.now(), null, - adTask.getEntity(), + Optional.ofNullable(adTask.getEntity()), adTask.getDetector().getUser(), anomalyDetectionIndices.getSchemaVersion(ADIndex.RESULT), adTask.getEntityModelId(), diff --git a/src/main/java/org/opensearch/ad/task/ADTaskManager.java b/src/main/java/org/opensearch/ad/task/ADTaskManager.java index b79388148..bed979656 100644 --- a/src/main/java/org/opensearch/ad/task/ADTaskManager.java +++ b/src/main/java/org/opensearch/ad/task/ADTaskManager.java @@ -39,7 +39,6 @@ import static org.opensearch.ad.model.ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES; import static org.opensearch.ad.model.ADTaskType.REALTIME_TASK_TYPES; import static org.opensearch.ad.model.ADTaskType.taskTypeToString; -import static org.opensearch.ad.model.AnomalyResult.TASK_ID_FIELD; import static org.opensearch.ad.settings.AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS; import static org.opensearch.ad.settings.AnomalyDetectorSettings.DELETE_AD_RESULT_WHEN_DELETE_DETECTOR; import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE; @@ -55,6 +54,7 @@ import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.timeseries.constant.CommonMessages.CREATE_INDEX_NOT_ACKNOWLEDGED; import static org.opensearch.timeseries.constant.CommonMessages.FAIL_TO_FIND_CONFIG_MSG; +import static org.opensearch.timeseries.constant.CommonName.TASK_ID_FIELD; import static org.opensearch.timeseries.util.ParseUtils.isNullOrEmpty; import static org.opensearch.timeseries.util.RestHandlerUtils.XCONTENT_WITH_TYPE; import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry; @@ -748,7 +748,7 @@ public void 
startDetector( }, transportService, true, listener); } else { // If detection index doesn't exist, create index and execute detector. - detectionIndices.initDetectionStateIndex(ActionListener.wrap(r -> { + detectionIndices.initStateIndex(ActionListener.wrap(r -> { if (r.isAcknowledged()) { logger.info("Created {} with mappings.", DETECTION_STATE_INDEX); updateLatestFlagOfOldTasksAndCreateNewTask(detector, detectionDateRange, user, listener); @@ -2168,7 +2168,7 @@ private void recreateRealtimeTask(ExecutorFunction function, ActionListener { + detectionIndices.initStateIndex(ActionListener.wrap(r -> { if (r.isAcknowledged()) { logger.info("Created {} with mappings.", DETECTION_STATE_INDEX); function.execute(); diff --git a/src/main/java/org/opensearch/ad/transport/AnomalyResultResponse.java b/src/main/java/org/opensearch/ad/transport/AnomalyResultResponse.java index 24b815d59..fb23a7c40 100644 --- a/src/main/java/org/opensearch/ad/transport/AnomalyResultResponse.java +++ b/src/main/java/org/opensearch/ad/transport/AnomalyResultResponse.java @@ -18,6 +18,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import org.opensearch.action.ActionResponse; import org.opensearch.ad.model.AnomalyResult; @@ -360,7 +361,7 @@ public AnomalyResult toAnomalyResult( executionStartInstant, executionEndInstant, error, - null, + Optional.empty(), user, schemaVersion, null, // single-stream real-time has no model id diff --git a/src/main/java/org/opensearch/ad/transport/EntityResultTransportAction.java b/src/main/java/org/opensearch/ad/transport/EntityResultTransportAction.java index 021dec3b1..0927a88b1 100644 --- a/src/main/java/org/opensearch/ad/transport/EntityResultTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/EntityResultTransportAction.java @@ -211,31 +211,32 @@ private ActionListener> onGetDetector( // result.getGrade() = 0 means it is not an anomaly // So many OpenSearchRejectedExecutionException 
if we write no matter what if (result.getRcfScore() > 0) { - AnomalyResult resultToSave = result - .toAnomalyResult( + List resultsToSave = result + .toIndexableResults( detector, Instant.ofEpochMilli(request.getStart()), Instant.ofEpochMilli(request.getEnd()), executionStartTime, Instant.now(), ParseUtils.getFeatureData(datapoint, detector), - categoricalValues, + Optional.ofNullable(categoricalValues), indexUtil.getSchemaVersion(ADIndex.RESULT), modelId, null, null ); - - resultWriteQueue - .put( - new ResultWriteRequest( - System.currentTimeMillis() + detector.getIntervalInMilliseconds(), - detectorId, - result.getGrade() > 0 ? RequestPriority.HIGH : RequestPriority.MEDIUM, - resultToSave, - detector.getCustomResultIndex() - ) - ); + for (AnomalyResult r : resultsToSave) { + resultWriteQueue + .put( + new ResultWriteRequest( + System.currentTimeMillis() + detector.getIntervalInMilliseconds(), + detectorId, + result.getGrade() > 0 ? RequestPriority.HIGH : RequestPriority.MEDIUM, + r, + detector.getCustomResultIndex() + ) + ); + } } } catch (IllegalArgumentException e) { // fail to score likely due to model corruption. Re-cold start to recover. 
diff --git a/src/main/java/org/opensearch/ad/transport/SearchTopAnomalyResultTransportAction.java b/src/main/java/org/opensearch/ad/transport/SearchTopAnomalyResultTransportAction.java index f42c114e9..86ad7941a 100644 --- a/src/main/java/org/opensearch/ad/transport/SearchTopAnomalyResultTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/SearchTopAnomalyResultTransportAction.java @@ -486,11 +486,11 @@ private QueryBuilder generateQuery(SearchTopAnomalyResultRequest request) { query.filter(dateRangeFilter).filter(anomalyGradeFilter); if (request.getHistorical() == true) { - TermQueryBuilder taskIdFilter = QueryBuilders.termQuery(AnomalyResult.TASK_ID_FIELD, request.getTaskId()); + TermQueryBuilder taskIdFilter = QueryBuilders.termQuery(CommonName.TASK_ID_FIELD, request.getTaskId()); query.filter(taskIdFilter); } else { TermQueryBuilder detectorIdFilter = QueryBuilders.termQuery(AnomalyResult.DETECTOR_ID_FIELD, request.getId()); - ExistsQueryBuilder taskIdExistsFilter = QueryBuilders.existsQuery(AnomalyResult.TASK_ID_FIELD); + ExistsQueryBuilder taskIdExistsFilter = QueryBuilders.existsQuery(CommonName.TASK_ID_FIELD); query.filter(detectorIdFilter).mustNot(taskIdExistsFilter); } return query; diff --git a/src/main/java/org/opensearch/ad/transport/handler/AnomalyResultBulkIndexHandler.java b/src/main/java/org/opensearch/ad/transport/handler/AnomalyResultBulkIndexHandler.java index 1e3aeb4da..c021ead73 100644 --- a/src/main/java/org/opensearch/ad/transport/handler/AnomalyResultBulkIndexHandler.java +++ b/src/main/java/org/opensearch/ad/transport/handler/AnomalyResultBulkIndexHandler.java @@ -68,7 +68,7 @@ public void bulkIndexAnomalyResult(String resultIndex, List anoma listener.onResponse(null); return; } - String detectorId = anomalyResults.get(0).getId(); + String detectorId = anomalyResults.get(0).getConfigId(); try { if (resultIndex != null) { // Only create custom AD result index when create detector, won’t recreate custom AD result index in 
realtime diff --git a/src/main/java/org/opensearch/forecast/indices/ForecastIndexManagement.java b/src/main/java/org/opensearch/forecast/indices/ForecastIndexManagement.java index adddadb00..0cde7b28f 100644 --- a/src/main/java/org/opensearch/forecast/indices/ForecastIndexManagement.java +++ b/src/main/java/org/opensearch/forecast/indices/ForecastIndexManagement.java @@ -34,7 +34,10 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.forecast.constant.ForecastCommonName; +import org.opensearch.forecast.model.ForecastResult; import org.opensearch.threadpool.ThreadPool; import org.opensearch.timeseries.common.exception.EndRunException; import org.opensearch.timeseries.indices.IndexManagement; @@ -171,6 +174,7 @@ public boolean doesCheckpointIndexExist() { * * @param actionListener action called after create index */ + @Override public void initStateIndex(ActionListener actionListener) { try { CreateIndexRequest request = new CreateIndexRequest(ForecastCommonName.FORECAST_STATE_INDEX) @@ -236,8 +240,10 @@ public void initJobIndex(ActionListener actionListener) { @Override protected IndexRequest createDummyIndexRequest(String resultIndex) throws IOException { - // TODO: add real support when committing ForecastResult class - return new IndexRequest(resultIndex).id(DUMMY_FORECAST_RESULT_ID).source(XContentType.JSON, "field", "value"); + ForecastResult dummyResult = ForecastResult.getDummyResult(); + return new IndexRequest(resultIndex) + .id(DUMMY_FORECAST_RESULT_ID) + .source(dummyResult.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)); } @Override diff --git a/src/main/java/org/opensearch/forecast/model/ForecastResult.java b/src/main/java/org/opensearch/forecast/model/ForecastResult.java new file 
mode 100644 index 000000000..3d1042e2c --- /dev/null +++ b/src/main/java/org/opensearch/forecast/model/ForecastResult.java @@ -0,0 +1,590 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.forecast.model; + +import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.forecast.constant.ForecastCommonName.DUMMY_FORECASTER_ID; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.lang.builder.ToStringBuilder; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.commons.authuser.User; +import org.opensearch.core.ParseField; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.forecast.constant.ForecastCommonName; +import org.opensearch.timeseries.annotation.Generated; +import org.opensearch.timeseries.constant.CommonName; +import org.opensearch.timeseries.constant.CommonValue; +import org.opensearch.timeseries.model.Entity; +import org.opensearch.timeseries.model.FeatureData; +import org.opensearch.timeseries.model.IndexableResult; +import org.opensearch.timeseries.util.ParseUtils; + +import com.google.common.base.Objects; + +/** + * Include result returned from RCF model and feature data. 
+ */ +public class ForecastResult extends IndexableResult { + public static final String PARSE_FIELD_NAME = "ForecastResult"; + public static final NamedXContentRegistry.Entry XCONTENT_REGISTRY = new NamedXContentRegistry.Entry( + ForecastResult.class, + new ParseField(PARSE_FIELD_NAME), + it -> parse(it) + ); + + public static final String FEATURE_ID_FIELD = "feature_id"; + public static final String VALUE_FIELD = "forecast_value"; + public static final String LOWER_BOUND_FIELD = "forecast_lower_bound"; + public static final String UPPER_BOUND_FIELD = "forecast_upper_bound"; + public static final String INTERVAL_WIDTH_FIELD = "confidence_interval_width"; + public static final String FORECAST_DATA_START_TIME_FIELD = "forecast_data_start_time"; + public static final String FORECAST_DATA_END_TIME_FIELD = "forecast_data_end_time"; + public static final String HORIZON_INDEX_FIELD = "horizon_index"; + + private final String featureId; + private final Float forecastValue; + private final Float lowerBound; + private final Float upperBound; + private final Float confidenceIntervalWidth; + private final Instant forecastDataStartTime; + private final Instant forecastDataEndTime; + private final Integer horizonIndex; + protected final Double dataQuality; + + // used when indexing exception or error or an empty result + public ForecastResult( + String forecasterId, + String taskId, + List featureData, + Instant dataStartTime, + Instant dataEndTime, + Instant executionStartTime, + Instant executionEndTime, + String error, + Optional entity, + User user, + Integer schemaVersion, + String modelId + ) { + this( + forecasterId, + taskId, + Double.NaN, + featureData, + dataStartTime, + dataEndTime, + executionStartTime, + executionEndTime, + error, + entity, + user, + schemaVersion, + modelId, + null, + null, + null, + null, + null, + null, + null + ); + } + + public ForecastResult( + String forecasterId, + String taskId, + Double dataQuality, + List featureData, + Instant 
dataStartTime, + Instant dataEndTime, + Instant executionStartTime, + Instant executionEndTime, + String error, + Optional entity, + User user, + Integer schemaVersion, + String modelId, + String featureId, + Float forecastValue, + Float lowerBound, + Float upperBound, + Instant forecastDataStartTime, + Instant forecastDataEndTime, + Integer horizonIndex + ) { + super( + forecasterId, + featureData, + dataStartTime, + dataEndTime, + executionStartTime, + executionEndTime, + error, + entity, + user, + schemaVersion, + modelId, + taskId + ); + this.featureId = featureId; + this.dataQuality = dataQuality; + this.forecastValue = forecastValue; + this.lowerBound = lowerBound; + this.upperBound = upperBound; + this.confidenceIntervalWidth = lowerBound != null && upperBound != null ? Math.abs(upperBound - lowerBound) : Float.NaN; + this.forecastDataStartTime = forecastDataStartTime; + this.forecastDataEndTime = forecastDataEndTime; + this.horizonIndex = horizonIndex; + } + + public static List fromRawRCFCasterResult( + String forecasterId, + long intervalMillis, + Double dataQuality, + List featureData, + Instant dataStartTime, + Instant dataEndTime, + Instant executionStartTime, + Instant executionEndTime, + String error, + Optional entity, + User user, + Integer schemaVersion, + String modelId, + float[] forecastsValues, + float[] forecastsUppers, + float[] forecastsLowers, + String taskId + ) { + int inputLength = featureData.size(); + int numberOfForecasts = forecastsValues.length / inputLength; + + List convertedForecastValues = new ArrayList<>(numberOfForecasts); + + // store feature data and forecast value separately for easy query on feature data + // we can join them using forecasterId, entityId, and executionStartTime/executionEndTime + convertedForecastValues + .add( + new ForecastResult( + forecasterId, + taskId, + dataQuality, + featureData, + dataStartTime, + dataEndTime, + executionStartTime, + executionEndTime, + error, + entity, + user, + schemaVersion, + 
modelId, + null, + null, + null, + null, + null, + null, + -1 + ) + ); + Instant forecastDataStartTime = dataEndTime; + + for (int i = 0; i < numberOfForecasts; i++) { + Instant forecastDataEndTime = forecastDataStartTime.plusMillis(intervalMillis); + for (int j = 0; j < inputLength; j++) { + int k = i * inputLength + j; + convertedForecastValues + .add( + new ForecastResult( + forecasterId, + taskId, + dataQuality, + null, + null, + null, + executionStartTime, + executionEndTime, + error, + entity, + user, + schemaVersion, + modelId, + featureData.get(j).getFeatureId(), + forecastsValues[k], + forecastsLowers[k], + forecastsUppers[k], + forecastDataStartTime, + forecastDataEndTime, + i + ) + ); + } + forecastDataStartTime = forecastDataEndTime; + } + + return convertedForecastValues; + } + + public ForecastResult(StreamInput input) throws IOException { + super(input); + this.featureId = input.readOptionalString(); + this.dataQuality = input.readOptionalDouble(); + this.forecastValue = input.readOptionalFloat(); + this.lowerBound = input.readOptionalFloat(); + this.upperBound = input.readOptionalFloat(); + this.confidenceIntervalWidth = input.readOptionalFloat(); + this.forecastDataStartTime = input.readOptionalInstant(); + this.forecastDataEndTime = input.readOptionalInstant(); + this.horizonIndex = input.readOptionalInt(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + XContentBuilder xContentBuilder = builder + .startObject() + .field(ForecastCommonName.FORECASTER_ID_KEY, configId) + .field(CommonName.SCHEMA_VERSION_FIELD, schemaVersion); + + if (dataStartTime != null) { + xContentBuilder.field(CommonName.DATA_START_TIME_FIELD, dataStartTime.toEpochMilli()); + } + if (dataEndTime != null) { + xContentBuilder.field(CommonName.DATA_END_TIME_FIELD, dataEndTime.toEpochMilli()); + } + if (featureData != null) { + // can be null during preview + xContentBuilder.field(CommonName.FEATURE_DATA_FIELD, 
featureData.toArray()); + } + if (executionStartTime != null) { + // can be null during preview + xContentBuilder.field(CommonName.EXECUTION_START_TIME_FIELD, executionStartTime.toEpochMilli()); + } + if (executionEndTime != null) { + // can be null during preview + xContentBuilder.field(CommonName.EXECUTION_END_TIME_FIELD, executionEndTime.toEpochMilli()); + } + if (error != null) { + xContentBuilder.field(CommonName.ERROR_FIELD, error); + } + if (optionalEntity.isPresent()) { + xContentBuilder.field(CommonName.ENTITY_FIELD, optionalEntity.get()); + } + if (user != null) { + xContentBuilder.field(CommonName.USER_FIELD, user); + } + if (modelId != null) { + xContentBuilder.field(CommonName.MODEL_ID_FIELD, modelId); + } + if (dataQuality != null && !dataQuality.isNaN()) { + xContentBuilder.field(CommonName.DATA_QUALITY_FIELD, dataQuality); + } + if (taskId != null) { + xContentBuilder.field(CommonName.TASK_ID_FIELD, taskId); + } + if (entityId != null) { + xContentBuilder.field(CommonName.ENTITY_ID_FIELD, entityId); + } + if (forecastValue != null) { + xContentBuilder.field(VALUE_FIELD, forecastValue); + } + if (lowerBound != null) { + xContentBuilder.field(LOWER_BOUND_FIELD, lowerBound); + } + if (upperBound != null) { + xContentBuilder.field(UPPER_BOUND_FIELD, upperBound); + } + if (forecastDataStartTime != null) { + xContentBuilder.field(FORECAST_DATA_START_TIME_FIELD, forecastDataStartTime.toEpochMilli()); + } + if (forecastDataEndTime != null) { + xContentBuilder.field(FORECAST_DATA_END_TIME_FIELD, forecastDataEndTime.toEpochMilli()); + } + if (horizonIndex != null) { + xContentBuilder.field(HORIZON_INDEX_FIELD, horizonIndex); + } + if (featureId != null) { + xContentBuilder.field(FEATURE_ID_FIELD, featureId); + } + + return xContentBuilder.endObject(); + } + + public static ForecastResult parse(XContentParser parser) throws IOException { + String forecasterId = null; + Double dataQuality = null; + List featureData = null; + Instant dataStartTime = null; + 
Instant dataEndTime = null; + Instant executionStartTime = null; + Instant executionEndTime = null; + String error = null; + Entity entity = null; + User user = null; + Integer schemaVersion = CommonValue.NO_SCHEMA_VERSION; + String modelId = null; + String taskId = null; + + String featureId = null; + Float forecastValue = null; + Float lowerBound = null; + Float upperBound = null; + Instant forecastDataStartTime = null; + Instant forecastDataEndTime = null; + Integer horizonIndex = null; + + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + + switch (fieldName) { + case ForecastCommonName.FORECASTER_ID_KEY: + forecasterId = parser.text(); + break; + case CommonName.DATA_QUALITY_FIELD: + dataQuality = parser.doubleValue(); + break; + case CommonName.FEATURE_DATA_FIELD: + ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.currentToken(), parser); + featureData = new ArrayList<>(); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + featureData.add(FeatureData.parse(parser)); + } + break; + case CommonName.DATA_START_TIME_FIELD: + dataStartTime = ParseUtils.toInstant(parser); + break; + case CommonName.DATA_END_TIME_FIELD: + dataEndTime = ParseUtils.toInstant(parser); + break; + case CommonName.EXECUTION_START_TIME_FIELD: + executionStartTime = ParseUtils.toInstant(parser); + break; + case CommonName.EXECUTION_END_TIME_FIELD: + executionEndTime = ParseUtils.toInstant(parser); + break; + case CommonName.ERROR_FIELD: + error = parser.text(); + break; + case CommonName.ENTITY_FIELD: + entity = Entity.parse(parser); + break; + case CommonName.USER_FIELD: + user = User.parse(parser); + break; + case CommonName.SCHEMA_VERSION_FIELD: + schemaVersion = parser.intValue(); + break; + case CommonName.MODEL_ID_FIELD: + modelId = parser.text(); + break; + case FEATURE_ID_FIELD: + 
featureId = parser.text(); + break; + case LOWER_BOUND_FIELD: + lowerBound = parser.floatValue(); + break; + case UPPER_BOUND_FIELD: + upperBound = parser.floatValue(); + break; + case VALUE_FIELD: + forecastValue = parser.floatValue(); + break; + case FORECAST_DATA_START_TIME_FIELD: + forecastDataStartTime = ParseUtils.toInstant(parser); + break; + case FORECAST_DATA_END_TIME_FIELD: + forecastDataEndTime = ParseUtils.toInstant(parser); + break; + case CommonName.TASK_ID_FIELD: + taskId = parser.text(); + break; + case HORIZON_INDEX_FIELD: + horizonIndex = parser.intValue(); + break; + default: + parser.skipChildren(); + break; + } + } + + return new ForecastResult( + forecasterId, + taskId, + dataQuality, + featureData, + dataStartTime, + dataEndTime, + executionStartTime, + executionEndTime, + error, + Optional.ofNullable(entity), + user, + schemaVersion, + modelId, + featureId, + forecastValue, + lowerBound, + upperBound, + forecastDataStartTime, + forecastDataEndTime, + horizonIndex + ); + } + + @Generated + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + if (!super.equals(o)) + return false; + ForecastResult that = (ForecastResult) o; + return Objects.equal(featureId, that.featureId) + && Objects.equal(dataQuality, that.dataQuality) + && Objects.equal(forecastValue, that.forecastValue) + && Objects.equal(lowerBound, that.lowerBound) + && Objects.equal(upperBound, that.upperBound) + && Objects.equal(confidenceIntervalWidth, that.confidenceIntervalWidth) + && Objects.equal(forecastDataStartTime, that.forecastDataStartTime) + && Objects.equal(forecastDataEndTime, that.forecastDataEndTime) + && Objects.equal(horizonIndex, that.horizonIndex); + } + + @Generated + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Objects + .hashCode( + featureId, + dataQuality, + forecastValue, + lowerBound, + 
upperBound, + confidenceIntervalWidth, + forecastDataStartTime, + forecastDataEndTime, + horizonIndex + ); + return result; + } + + @Generated + @Override + public String toString() { + return super.toString() + + ", " + + new ToStringBuilder(this) + .append("featureId", featureId) + .append("dataQuality", dataQuality) + .append("forecastValue", forecastValue) + .append("lowerBound", lowerBound) + .append("upperBound", upperBound) + .append("confidenceIntervalWidth", confidenceIntervalWidth) + .append("forecastDataStartTime", forecastDataStartTime) + .append("forecastDataEndTime", forecastDataEndTime) + .append("horizonIndex", horizonIndex) + .toString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + + out.writeOptionalString(featureId); + out.writeOptionalDouble(dataQuality); + out.writeOptionalFloat(forecastValue); + out.writeOptionalFloat(lowerBound); + out.writeOptionalFloat(upperBound); + out.writeOptionalFloat(confidenceIntervalWidth); + out.writeOptionalInstant(forecastDataStartTime); + out.writeOptionalInstant(forecastDataEndTime); + out.writeOptionalInt(horizonIndex); + } + + public static ForecastResult getDummyResult() { + return new ForecastResult( + DUMMY_FORECASTER_ID, + null, + null, + null, + null, + null, + null, + null, + Optional.empty(), + null, + CommonValue.NO_SCHEMA_VERSION, + null + ); + } + + /** + * Used to throw away requests when index pressure is high. + * @return when the error is there. 
+ */ + @Override + public boolean isHighPriority() { + // AnomalyResult.toXContent won't record Double.NaN and thus make it null + return getError() != null; + } + + public Double getDataQuality() { + return dataQuality; + } + + public String getFeatureId() { + return featureId; + } + + public Float getForecastValue() { + return forecastValue; + } + + public Float getLowerBound() { + return lowerBound; + } + + public Float getUpperBound() { + return upperBound; + } + + public Float getConfidenceIntervalWidth() { + return confidenceIntervalWidth; + } + + public Instant getForecastDataStartTime() { + return forecastDataStartTime; + } + + public Instant getForecastDataEndTime() { + return forecastDataEndTime; + } + + public Integer getHorizonIndex() { + return horizonIndex; + } +} diff --git a/src/main/java/org/opensearch/timeseries/constant/CommonName.java b/src/main/java/org/opensearch/timeseries/constant/CommonName.java index 8ee60b421..129378ceb 100644 --- a/src/main/java/org/opensearch/timeseries/constant/CommonName.java +++ b/src/main/java/org/opensearch/timeseries/constant/CommonName.java @@ -40,6 +40,7 @@ public class CommonName { // Validation // ====================================== public static final String MODEL_ASPECT = "model"; + public static final String CONFIG_ID_MISSING_MSG = "config ID is missing"; // ====================================== // Used for custom forecast result index @@ -64,6 +65,7 @@ public class CommonName { public static final String ENTITY_FIELD = "entity"; public static final String USER_FIELD = "user"; public static final String CONFIDENCE_FIELD = "confidence"; + public static final String DATA_QUALITY_FIELD = "data_quality"; // MODEL_ID_FIELD can be used in profile and stats API as well public static final String MODEL_ID_FIELD = "model_id"; public static final String TIMESTAMP = "timestamp"; @@ -75,13 +77,16 @@ public class CommonName { // current key for entity samples public static final String ENTITY_SAMPLE_QUEUE = 
"samples"; - public static final String FORECASTER_ID_FIELD = "forecaster_id"; - // ====================================== // Profile name // ====================================== public static final String MODEL_SIZE_IN_BYTES = "model_size_in_bytes"; + // ====================================== + // Used for backward-compatibility in messaging + // ====================================== + public static final String EMPTY_FIELD = ""; + // ====================================== // Query // ====================================== @@ -93,4 +98,12 @@ public class CommonName { public static final String DATE_HISTOGRAM = "date_histogram"; // feature aggregation name public static final String FEATURE_AGGS = "feature_aggs"; + + // ====================================== + // Used in toXContent + // ====================================== + public static final String CONFIG_ID_KEY = "config_id"; + public static final String MODEL_ID_KEY = "model_id"; + public static final String TASK_ID_FIELD = "task_id"; + public static final String ENTITY_ID_FIELD = "entity_id"; } diff --git a/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java b/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java index 7040f2ff8..5fefd6415 100644 --- a/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java +++ b/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java @@ -74,7 +74,6 @@ import org.opensearch.timeseries.constant.CommonName; import org.opensearch.timeseries.constant.CommonValue; import org.opensearch.timeseries.function.ExecutorFunction; -import org.opensearch.timeseries.indices.IndexManagement.IndexState; import org.opensearch.timeseries.settings.TimeSeriesSettings; import org.opensearch.timeseries.util.DiscoveryNodeFilterer; @@ -983,4 +982,6 @@ protected void initResultIndexDirectly( protected abstract void rolloverAndDeleteHistoryIndex(); public abstract void initCustomResultIndexDirectly(String resultIndex, ActionListener 
actionListener); + + public abstract void initStateIndex(ActionListener actionListener); } diff --git a/src/main/java/org/opensearch/timeseries/ml/IntermediateResult.java b/src/main/java/org/opensearch/timeseries/ml/IntermediateResult.java new file mode 100644 index 000000000..9a8704842 --- /dev/null +++ b/src/main/java/org/opensearch/timeseries/ml/IntermediateResult.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.timeseries.ml; + +import java.time.Instant; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +import org.opensearch.timeseries.model.Config; +import org.opensearch.timeseries.model.Entity; +import org.opensearch.timeseries.model.FeatureData; +import org.opensearch.timeseries.model.IndexableResult; + +public abstract class IntermediateResult { + protected final long totalUpdates; + protected final double rcfScore; + + public IntermediateResult(long totalUpdates, double rcfScore) { + this.totalUpdates = totalUpdates; + this.rcfScore = rcfScore; + } + + public long getTotalUpdates() { + return totalUpdates; + } + + public double getRcfScore() { + return rcfScore; + } + + @Override + public int hashCode() { + return Objects.hash(totalUpdates); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + IntermediateResult other = (IntermediateResult) obj; + return totalUpdates == other.totalUpdates && Double.doubleToLongBits(rcfScore) == Double.doubleToLongBits(other.rcfScore); + } + + /** + * convert intermediateResult into 1+ indexable results. 
+ * @param config Config accessor + * @param dataStartInstant data start time + * @param dataEndInstant data end time + * @param executionStartInstant execution start time + * @param executionEndInstant execution end time + * @param featureData feature data + * @param entity entity info + * @param schemaVersion schema version + * @param modelId Model id + * @param taskId Task id + * @param error Error + * @return 1+ indexable results + */ + public abstract List toIndexableResults( + Config config, + Instant dataStartInstant, + Instant dataEndInstant, + Instant executionStartInstant, + Instant executionEndInstant, + List featureData, + Optional entity, + Integer schemaVersion, + String modelId, + String taskId, + String error + ); +} diff --git a/src/main/java/org/opensearch/timeseries/model/IndexableResult.java b/src/main/java/org/opensearch/timeseries/model/IndexableResult.java new file mode 100644 index 000000000..d6186ba63 --- /dev/null +++ b/src/main/java/org/opensearch/timeseries/model/IndexableResult.java @@ -0,0 +1,258 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + +package org.opensearch.timeseries.model; + +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.lang.builder.ToStringBuilder; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.commons.authuser.User; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.timeseries.annotation.Generated; + +import com.google.common.base.Objects; + +public abstract class IndexableResult implements Writeable, ToXContentObject { + + protected final String configId; + protected final List featureData; + protected final Instant dataStartTime; + protected final Instant dataEndTime; + protected final Instant executionStartTime; + protected final Instant executionEndTime; + protected final String error; + protected final Optional optionalEntity; + protected User user; + protected final Integer schemaVersion; + /* + * model id for easy aggregations of entities. The front end needs to query + * for entities ordered by the descending/ascending order of feature values. + * After supporting multi-category fields, it is hard to write such queries + * since the entity information is stored in a nested object array. + * Also, the front end has all code/queries/ helper functions in place to + * rely on a single key per entity combo. Adding model id to forecast result + * to help the transition to multi-categorical field less painful. 
+ */ + protected final String modelId; + protected final String entityId; + protected final String taskId; + + public IndexableResult( + String configId, + List featureData, + Instant dataStartTime, + Instant dataEndTime, + Instant executionStartTime, + Instant executionEndTime, + String error, + Optional entity, + User user, + Integer schemaVersion, + String modelId, + String taskId + ) { + this.configId = configId; + this.featureData = featureData; + this.dataStartTime = dataStartTime; + this.dataEndTime = dataEndTime; + this.executionStartTime = executionStartTime; + this.executionEndTime = executionEndTime; + this.error = error; + this.optionalEntity = entity; + this.user = user; + this.schemaVersion = schemaVersion; + this.modelId = modelId; + this.taskId = taskId; + this.entityId = getEntityId(entity, configId); + } + + public IndexableResult(StreamInput input) throws IOException { + this.configId = input.readString(); + int featureSize = input.readVInt(); + this.featureData = new ArrayList<>(featureSize); + for (int i = 0; i < featureSize; i++) { + featureData.add(new FeatureData(input)); + } + this.dataStartTime = input.readInstant(); + this.dataEndTime = input.readInstant(); + this.executionStartTime = input.readInstant(); + this.executionEndTime = input.readInstant(); + this.error = input.readOptionalString(); + if (input.readBoolean()) { + this.optionalEntity = Optional.of(new Entity(input)); + } else { + this.optionalEntity = Optional.empty(); + } + if (input.readBoolean()) { + this.user = new User(input); + } else { + user = null; + } + this.schemaVersion = input.readInt(); + this.modelId = input.readOptionalString(); + this.taskId = input.readOptionalString(); + this.entityId = input.readOptionalString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(configId); + out.writeVInt(featureData.size()); + for (FeatureData feature : featureData) { + feature.writeTo(out); + } + 
out.writeInstant(dataStartTime); + out.writeInstant(dataEndTime); + out.writeInstant(executionStartTime); + out.writeInstant(executionEndTime); + out.writeOptionalString(error); + if (optionalEntity.isPresent()) { + out.writeBoolean(true); + optionalEntity.get().writeTo(out); + } else { + out.writeBoolean(false); + } + if (user != null) { + out.writeBoolean(true); // user exists + user.writeTo(out); + } else { + out.writeBoolean(false); // user does not exist + } + out.writeInt(schemaVersion); + out.writeOptionalString(modelId); + out.writeOptionalString(taskId); + out.writeOptionalString(entityId); + } + + public String getConfigId() { + return configId; + } + + public List getFeatureData() { + return featureData; + } + + public Instant getDataStartTime() { + return dataStartTime; + } + + public Instant getDataEndTime() { + return dataEndTime; + } + + public Instant getExecutionStartTime() { + return executionStartTime; + } + + public Instant getExecutionEndTime() { + return executionEndTime; + } + + public String getError() { + return error; + } + + public Optional getEntity() { + return optionalEntity; + } + + public String getModelId() { + return modelId; + } + + public String getTaskId() { + return taskId; + } + + public String getEntityId() { + return entityId; + } + + /** + * entityId equals to model Id. It is hard to explain to users what + * modelId is. entityId is more user friendly. 
+ * @param entity Entity info + * @param configId config id + * @return entity id + */ + public static String getEntityId(Optional entity, String configId) { + return entity.flatMap(e -> e.getModelId(configId)).orElse(null); + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + IndexableResult that = (IndexableResult) o; + return Objects.equal(configId, that.configId) + && Objects.equal(taskId, that.taskId) + && Objects.equal(featureData, that.featureData) + && Objects.equal(dataStartTime, that.dataStartTime) + && Objects.equal(dataEndTime, that.dataEndTime) + && Objects.equal(executionStartTime, that.executionStartTime) + && Objects.equal(executionEndTime, that.executionEndTime) + && Objects.equal(error, that.error) + && Objects.equal(optionalEntity, that.optionalEntity) + && Objects.equal(modelId, that.modelId) + && Objects.equal(entityId, that.entityId); + } + + @Generated + @Override + public int hashCode() { + return Objects + .hashCode( + configId, + taskId, + featureData, + dataStartTime, + dataEndTime, + executionStartTime, + executionEndTime, + error, + optionalEntity, + modelId, + entityId + ); + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("configId", configId) + .append("taskId", taskId) + .append("featureData", featureData) + .append("dataStartTime", dataStartTime) + .append("dataEndTime", dataEndTime) + .append("executionStartTime", executionStartTime) + .append("executionEndTime", executionEndTime) + .append("error", error) + .append("entity", optionalEntity) + .append("modelId", modelId) + .append("entityId", entityId) + .toString(); + } + + /** + * Used to throw away requests when index pressure is high. + * @return whether the result is high priority. 
+ */ + public abstract boolean isHighPriority(); +} diff --git a/src/main/resources/mappings/forecast-results.json b/src/main/resources/mappings/forecast-results.json index 124fded11..745d308ad 100644 --- a/src/main/resources/mappings/forecast-results.json +++ b/src/main/resources/mappings/forecast-results.json @@ -18,6 +18,9 @@ } } }, + "data_quality": { + "type": "double" + }, "data_start_time": { "type": "date", "format": "strict_date_time||epoch_millis" @@ -95,30 +98,34 @@ "model_id": { "type": "keyword" }, - "forecast_series": { - "type": "nested", - "properties": { - "lower_bound": { - "type": "double" - }, - "upper_bound": { - "type": "double" - }, - "feature_id": { - "type": "keyword" - }, - "value": { - "type": "double" - }, - "data_start_time": { - "type": "date", - "format": "strict_date_time||epoch_millis" - }, - "data_end_time": { - "type": "date", - "format": "strict_date_time||epoch_millis" - } - } - } + "entity_id": { + "type": "keyword" + }, + "forecast_lower_bound": { + "type": "double" + }, + "forecast_upper_bound": { + "type": "double" + }, + "confidence_interval_width": { + "type": "double" + }, + "forecast_value": { + "type": "double" + }, + "horizon_index": { + "type": "integer" + }, + "forecast_data_start_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "forecast_data_end_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "feature_id": { + "type": "keyword" + } } } diff --git a/src/test/java/org/opensearch/ad/cluster/ADDataMigratorTests.java b/src/test/java/org/opensearch/ad/cluster/ADDataMigratorTests.java index dcfc14298..1b2bcde4a 100644 --- a/src/test/java/org/opensearch/ad/cluster/ADDataMigratorTests.java +++ b/src/test/java/org/opensearch/ad/cluster/ADDataMigratorTests.java @@ -125,7 +125,7 @@ public void testMigrateDataWithInitingDetectionStateIndexFailure() { ActionListener listener = invocation.getArgument(0); listener.onFailure(new RuntimeException("test")); return null; - 
}).when(detectionIndices).initDetectionStateIndex(any()); + }).when(detectionIndices).initStateIndex(any()); doAnswer(invocation -> { ActionListener listener = invocation.getArgument(1); @@ -145,7 +145,7 @@ public void testMigrateDataWithInitingDetectionStateIndexAlreadyExists() { ActionListener listener = invocation.getArgument(0); listener.onFailure(new ResourceAlreadyExistsException("test")); return null; - }).when(detectionIndices).initDetectionStateIndex(any()); + }).when(detectionIndices).initStateIndex(any()); doAnswer(invocation -> { ActionListener listener = invocation.getArgument(1); @@ -165,7 +165,7 @@ public void testMigrateDataWithInitingDetectionStateIndexNotAcknowledged() { ActionListener listener = invocation.getArgument(0); listener.onResponse(new CreateIndexResponse(false, false, DETECTION_STATE_INDEX)); return null; - }).when(detectionIndices).initDetectionStateIndex(any()); + }).when(detectionIndices).initStateIndex(any()); doAnswer(invocation -> { ActionListener listener = invocation.getArgument(1); @@ -185,7 +185,7 @@ public void testMigrateDataWithInitingDetectionStateIndexAcknowledged() { ActionListener listener = invocation.getArgument(0); listener.onResponse(new CreateIndexResponse(true, false, DETECTION_STATE_INDEX)); return null; - }).when(detectionIndices).initDetectionStateIndex(any()); + }).when(detectionIndices).initStateIndex(any()); doAnswer(invocation -> { ActionListener listener = invocation.getArgument(1); diff --git a/src/test/java/org/opensearch/ad/indices/CustomIndexTests.java b/src/test/java/org/opensearch/ad/indices/CustomIndexTests.java index 9e4741c1b..53bea9015 100644 --- a/src/test/java/org/opensearch/ad/indices/CustomIndexTests.java +++ b/src/test/java/org/opensearch/ad/indices/CustomIndexTests.java @@ -190,7 +190,7 @@ private Map createMapping() { mappings.put(CommonName.SCHEMA_VERSION_FIELD, Collections.singletonMap("type", "integer")); - mappings.put(AnomalyResult.TASK_ID_FIELD, Collections.singletonMap("type", 
"keyword")); + mappings.put(CommonName.TASK_ID_FIELD, Collections.singletonMap("type", "keyword")); mappings.put(AnomalyResult.THRESHOLD_FIELD, Collections.singletonMap("type", "double")); diff --git a/src/test/java/org/opensearch/ad/indices/InitAnomalyDetectionIndicesTests.java b/src/test/java/org/opensearch/ad/indices/InitAnomalyDetectionIndicesTests.java index 686710e9d..9e3c169c2 100644 --- a/src/test/java/org/opensearch/ad/indices/InitAnomalyDetectionIndicesTests.java +++ b/src/test/java/org/opensearch/ad/indices/InitAnomalyDetectionIndicesTests.java @@ -123,7 +123,7 @@ private void fixedPrimaryShardsIndexCreationTemplate(String index) throws IOExce if (index.equals(CommonName.CONFIG_INDEX)) { adIndices.initConfigIndexIfAbsent(listener); } else { - adIndices.initDetectionStateIndex(listener); + adIndices.initStateIndex(listener); } ArgumentCaptor captor = ArgumentCaptor.forClass(CreateIndexResponse.class); @@ -184,7 +184,7 @@ private void adaptivePrimaryShardsIndexCreationTemplate(String index) throws IOE if (index.equals(CommonName.CONFIG_INDEX)) { adIndices.initConfigIndexIfAbsent(listener); } else if (index.equals(ADCommonName.DETECTION_STATE_INDEX)) { - adIndices.initDetectionStateIndex(listener); + adIndices.initStateIndex(listener); } else if (index.equals(ADCommonName.CHECKPOINT_INDEX_NAME)) { adIndices.initCheckpointIndex(listener); } else if (index.equals(CommonName.JOB_INDEX)) { diff --git a/src/test/java/org/opensearch/ad/ml/ThresholdingResultTests.java b/src/test/java/org/opensearch/ad/ml/ThresholdingResultTests.java index 0daddea0d..492bbec45 100644 --- a/src/test/java/org/opensearch/ad/ml/ThresholdingResultTests.java +++ b/src/test/java/org/opensearch/ad/ml/ThresholdingResultTests.java @@ -18,6 +18,7 @@ import org.junit.Test; import org.junit.runner.RunWith; +import org.opensearch.ad.model.AnomalyResult; @RunWith(JUnitParamsRunner.class) public class ThresholdingResultTests { @@ -36,6 +37,9 @@ public void getters_returnExcepted() { private 
Object[] equalsData() { return new Object[] { + new Object[] { thresholdingResult, thresholdingResult, true }, + new Object[] { thresholdingResult, null, false }, + new Object[] { thresholdingResult, AnomalyResult.getDummyResult(), false }, new Object[] { thresholdingResult, null, false }, new Object[] { thresholdingResult, thresholdingResult, true }, new Object[] { thresholdingResult, 1, false }, diff --git a/src/test/java/org/opensearch/ad/model/AnomalyResultTests.java b/src/test/java/org/opensearch/ad/model/AnomalyResultTests.java index 59cc71861..e5c2e7b41 100644 --- a/src/test/java/org/opensearch/ad/model/AnomalyResultTests.java +++ b/src/test/java/org/opensearch/ad/model/AnomalyResultTests.java @@ -70,7 +70,7 @@ public void testParseAnomalyDetectorWithoutNormalResult() throws IOException { .replaceFirst("\\{", String.format(Locale.ROOT, "{\"%s\":\"%s\",", randomAlphaOfLength(5), randomAlphaOfLength(5))); AnomalyResult parsedDetectResult = AnomalyResult.parse(TestHelpers.parser(detectResultString)); assertTrue( - Objects.equal(detectResult.getId(), parsedDetectResult.getId()) + Objects.equal(detectResult.getConfigId(), parsedDetectResult.getConfigId()) && Objects.equal(detectResult.getTaskId(), parsedDetectResult.getTaskId()) && Objects.equal(detectResult.getAnomalyScore(), parsedDetectResult.getAnomalyScore()) && Objects.equal(detectResult.getAnomalyGrade(), parsedDetectResult.getAnomalyGrade()) @@ -95,7 +95,7 @@ public void testParseAnomalyDetectorWithNanAnomalyResult() throws IOException { assertNull(parsedDetectResult.getAnomalyGrade()); assertNull(parsedDetectResult.getAnomalyScore()); assertTrue( - Objects.equal(detectResult.getId(), parsedDetectResult.getId()) + Objects.equal(detectResult.getConfigId(), parsedDetectResult.getConfigId()) && Objects.equal(detectResult.getTaskId(), parsedDetectResult.getTaskId()) && Objects.equal(detectResult.getFeatureData(), parsedDetectResult.getFeatureData()) && Objects.equal(detectResult.getDataStartTime(), 
parsedDetectResult.getDataStartTime()) diff --git a/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java b/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java index 76a174956..f520439a3 100644 --- a/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java +++ b/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java @@ -636,7 +636,7 @@ public void testSearchAnomalyResult() throws Exception { ); assertEquals("Post anomaly result failed", RestStatus.CREATED, TestHelpers.restStatus(response)); - SearchSourceBuilder search = (new SearchSourceBuilder()).query(QueryBuilders.termQuery("detector_id", anomalyResult.getId())); + SearchSourceBuilder search = (new SearchSourceBuilder()).query(QueryBuilders.termQuery("detector_id", anomalyResult.getConfigId())); updateClusterSettings(ADEnabledSetting.AD_ENABLED, false); diff --git a/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java b/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java index cf24daea5..f8b5dcfc6 100644 --- a/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java +++ b/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java @@ -328,7 +328,7 @@ public void testCreateTaskIndexNotAcknowledged() throws IOException { ActionListener listener = invocation.getArgument(0); listener.onResponse(new CreateIndexResponse(false, false, ANOMALY_RESULT_INDEX_ALIAS)); return null; - }).when(detectionIndices).initDetectionStateIndex(any()); + }).when(detectionIndices).initStateIndex(any()); doReturn(false).when(detectionIndices).doesStateIndexExist(); AnomalyDetector detector = randomDetector(ImmutableList.of(randomFeature(true)), randomAlphaOfLength(5), 1, randomAlphaOfLength(5)); setupGetDetector(detector); @@ -344,7 +344,7 @@ public void testCreateTaskIndexWithResourceAlreadyExistsException() throws IOExc ActionListener listener = invocation.getArgument(0); listener.onFailure(new ResourceAlreadyExistsException("index created")); return null; - 
}).when(detectionIndices).initDetectionStateIndex(any()); + }).when(detectionIndices).initStateIndex(any()); doReturn(false).when(detectionIndices).doesStateIndexExist(); AnomalyDetector detector = randomDetector(ImmutableList.of(randomFeature(true)), randomAlphaOfLength(5), 1, randomAlphaOfLength(5)); setupGetDetector(detector); @@ -359,7 +359,7 @@ public void testCreateTaskIndexWithException() throws IOException { ActionListener listener = invocation.getArgument(0); listener.onFailure(new RuntimeException(error)); return null; - }).when(detectionIndices).initDetectionStateIndex(any()); + }).when(detectionIndices).initStateIndex(any()); doReturn(false).when(detectionIndices).doesStateIndexExist(); AnomalyDetector detector = randomDetector(ImmutableList.of(randomFeature(true)), randomAlphaOfLength(5), 1, randomAlphaOfLength(5)); setupGetDetector(detector); @@ -1459,7 +1459,7 @@ public void testStartDetectorWithException() throws IOException { User user = null; ActionListener listener = mock(ActionListener.class); when(detectionIndices.doesStateIndexExist()).thenReturn(false); - doThrow(new RuntimeException("test")).when(detectionIndices).initDetectionStateIndex(any()); + doThrow(new RuntimeException("test")).when(detectionIndices).initStateIndex(any()); adTaskManager.startDetector(detector, detectionDateRange, user, transportService, listener); verify(listener, times(1)).onFailure(any()); } diff --git a/src/test/java/org/opensearch/ad/transport/handler/AnomalyResultBulkIndexHandlerTests.java b/src/test/java/org/opensearch/ad/transport/handler/AnomalyResultBulkIndexHandlerTests.java index 0c6ff4727..771088893 100644 --- a/src/test/java/org/opensearch/ad/transport/handler/AnomalyResultBulkIndexHandlerTests.java +++ b/src/test/java/org/opensearch/ad/transport/handler/AnomalyResultBulkIndexHandlerTests.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.time.Clock; +import java.util.Optional; import org.opensearch.action.ActionListener; import 
org.opensearch.action.DocWriteRequest; @@ -98,7 +99,7 @@ public void testNullAnomalyResults() { public void testAnomalyResultBulkIndexHandler_IndexNotExist() { when(anomalyDetectionIndices.doesIndexExist("testIndex")).thenReturn(false); AnomalyResult anomalyResult = mock(AnomalyResult.class); - when(anomalyResult.getId()).thenReturn("testId"); + when(anomalyResult.getConfigId()).thenReturn("testId"); bulkIndexHandler.bulkIndexAnomalyResult("testIndex", ImmutableList.of(anomalyResult), listener); verify(listener, times(1)).onFailure(exceptionCaptor.capture()); @@ -109,7 +110,7 @@ public void testAnomalyResultBulkIndexHandler_InValidResultIndexMapping() { when(anomalyDetectionIndices.doesIndexExist("testIndex")).thenReturn(true); when(anomalyDetectionIndices.isValidResultIndexMapping("testIndex")).thenReturn(false); AnomalyResult anomalyResult = mock(AnomalyResult.class); - when(anomalyResult.getId()).thenReturn("testId"); + when(anomalyResult.getConfigId()).thenReturn("testId"); bulkIndexHandler.bulkIndexAnomalyResult("testIndex", ImmutableList.of(anomalyResult), listener); verify(listener, times(1)).onFailure(exceptionCaptor.capture()); @@ -120,7 +121,7 @@ public void testAnomalyResultBulkIndexHandler_FailBulkIndexAnomaly() throws IOEx when(anomalyDetectionIndices.doesIndexExist("testIndex")).thenReturn(true); when(anomalyDetectionIndices.isValidResultIndexMapping("testIndex")).thenReturn(true); AnomalyResult anomalyResult = mock(AnomalyResult.class); - when(anomalyResult.getId()).thenReturn("testId"); + when(anomalyResult.getConfigId()).thenReturn("testId"); when(anomalyResult.toXContent(any(), any())).thenThrow(new RuntimeException()); bulkIndexHandler.bulkIndexAnomalyResult("testIndex", ImmutableList.of(anomalyResult), listener); @@ -203,7 +204,7 @@ private AnomalyResult wrongAnomalyResult() { null, null, randomAlphaOfLength(5), - null, + Optional.empty(), null, null, null, diff --git 
a/src/test/java/org/opensearch/forecast/indices/ForecastIndexManagementTests.java b/src/test/java/org/opensearch/forecast/indices/ForecastIndexManagementTests.java index f7cbb96ea..366db213a 100644 --- a/src/test/java/org/opensearch/forecast/indices/ForecastIndexManagementTests.java +++ b/src/test/java/org/opensearch/forecast/indices/ForecastIndexManagementTests.java @@ -31,24 +31,24 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.index.IndexNotFoundException; import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.timeseries.TestHelpers; import org.opensearch.timeseries.function.ExecutorFunction; import org.opensearch.timeseries.indices.IndexManagementIntegTestCase; import org.opensearch.timeseries.settings.TimeSeriesSettings; import org.opensearch.timeseries.util.DiscoveryNodeFilterer; -/** - * Inherit from OpenSearchIntegTestCase we need methods like client() and nodePlugins(). - * Have to name this to Tests instead of IT. Otherwise, we have - * to run it using integTest and there are quite a few errors like - * "java.lang.IllegalArgumentException: Cannot run TEST scope test with tests.cluster". - * - */ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, supportsDedicatedMasters = false) public class ForecastIndexManagementTests extends IndexManagementIntegTestCase { private ForecastIndexManagement indices; private Settings settings; private DiscoveryNodeFilterer nodeFilter; + @Override + protected boolean ignoreExternalCluster() { + return true; + } + // help register setting using AnomalyDetectorPlugin.getSettings. Otherwise, AnomalyDetectionIndices's constructor would fail due to // unregistered settings like AD_RESULT_HISTORY_MAX_DOCS. 
    @Override
@@ -66,6 +66,9 @@ public void setup() throws IOException {
            .put("plugins.forecast.request_timeout", TimeValue.timeValueSeconds(10))
            .build();

+        internalCluster().ensureAtLeastNumDataNodes(1);
+        ensureStableCluster(1);
+
        nodeFilter = new DiscoveryNodeFilterer(clusterService());
        indices = new ForecastIndexManagement(
diff --git a/src/test/java/org/opensearch/forecast/indices/ForecastIndexMappingTests.java b/src/test/java/org/opensearch/forecast/indices/ForecastIndexMappingTests.java
index 915f7419c..a79eda373 100644
--- a/src/test/java/org/opensearch/forecast/indices/ForecastIndexMappingTests.java
+++ b/src/test/java/org/opensearch/forecast/indices/ForecastIndexMappingTests.java
@@ -31,7 +31,7 @@ public void testGetForecastResultMappings() throws IOException {
        assertTrue("schema_version field is missing", mappingJson.path("properties").has("schema_version"));
        assertTrue("task_id field is missing", mappingJson.path("properties").has("task_id"));
        assertTrue("model_id field is missing", mappingJson.path("properties").has("model_id"));
-        assertTrue("forecast_series field is missing", mappingJson.path("properties").has("forecast_series"));
+        assertTrue("forecast_value field is missing", mappingJson.path("properties").has("forecast_value"));
    }

    public void testGetCheckpointMappings() throws IOException {
diff --git a/src/test/java/org/opensearch/forecast/model/ForecastResultTests.java b/src/test/java/org/opensearch/forecast/model/ForecastResultTests.java
new file mode 100644
index 000000000..8e7c4e17a
--- /dev/null
+++ b/src/test/java/org/opensearch/forecast/model/ForecastResultTests.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.forecast.model;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import org.junit.Before;
+import
org.opensearch.commons.authuser.User; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.timeseries.TestHelpers; +import org.opensearch.timeseries.model.Entity; +import org.opensearch.timeseries.model.FeatureData; + +public class ForecastResultTests extends OpenSearchTestCase { + List result; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + // Arrange + String forecasterId = "testId"; + long intervalMillis = 1000; + Double dataQuality = 0.9; + List featureData = new ArrayList<>(); + featureData.add(new FeatureData("f1", "f1", 1.0d)); + featureData.add(new FeatureData("f2", "f2", 2.0d)); + long currentTimeMillis = System.currentTimeMillis(); + Instant instantFromMillis = Instant.ofEpochMilli(currentTimeMillis); + Instant dataStartTime = instantFromMillis; + Instant dataEndTime = dataStartTime.plusSeconds(10); + Instant executionStartTime = instantFromMillis; + Instant executionEndTime = executionStartTime.plusSeconds(10); + String error = null; + Optional entity = Optional.empty(); + User user = new User("testUser", Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + Integer schemaVersion = 1; + String modelId = "testModelId"; + float[] forecastsValues = new float[] { 1.0f, 2.0f, 3.0f, 4.0f }; + float[] forecastsUppers = new float[] { 1.5f, 2.5f, 3.5f, 4.5f }; + float[] forecastsLowers = new float[] { 0.5f, 1.5f, 2.5f, 3.5f }; + String taskId = "testTaskId"; + + // Act + result = ForecastResult + .fromRawRCFCasterResult( + forecasterId, + intervalMillis, + dataQuality, + featureData, + dataStartTime, + dataEndTime, + executionStartTime, + executionEndTime, + error, + entity, + user, + schemaVersion, + modelId, + forecastsValues, + forecastsUppers, + forecastsLowers, + taskId + ); + } + + public void testFromRawRCFCasterResult() { + // Assert + assertEquals(5, result.size()); + assertEquals("f1", result.get(1).getFeatureId()); + 
assertEquals(1.0f, result.get(1).getForecastValue(), 0.01);
+        assertEquals("f2", result.get(2).getFeatureId());
+        assertEquals(2.0f, result.get(2).getForecastValue(), 0.01);
+
+        assertTrue(
+            "actual: " + result.toString(),
+            result
+                .toString()
+                .contains(
+                    "featureId=f2,dataQuality=0.9,forecastValue=2.0,lowerBound=1.5,upperBound=2.5,confidenceIntervalWidth=1.0,forecastDataStartTime="
+                )
+        );
+    }
+
+    public void testParseForecastResult() throws IOException {
+        for (int i = 0; i < 5; i++) {
+            String forecastResultString = TestHelpers
+                .xContentBuilderToString(result.get(i).toXContent(TestHelpers.builder(), ToXContent.EMPTY_PARAMS));
+            ForecastResult parsedForecastResult = ForecastResult.parse(TestHelpers.parser(forecastResultString));
+            assertEquals("Parsing forecast result doesn't work", result.get(i), parsedForecastResult);
+            assertTrue("Parsing forecast result doesn't work", result.get(i).hashCode() == parsedForecastResult.hashCode());
+        }
+    }
+}
diff --git a/src/test/java/org/opensearch/timeseries/TestHelpers.java b/src/test/java/org/opensearch/timeseries/TestHelpers.java
index 0eb65538a..609e079e6 100644
--- a/src/test/java/org/opensearch/timeseries/TestHelpers.java
+++ b/src/test/java/org/opensearch/timeseries/TestHelpers.java
@@ -74,7 +74,6 @@
 import org.opensearch.ad.model.AnomalyDetectorJob;
 import org.opensearch.ad.model.AnomalyResult;
 import org.opensearch.ad.model.AnomalyResultBucket;
-import org.opensearch.ad.model.DataByFeatureId;
 import org.opensearch.ad.model.DetectorInternalState;
 import org.opensearch.ad.model.DetectorValidationIssue;
 import org.opensearch.ad.model.ExpectedValueList;
@@ -138,6 +137,7 @@
 import org.opensearch.timeseries.constant.CommonName;
 import org.opensearch.timeseries.dataprocessor.ImputationMethod;
 import org.opensearch.timeseries.dataprocessor.ImputationOption;
+import org.opensearch.timeseries.model.DataByFeatureId;
 import org.opensearch.timeseries.model.DateRange;
 import org.opensearch.timeseries.model.Entity;
 import
org.opensearch.timeseries.model.Feature; @@ -870,7 +870,7 @@ public static AnomalyResult randomAnomalyDetectResult(double score, String error Instant.now().truncatedTo(ChronoUnit.SECONDS), Instant.now().truncatedTo(ChronoUnit.SECONDS), error, - null, + Optional.empty(), user, CommonValue.NO_SCHEMA_VERSION, null, @@ -950,8 +950,8 @@ public static AnomalyResult randomHCADAnomalyDetectResult( endTimeEpochMillis == null ? Instant.now().truncatedTo(ChronoUnit.SECONDS) : Instant.ofEpochMilli(endTimeEpochMillis), error, entityAttrs == null - ? Entity.createSingleAttributeEntity(randomAlphaOfLength(5), randomAlphaOfLength(5)) - : Entity.createEntityByReordering(entityAttrs), + ? Optional.ofNullable(Entity.createSingleAttributeEntity(randomAlphaOfLength(5), randomAlphaOfLength(5))) + : Optional.ofNullable(Entity.createEntityByReordering(entityAttrs)), randomUser(), CommonValue.NO_SCHEMA_VERSION, null,