Skip to content

Commit fd173dd

Browse files
committed
feature: job extension and static job
1 parent 7847bee commit fd173dd

17 files changed

+560
-378
lines changed

common/src/main/java/org/opensearch/ml/common/CommonValue.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,11 @@ public class CommonValue {
4444
public static final String ML_MEMORY_META_INDEX = ".plugins-ml-memory-meta";
4545
public static final String ML_MEMORY_MESSAGE_INDEX = ".plugins-ml-memory-message";
4646
public static final String ML_STOP_WORDS_INDEX = ".plugins-ml-stop-words";
47+
// index used in 2.19 to track MlTaskBatchUpdate task
4748
public static final String TASK_POLLING_JOB_INDEX = ".ml_commons_task_polling_job";
4849
public static final String MCP_SESSION_MANAGEMENT_INDEX = ".plugins-ml-mcp-session-management";
50+
// index created in 3.0 to track all ml jobs created via job scheduler
51+
public static final String ML_JOBS_INDEX = ".plugins-ml-jobs";
4952
public static final Set<String> stopWordsIndices = ImmutableSet.of(".plugins-ml-stop-words");
5053
public static final String TOOL_PARAMETERS_PREFIX = "tools.parameters.";
5154

common/src/main/java/org/opensearch/ml/common/MLModel.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.ml.common.model.MetricsCorrelationModelConfig;
4141
import org.opensearch.ml.common.model.QuestionAnsweringModelConfig;
4242
import org.opensearch.ml.common.model.TextEmbeddingModelConfig;
43+
import org.opensearch.telemetry.metrics.tags.Tags;
4344

4445
import lombok.Builder;
4546
import lombok.Getter;
@@ -745,4 +746,15 @@ public static MLModel fromStream(StreamInput in) throws IOException {
745746
return new MLModel(in);
746747
}
747748

749+
public Tags getModelTags() {
750+
return Tags
751+
.create()
752+
.addTag("type", algorithm == FunctionName.REMOTE ? "remote" : "local")
753+
.addTag("provider", algorithm == FunctionName.REMOTE ? getRemoteModelType() : algorithm.name());
754+
}
755+
756+
private String getRemoteModelType() {
757+
return "remote_sub_tye";
758+
}
759+
748760
}

plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,31 @@
88
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_SYNC_UP_JOB_INTERVAL_IN_SECONDS;
99
import static org.opensearch.ml.plugin.MachineLearningPlugin.GENERAL_THREAD_POOL;
1010

11+
import java.io.IOException;
12+
import java.time.Instant;
13+
import java.time.temporal.ChronoUnit;
1114
import java.util.List;
1215

16+
import org.opensearch.action.index.IndexRequest;
17+
import org.opensearch.action.support.WriteRequest;
1318
import org.opensearch.cluster.LocalNodeClusterManagerListener;
1419
import org.opensearch.cluster.service.ClusterService;
1520
import org.opensearch.common.lifecycle.LifecycleListener;
1621
import org.opensearch.common.settings.Settings;
1722
import org.opensearch.common.unit.TimeValue;
23+
import org.opensearch.common.xcontent.json.JsonXContent;
1824
import org.opensearch.core.action.ActionListener;
25+
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
1926
import org.opensearch.ml.autoredeploy.MLModelAutoReDeployer;
2027
import org.opensearch.ml.common.settings.MLFeatureEnabledSetting;
2128
import org.opensearch.ml.engine.encryptor.Encryptor;
2229
import org.opensearch.ml.engine.indices.MLIndicesHandler;
30+
import org.opensearch.ml.common.CommonValue;
31+
import org.opensearch.ml.engine.encryptor.Encryptor;
32+
import org.opensearch.ml.engine.indices.MLIndicesHandler;
33+
import org.opensearch.ml.jobs.MLJobParameter;
34+
import org.opensearch.ml.jobs.MLJobType;
35+
import org.opensearch.ml.settings.MLFeatureEnabledSetting;
2336
import org.opensearch.remote.metadata.client.SdkClient;
2437
import org.opensearch.threadpool.Scheduler;
2538
import org.opensearch.threadpool.ThreadPool;
@@ -95,6 +108,40 @@ public void onClusterManager() {
95108
TimeValue.timeValueSeconds(jobInterval),
96109
GENERAL_THREAD_POOL
97110
);
111+
// startStatsCollectorJob();
112+
}
113+
114+
public void startStatsCollectorJob() {
115+
try {
116+
int intervalInMinutes = 5;
117+
Long lockDurationSeconds = 20L;
118+
119+
MLJobParameter jobParameter = new MLJobParameter(
120+
MLJobType.STATS_COLLECTOR.name(),
121+
new IntervalSchedule(Instant.now(), intervalInMinutes, ChronoUnit.MINUTES),
122+
lockDurationSeconds,
123+
null,
124+
MLJobType.STATS_COLLECTOR
125+
);
126+
127+
IndexRequest indexRequest = new IndexRequest()
128+
.index(CommonValue.ML_JOBS_INDEX)
129+
.id(MLJobType.STATS_COLLECTOR.name())
130+
.source(jobParameter.toXContent(JsonXContent.contentBuilder(), null))
131+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
132+
133+
client
134+
.index(
135+
indexRequest,
136+
ActionListener
137+
.wrap(
138+
r -> log.info("Indexed ml stats collection job successfully"),
139+
e -> log.error("Failed to index stats collection job", e)
140+
)
141+
);
142+
} catch (IOException e) {
143+
log.error("Failed to index stats collection job", e);
144+
}
98145
}
99146

100147
private void startSyncModelRoutingCron() {

plugin/src/main/java/org/opensearch/ml/jobs/MLBatchTaskUpdateExtension.java

Lines changed: 0 additions & 88 deletions
This file was deleted.

plugin/src/main/java/org/opensearch/ml/jobs/MLBatchTaskUpdateJobRunner.java

Lines changed: 0 additions & 168 deletions
This file was deleted.

0 commit comments

Comments
 (0)