Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring NodeStateManager etc. to support forecasting functionality #965

Merged
merged 3 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ buildscript {
js_resource_folder = "src/test/resources/job-scheduler"
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build)
bwcVersionShort = "2.9.0"
bwcVersionShort = "2.10.0"
bwcVersion = bwcVersionShort + ".0"
bwcOpenSearchADDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' +
'opensearch/plugins/opensearch-anomaly-detection-' + bwcVersion + '.zip'
Expand Down Expand Up @@ -672,8 +672,6 @@ List<String> jacocoExclusions = [
'org.opensearch.timeseries.settings.TimeSeriesSettings',
'org.opensearch.forecast.settings.ForecastSettings',

'org.opensearch.ad.util.ClientUtil',

'org.opensearch.ad.transport.CronRequest',
'org.opensearch.ad.AnomalyDetectorRunner',

Expand Down
38 changes: 20 additions & 18 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,12 @@
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.ADTaskState;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.AnomalyResultAction;
import org.opensearch.ad.transport.AnomalyResultRequest;
import org.opensearch.ad.transport.AnomalyResultResponse;
import org.opensearch.ad.transport.AnomalyResultTransportAction;
import org.opensearch.ad.util.SecurityUtil;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -58,11 +56,15 @@
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.common.exception.InternalFailure;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.function.ExecutorFunction;
import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.util.SecurityUtil;

import com.google.common.base.Throwables;

Expand Down Expand Up @@ -134,12 +136,12 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont
String detectorId = scheduledJobParameter.getName();
log.info("Start to run AD job {}", detectorId);
adTaskManager.refreshRealtimeJobRunTime(detectorId);
if (!(scheduledJobParameter instanceof AnomalyDetectorJob)) {
if (!(scheduledJobParameter instanceof Job)) {
throw new IllegalArgumentException(
"Job parameter is not instance of AnomalyDetectorJob, type: " + scheduledJobParameter.getClass().getCanonicalName()
"Job parameter is not instance of Job, type: " + scheduledJobParameter.getClass().getCanonicalName()
);
}
AnomalyDetectorJob jobParameter = (AnomalyDetectorJob) scheduledJobParameter;
Job jobParameter = (Job) scheduledJobParameter;
Instant executionStartTime = Instant.now();
IntervalSchedule schedule = (IntervalSchedule) jobParameter.getSchedule();
Instant detectionStartTime = executionStartTime.minus(schedule.getInterval(), schedule.getUnit());
Expand All @@ -148,12 +150,12 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont

Runnable runnable = () -> {
try {
nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> {
nodeStateManager.getConfig(detectorId, AnalysisType.AD, ActionListener.wrap(detectorOptional -> {
if (!detectorOptional.isPresent()) {
log.error(new ParameterizedMessage("fail to get detector [{}]", detectorId));
return;
}
AnomalyDetector detector = detectorOptional.get();
AnomalyDetector detector = (AnomalyDetector) detectorOptional.get();

if (jobParameter.getLockDurationSeconds() != null) {
lockService
Expand Down Expand Up @@ -216,7 +218,7 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont
* @param detector associated detector accessor
*/
protected void runAdJob(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand Down Expand Up @@ -284,7 +286,7 @@ protected void runAdJob(
}

private void runAnomalyDetectionJob(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand Down Expand Up @@ -393,7 +395,7 @@ private void runAnomalyDetectionJob(
* @param detector associated detector accessor
*/
protected void handleAdException(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand Down Expand Up @@ -482,7 +484,7 @@ protected void handleAdException(
}

private void stopAdJobForEndRunException(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand Down Expand Up @@ -524,9 +526,9 @@ private void stopAdJob(String detectorId, ExecutorFunction function) {
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
Job job = Job.parse(parser);
if (job.isEnabled()) {
AnomalyDetectorJob newJob = new AnomalyDetectorJob(
Job newJob = new Job(
job.getName(),
job.getSchedule(),
job.getWindowDelay(),
Expand Down Expand Up @@ -566,7 +568,7 @@ private void stopAdJob(String detectorId, ExecutorFunction function) {
}

private void indexAnomalyResult(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand All @@ -590,7 +592,7 @@ private void indexAnomalyResult(
}

private void indexAnomalyResultException(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand Down Expand Up @@ -621,7 +623,7 @@ private void indexAnomalyResultException(
}

private void indexAnomalyResultException(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand All @@ -646,7 +648,7 @@ private void indexAnomalyResultException(
}

private void indexAnomalyResultException(
AnomalyDetectorJob jobParameter,
Job jobParameter,
LockService lockService,
LockModel lock,
Instant detectionStartTime,
Expand All @@ -666,7 +668,7 @@ private void indexAnomalyResultException(
}
}

private void releaseLock(AnomalyDetectorJob jobParameter, LockService lockService, LockModel lock) {
private void releaseLock(Job jobParameter, LockService lockService, LockModel lock) {
lockService
.release(
lock,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.DetectorProfile;
import org.opensearch.ad.model.DetectorProfileName;
import org.opensearch.ad.model.DetectorState;
Expand All @@ -49,9 +48,6 @@
import org.opensearch.ad.transport.RCFPollingAction;
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.RCFPollingResponse;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -68,11 +64,16 @@
import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder;
import org.opensearch.search.aggregations.metrics.InternalCardinality;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.common.exception.NotSerializedExceptionName;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
import org.opensearch.timeseries.util.ExceptionUtil;
import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;
import org.opensearch.timeseries.util.SecurityClientUtil;
import org.opensearch.transport.TransportService;

public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {
Expand Down Expand Up @@ -159,7 +160,7 @@ private void prepareProfile(
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
Job job = Job.parse(parser);
long enabledTimeMs = job.getEnabledTime().toEpochMilli();

boolean isMultiEntityDetector = detector.isHighCardinality();
Expand Down Expand Up @@ -315,6 +316,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
client::search,
detector.getId(),
client,
AnalysisType.AD,
searchResponseListener
);
} else {
Expand Down Expand Up @@ -368,6 +370,7 @@ private void profileEntityStats(MultiResponsesDelegateActionListener<DetectorPro
client::search,
detector.getId(),
client,
AnalysisType.AD,
searchResponseListener
);
}
Expand Down Expand Up @@ -418,7 +421,7 @@ private void profileStateRelated(
private void profileModels(
AnomalyDetector detector,
Set<DetectorProfileName> profiles,
AnomalyDetectorJob job,
Job job,
boolean forMultiEntityDetector,
MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
Expand All @@ -430,7 +433,7 @@ private void profileModels(
private ActionListener<ProfileResponse> onModelResponse(
AnomalyDetector detector,
Set<DetectorProfileName> profilesToCollect,
AnomalyDetectorJob job,
Job job,
MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
boolean isMultientityDetector = detector.isHighCardinality();
Expand Down Expand Up @@ -464,7 +467,7 @@ private ActionListener<ProfileResponse> onModelResponse(
}

private void profileMultiEntityDetectorStateRelated(
AnomalyDetectorJob job,
Job job,
Set<DetectorProfileName> profilesToCollect,
ProfileResponse profileResponse,
DetectorProfile.Builder profileBuilder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.EntityAnomalyResult;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.Feature;
import org.opensearch.timeseries.model.FeatureData;
import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;

/**
* Runner to trigger an anomaly detector.
Expand Down
12 changes: 7 additions & 5 deletions src/main/java/org/opensearch/ad/EntityProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.opensearch.ad.constant.ADCommonMessages;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.EntityProfile;
import org.opensearch.ad.model.EntityProfileName;
Expand All @@ -38,8 +37,6 @@
import org.opensearch.ad.transport.EntityProfileAction;
import org.opensearch.ad.transport.EntityProfileRequest;
import org.opensearch.ad.transport.EntityProfileResponse;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
Expand All @@ -53,11 +50,15 @@
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener;
import org.opensearch.timeseries.util.ParseUtils;
import org.opensearch.timeseries.util.SecurityClientUtil;

public class EntityProfileRunner extends AbstractProfileRunner {
private final Logger logger = LogManager.getLogger(EntityProfileRunner.class);
Expand Down Expand Up @@ -188,6 +189,7 @@ private void validateEntity(
client::search,
detector.getId(),
client,
AnalysisType.AD,
searchResponseListener
);

Expand Down Expand Up @@ -228,7 +230,7 @@ private void getJob(
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser);
Job job = Job.parse(parser);

int totalResponsesToWait = 0;
if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS)
Expand Down Expand Up @@ -331,7 +333,7 @@ private void profileStateRelated(
Entity entityValue,
Set<EntityProfileName> profilesToCollect,
AnomalyDetector detector,
AnomalyDetectorJob job,
Job job,
MultiResponsesDelegateActionListener<EntityProfile> delegateListener
) {
if (totalUpdates == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,22 @@
import org.opensearch.ad.transport.RCFPollingAction;
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.commons.authuser.User;
import org.opensearch.search.SearchHits;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.model.FeatureData;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
import org.opensearch.timeseries.util.ExceptionUtil;

public class ExecuteADResultResponseRecorder {
private static final Logger log = LogManager.getLogger(ExecuteADResultResponseRecorder.class);
Expand Down Expand Up @@ -337,20 +339,20 @@ private void confirmTotalRCFUpdatesFound(
String error,
ActionListener<Long> listener
) {
nodeStateManager.getAnomalyDetector(detectorId, ActionListener.wrap(detectorOptional -> {
nodeStateManager.getConfig(detectorId, AnalysisType.AD, ActionListener.wrap(detectorOptional -> {
if (!detectorOptional.isPresent()) {
listener.onFailure(new TimeSeriesException(detectorId, "fail to get detector"));
return;
}
nodeStateManager.getAnomalyDetectorJob(detectorId, ActionListener.wrap(jobOptional -> {
nodeStateManager.getJob(detectorId, ActionListener.wrap(jobOptional -> {
if (!jobOptional.isPresent()) {
listener.onFailure(new TimeSeriesException(detectorId, "fail to get job"));
return;
}

ProfileUtil
.confirmDetectorRealtimeInitStatus(
detectorOptional.get(),
(AnomalyDetector) detectorOptional.get(),
jobOptional.get().getEnabledTime().toEpochMilli(),
client,
ActionListener.wrap(searchResponse -> {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/caching/CacheBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.ExpiringState;
import org.opensearch.ad.MemoryTracker;
import org.opensearch.ad.MemoryTracker.Origin;
import org.opensearch.ad.ml.EntityModel;
Expand All @@ -36,6 +35,7 @@
import org.opensearch.ad.ratelimit.CheckpointWriteWorker;
import org.opensearch.ad.ratelimit.RequestPriority;
import org.opensearch.ad.util.DateUtils;
import org.opensearch.timeseries.ExpiringState;

/**
* We use a layered cache to manage active entities’ states. We have a two-level
Expand Down
Loading
Loading