Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support forecast tasks in profile API; enable index field modifications #1316

Merged
merged 1 commit into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -696,15 +696,11 @@ List<String> jacocoExclusions = [

// TODO: add test coverage (kaituo)
'org.opensearch.forecast.*',
'org.opensearch.ad.transport.ADHCImputeNodeResponse',
'org.opensearch.ad.transport.GetAnomalyDetectorTransportAction',
'org.opensearch.timeseries.transport.BooleanNodeResponse',
'org.opensearch.timeseries.ml.TimeSeriesSingleStreamCheckpointDao',
'org.opensearch.timeseries.transport.JobRequest',
'org.opensearch.timeseries.transport.handler.ResultBulkIndexingHandler',
'org.opensearch.timeseries.ml.Inferencer',
'org.opensearch.timeseries.transport.SingleStreamResultRequest',
'org.opensearch.timeseries.transport.BooleanResponse',
'org.opensearch.timeseries.rest.handler.IndexJobActionHandler.1',
'org.opensearch.timeseries.transport.SuggestConfigParamResponse',
'org.opensearch.timeseries.transport.SuggestConfigParamRequest',
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/opensearch/ad/model/ADTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ public static ADTask parse(XContentParser parser, String taskId) throws IOExcept
detector.getCustomResultIndexMinSize(),
detector.getCustomResultIndexMinAge(),
detector.getCustomResultIndexTTL(),
detector.getFlattenResultIndexMapping()
detector.getFlattenResultIndexMapping(),
detector.getLastBreakingUIChangeTime()
);
return new Builder()
.taskId(parsedTaskId)
Expand Down
17 changes: 14 additions & 3 deletions src/main/java/org/opensearch/ad/model/AnomalyDetector.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ public Integer getShingleSize(Integer customShingleSize) {
* @param customResultIndexMinAge custom result index lifecycle management min age condition
* @param customResultIndexTTL custom result index lifecycle management ttl
* @param flattenResultIndexMapping flag to indicate whether to flatten result index mapping or not
* @param lastBreakingUIChangeTime last update time to configuration that can break UI and we have
* to display updates from the changed time
*/
public AnomalyDetector(
String detectorId,
Expand Down Expand Up @@ -178,7 +180,8 @@ public AnomalyDetector(
Integer customResultIndexMinSize,
Integer customResultIndexMinAge,
Integer customResultIndexTTL,
Boolean flattenResultIndexMapping
Boolean flattenResultIndexMapping,
Instant lastBreakingUIChangeTime
) {
super(
detectorId,
Expand Down Expand Up @@ -206,7 +209,8 @@ public AnomalyDetector(
customResultIndexMinSize,
customResultIndexMinAge,
customResultIndexTTL,
flattenResultIndexMapping
flattenResultIndexMapping,
lastBreakingUIChangeTime
);

checkAndThrowValidationErrors(ValidationAspect.DETECTOR);
Expand Down Expand Up @@ -284,6 +288,7 @@ public AnomalyDetector(StreamInput input) throws IOException {
this.customResultIndexMinAge = input.readOptionalInt();
this.customResultIndexTTL = input.readOptionalInt();
this.flattenResultIndexMapping = input.readOptionalBoolean();
this.lastUIBreakingChangeTime = input.readOptionalInstant();
}

public XContentBuilder toXContent(XContentBuilder builder) throws IOException {
Expand Down Expand Up @@ -350,6 +355,7 @@ public void writeTo(StreamOutput output) throws IOException {
output.writeOptionalInt(customResultIndexMinAge);
output.writeOptionalInt(customResultIndexTTL);
output.writeOptionalBoolean(flattenResultIndexMapping);
output.writeOptionalInstant(lastUIBreakingChangeTime);
}

@Override
Expand Down Expand Up @@ -447,6 +453,7 @@ public static AnomalyDetector parse(
Integer customResultIndexMinAge = null;
Integer customResultIndexTTL = null;
Boolean flattenResultIndexMapping = null;
Instant lastBreakingUIChangeTime = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -584,6 +591,9 @@ public static AnomalyDetector parse(
case FLATTEN_RESULT_INDEX_MAPPING:
flattenResultIndexMapping = onlyParseBooleanValue(parser);
break;
case BREAKING_UI_CHANGE_TIME:
lastBreakingUIChangeTime = ParseUtils.toInstant(parser);
break;
default:
parser.skipChildren();
break;
Expand Down Expand Up @@ -615,7 +625,8 @@ public static AnomalyDetector parse(
customResultIndexMinSize,
customResultIndexMinAge,
customResultIndexTTL,
flattenResultIndexMapping
flattenResultIndexMapping,
lastBreakingUIChangeTime
);
detector.setDetectionDateRange(detectionDateRange);
return detector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
: WriteRequest.RefreshPolicy.IMMEDIATE;
RestRequest.Method method = request.getHttpRequest().method();

if (method == RestRequest.Method.POST && detectorId != AnomalyDetector.NO_ID) {
// reset detector to empty string detectorId is only meant for updating detector
detectorId = AnomalyDetector.NO_ID;
}

IndexAnomalyDetectorRequest indexAnomalyDetectorRequest = new IndexAnomalyDetectorRequest(
detectorId,
seqNo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ protected AnomalyDetector copyConfig(User user, Config config) {
config.getCustomResultIndexMinSize(),
config.getCustomResultIndexMinAge(),
config.getCustomResultIndexTTL(),
config.getFlattenResultIndexMapping()
config.getFlattenResultIndexMapping(),
breakingUIChange ? Instant.now() : config.getLastBreakingUIChangeTime()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,18 @@ public class ForecastTaskProfileRunner implements TaskProfileRunner<ForecastTask

@Override
public void getTaskProfile(ForecastTask configLevelTask, ActionListener<ForecastTaskProfile> listener) {
// return null since forecasting have no in-memory task profiles as AD
listener.onResponse(null);
// return null in other fields since forecasting have no in-memory task profiles as AD
listener
.onResponse(
new ForecastTaskProfile(
configLevelTask,
null,
null,
null,
configLevelTask == null ? null : configLevelTask.getTaskId(),
null
)
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ public static ForecastTask parse(XContentParser parser, String taskId) throws IO
forecaster.getCustomResultIndexMinSize(),
forecaster.getCustomResultIndexMinAge(),
forecaster.getCustomResultIndexTTL(),
forecaster.getFlattenResultIndexMapping()
forecaster.getFlattenResultIndexMapping(),
forecaster.getLastBreakingUIChangeTime()
);
return new Builder()
.taskId(parsedTaskId)
Expand Down Expand Up @@ -375,10 +376,12 @@ public static ForecastTask parse(XContentParser parser, String taskId) throws IO
@Generated
@Override
public boolean equals(Object other) {
if (this == other)
if (this == other) {
return true;
if (other == null || getClass() != other.getClass())
}
if (other == null || getClass() != other.getClass()) {
return false;
}
ForecastTask that = (ForecastTask) other;
return super.equals(that)
&& Objects.equal(getForecaster(), that.getForecaster())
Expand Down
13 changes: 10 additions & 3 deletions src/main/java/org/opensearch/forecast/model/Forecaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public Forecaster(
Integer customResultIndexMinSize,
Integer customResultIndexMinAge,
Integer customResultIndexTTL,
Boolean flattenResultIndexMapping
Boolean flattenResultIndexMapping,
Instant lastBreakingUIChangeTime
) {
super(
forecasterId,
Expand Down Expand Up @@ -163,7 +164,8 @@ public Forecaster(
customResultIndexMinSize,
customResultIndexMinAge,
customResultIndexTTL,
flattenResultIndexMapping
flattenResultIndexMapping,
lastBreakingUIChangeTime
);

checkAndThrowValidationErrors(ValidationAspect.FORECASTER);
Expand Down Expand Up @@ -306,6 +308,7 @@ public static Forecaster parse(
Integer customResultIndexMinAge = null;
Integer customResultIndexTTL = null;
Boolean flattenResultIndexMapping = null;
Instant lastBreakingUIChangeTime = null;

ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -437,6 +440,9 @@ public static Forecaster parse(
case FLATTEN_RESULT_INDEX_MAPPING:
flattenResultIndexMapping = parser.booleanValue();
break;
case BREAKING_UI_CHANGE_TIME:
lastBreakingUIChangeTime = ParseUtils.toInstant(parser);
break;
default:
parser.skipChildren();
break;
Expand Down Expand Up @@ -468,7 +474,8 @@ public static Forecaster parse(
customResultIndexMinSize,
customResultIndexMinAge,
customResultIndexTTL,
flattenResultIndexMapping
flattenResultIndexMapping,
lastBreakingUIChangeTime
);
return forecaster;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
: WriteRequest.RefreshPolicy.IMMEDIATE;
RestRequest.Method method = request.getHttpRequest().method();

if (method == RestRequest.Method.POST && forecasterId != Config.NO_ID) {
// reset detector to empty string detectorId is only meant for updating detector
forecasterId = Config.NO_ID;
}

IndexForecasterRequest indexAnomalyDetectorRequest = new IndexForecasterRequest(
forecasterId,
seqNo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ protected Config copyConfig(User user, Config config) {
config.getCustomResultIndexMinSize(),
config.getCustomResultIndexMinAge(),
config.getCustomResultIndexTTL(),
config.getFlattenResultIndexMapping()
config.getFlattenResultIndexMapping(),
breakingUIChange ? Instant.now() : config.getLastBreakingUIChangeTime()
);
}

Expand Down
19 changes: 18 additions & 1 deletion src/main/java/org/opensearch/timeseries/model/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public abstract class Config implements Writeable, ToXContentObject {
public static final String RESULT_INDEX_FIELD_MIN_AGE = "result_index_min_age";
public static final String RESULT_INDEX_FIELD_TTL = "result_index_ttl";
public static final String FLATTEN_RESULT_INDEX_MAPPING = "flatten_result_index_mapping";
// Changing categorical field, feature attributes, interval, windowDelay, time field, horizon, indices,
// result index would force us to display results only from the most recent update. Otherwise,
// the UI appear cluttered and unclear.
// We cannot use last update time as it would change whenever other fields like name changes.
public static final String BREAKING_UI_CHANGE_TIME = "last_ui_breaking_change_time";

protected String id;
protected Long version;
Expand Down Expand Up @@ -120,6 +125,7 @@ public abstract class Config implements Writeable, ToXContentObject {
protected Integer customResultIndexMinAge;
protected Integer customResultIndexTTL;
protected Boolean flattenResultIndexMapping;
protected Instant lastUIBreakingChangeTime;

public static String INVALID_RESULT_INDEX_NAME_SIZE = "Result index name size must contains less than "
+ MAX_RESULT_INDEX_NAME_SIZE
Expand Down Expand Up @@ -151,7 +157,8 @@ protected Config(
Integer customResultIndexMinSize,
Integer customResultIndexMinAge,
Integer customResultIndexTTL,
Boolean flattenResultIndexMapping
Boolean flattenResultIndexMapping,
Instant lastBreakingUIChangeTime
) {
if (Strings.isBlank(name)) {
errorMessage = CommonMessages.EMPTY_NAME;
Expand Down Expand Up @@ -291,6 +298,7 @@ protected Config(
this.customResultIndexMinAge = Strings.trimToNull(resultIndex) == null ? null : customResultIndexMinAge;
this.customResultIndexTTL = Strings.trimToNull(resultIndex) == null ? null : customResultIndexTTL;
this.flattenResultIndexMapping = Strings.trimToNull(resultIndex) == null ? null : flattenResultIndexMapping;
this.lastUIBreakingChangeTime = lastBreakingUIChangeTime;
}

public int suggestHistory() {
Expand Down Expand Up @@ -335,6 +343,7 @@ public Config(StreamInput input) throws IOException {
this.customResultIndexMinAge = input.readOptionalInt();
this.customResultIndexTTL = input.readOptionalInt();
this.flattenResultIndexMapping = input.readOptionalBoolean();
this.lastUIBreakingChangeTime = input.readOptionalInstant();
}

/*
Expand Down Expand Up @@ -388,6 +397,7 @@ public void writeTo(StreamOutput output) throws IOException {
output.writeOptionalInt(customResultIndexMinAge);
output.writeOptionalInt(customResultIndexTTL);
output.writeOptionalBoolean(flattenResultIndexMapping);
output.writeOptionalInstant(lastUIBreakingChangeTime);
}

public boolean invalidShingleSizeRange(Integer shingleSizeToTest) {
Expand Down Expand Up @@ -525,6 +535,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (flattenResultIndexMapping != null) {
builder.field(FLATTEN_RESULT_INDEX_MAPPING, flattenResultIndexMapping);
}
if (lastUIBreakingChangeTime != null) {
builder.field(BREAKING_UI_CHANGE_TIME, lastUIBreakingChangeTime.toEpochMilli());
}
return builder;
}

Expand Down Expand Up @@ -737,6 +750,10 @@ public Boolean getFlattenResultIndexMapping() {
return flattenResultIndexMapping;
}

public Instant getLastBreakingUIChangeTime() {
return lastUIBreakingChangeTime;
}

/**
* Identifies redundant feature names.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public abstract class AbstractTimeSeriesActionHandler<T extends ActionResponse,
protected final Clock clock;
protected final Settings settings;
protected final ValidationAspect configValidationAspect;
protected boolean breakingUIChange;

public AbstractTimeSeriesActionHandler(
Config config,
Expand Down Expand Up @@ -203,6 +204,7 @@ public AbstractTimeSeriesActionHandler(
this.settings = settings;
this.handler = new ConfigUpdateConfirmer<>(taskManager, transportService);
this.configValidationAspect = configValidationAspect;
this.breakingUIChange = false;
}

/**
Expand Down Expand Up @@ -456,6 +458,11 @@ private void onGetConfigResponse(GetResponse response, boolean indexingDryRun, S
);
return;
}
} else {
if (!ParseUtils.listEqualsWithoutConsideringOrder(existingConfig.getCategoryFields(), config.getCategoryFields())
|| !Objects.equals(existingConfig.getCustomResultIndexOrAlias(), config.getCustomResultIndexOrAlias())) {
breakingUIChange = true;
}
}

ActionListener<Void> confirmBatchRunningListener = ActionListener
Expand Down Expand Up @@ -675,7 +682,6 @@ protected void validateCategoricalField(
);
}

@SuppressWarnings("unchecked")
protected void searchConfigInputIndices(String configId, boolean indexingDryRun, ActionListener<T> listener) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.query(QueryBuilders.matchAllQuery())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public boolean isAnswerTrue() {

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(answer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public boolean isAnswerTrue() {

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(answer);
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/mappings/anomaly-detection-state.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"dynamic": false,
"_meta": {
"schema_version": 4
"schema_version": 5
},
"properties": {
"schema_version": {
Expand Down
6 changes: 5 additions & 1 deletion src/main/resources/mappings/config.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"dynamic": false,
"_meta": {
"schema_version": 6
"schema_version": 7
},
"properties": {
"schema_version": {
Expand Down Expand Up @@ -232,6 +232,10 @@
},
"flatten_result_index_mapping": {
"type": "boolean"
},
"last_ui_breaking_change_time" : {
"type": "date",
"format": "strict_date_time||epoch_millis"
}
}
}
Loading
Loading