Skip to content

Commit

Permalink
Refactor ADTask and Related Components (#969)
Browse files Browse the repository at this point in the history
* Refactor ADTask and Related Components

This PR includes several key refactoring changes:
- Extracts common code from ADTask into TimeSeriesTask, creating ForecastTask for forecasting-specific logic.
- Consolidates common code from ADTaskType into TaskType and introduces ForecastTaskType for forecasting-related purposes.
- Renames ADTaskState to TaskState for consistent code reuse.
- Renames the method getId in ADTask to getConfigId to differentiate it from other IDs like task id.

Testing done:
1. Added unit tests for the new code to ensure functionality.
2. Executed a successful Gradle build.

Signed-off-by: Kaituo Li <[email protected]>

* add comments and address compiler errors

Signed-off-by: Kaituo Li <[email protected]>

* address Amit's comments and address compiler failure

Signed-off-by: Kaituo Li <[email protected]>

---------

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo authored Aug 7, 2023
1 parent 1130a1b commit d4946f0
Show file tree
Hide file tree
Showing 49 changed files with 1,545 additions and 692 deletions.
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.ADTaskState;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.task.ADTaskManager;
Expand Down Expand Up @@ -64,6 +63,7 @@
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.function.ExecutorFunction;
import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.model.TaskState;
import org.opensearch.timeseries.util.SecurityUtil;

import com.google.common.base.Throwables;
Expand Down Expand Up @@ -509,7 +509,7 @@ private void stopAdJobForEndRunException(
executionStartTime,
error,
true,
ADTaskState.STOPPED.name(),
TaskState.STOPPED.name(),
recorder,
detector
)
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/org/opensearch/ad/cluster/ADDataMigrator.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import static org.opensearch.ad.model.ADTask.DETECTOR_ID_FIELD;
import static org.opensearch.ad.model.ADTask.IS_LATEST_FIELD;
import static org.opensearch.ad.model.ADTask.TASK_TYPE_FIELD;
import static org.opensearch.ad.model.ADTaskType.taskTypeToString;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_DETECTOR_UPPER_LIMIT;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.timeseries.model.TaskType.taskTypeToString;
import static org.opensearch.timeseries.util.RestHandlerUtils.XCONTENT_WITH_TYPE;
import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry;

Expand All @@ -39,7 +39,6 @@
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.ADTask;
import org.opensearch.ad.model.ADTaskState;
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.DetectorInternalState;
Expand All @@ -59,6 +58,7 @@
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.function.ExecutorFunction;
import org.opensearch.timeseries.model.Job;
import org.opensearch.timeseries.model.TaskState;
import org.opensearch.timeseries.util.ExceptionUtil;

/**
Expand Down Expand Up @@ -245,15 +245,15 @@ private void createRealtimeADTask(Job job, String error, ConcurrentLinkedQueue<J
Instant now = Instant.now();
String userName = job.getUser() != null ? job.getUser().getName() : null;
ADTask adTask = new ADTask.Builder()
.detectorId(detector.getId())
.configId(detector.getId())
.detector(detector)
.error(error)
.isLatest(true)
.taskType(taskType.name())
.executionStartTime(now)
.taskProgress(0.0f)
.initProgress(0.0f)
.state(ADTaskState.CREATED.name())
.state(TaskState.CREATED.name())
.lastUpdateTime(now)
.startedBy(userName)
.coordinatingNode(null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.opensearch.client.Client;
import org.opensearch.cluster.LocalNodeClusterManagerListener;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.component.LifecycleListener;
import org.opensearch.common.lifecycle.LifecycleListener;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/cluster/HashRing.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
import org.opensearch.cluster.routing.Murmur3HashFunction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.constant.CommonName;
Expand Down
Loading

0 comments on commit d4946f0

Please sign in to comment.