Skip to content

Commit 9e9e4cf

Browse files
committed
fix: ensure jobs index is system index and created on startup of data node
Signed-off-by: Pavan Yekbote <[email protected]>
1 parent ad24357 commit 9e9e4cf

File tree

11 files changed

+173
-83
lines changed

11 files changed

+173
-83
lines changed

common/src/main/java/org/opensearch/ml/common/CommonValue.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class CommonValue {
6565
public static final String ML_MEMORY_MESSAGE_INDEX_MAPPING_PATH = "index-mappings/ml_memory_message.json";
6666
public static final String ML_MCP_SESSION_MANAGEMENT_INDEX_MAPPING_PATH = "index-mappings/ml_mcp_session_management.json";
6767
public static final String ML_MCP_TOOLS_INDEX_MAPPING_PATH = "index-mappings/ml_mcp_tools.json";
68+
public static final String ML_JOBS_INDEX_MAPPING_PATH = "index-mappings/ml_jobs.json";
6869

6970
// Calculate Versions independently of OpenSearch core version
7071
public static final Version VERSION_2_11_0 = Version.fromString("2.11.0");

common/src/main/java/org/opensearch/ml/common/MLIndex.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import static org.opensearch.ml.common.CommonValue.ML_CONNECTOR_INDEX_MAPPING_PATH;
1616
import static org.opensearch.ml.common.CommonValue.ML_CONTROLLER_INDEX;
1717
import static org.opensearch.ml.common.CommonValue.ML_CONTROLLER_INDEX_MAPPING_PATH;
18+
import static org.opensearch.ml.common.CommonValue.ML_JOBS_INDEX;
19+
import static org.opensearch.ml.common.CommonValue.ML_JOBS_INDEX_MAPPING_PATH;
1820
import static org.opensearch.ml.common.CommonValue.ML_MCP_SESSION_MANAGEMENT_INDEX_MAPPING_PATH;
1921
import static org.opensearch.ml.common.CommonValue.ML_MCP_TOOLS_INDEX_MAPPING_PATH;
2022
import static org.opensearch.ml.common.CommonValue.ML_MEMORY_MESSAGE_INDEX;
@@ -44,7 +46,8 @@ public enum MLIndex {
4446
MEMORY_META(ML_MEMORY_META_INDEX, false, ML_MEMORY_META_INDEX_MAPPING_PATH),
4547
MEMORY_MESSAGE(ML_MEMORY_MESSAGE_INDEX, false, ML_MEMORY_MESSAGE_INDEX_MAPPING_PATH),
4648
MCP_SESSION_MANAGEMENT(MCP_SESSION_MANAGEMENT_INDEX, false, ML_MCP_SESSION_MANAGEMENT_INDEX_MAPPING_PATH),
47-
MCP_TOOLS(MCP_TOOLS_INDEX, false, ML_MCP_TOOLS_INDEX_MAPPING_PATH);
49+
MCP_TOOLS(MCP_TOOLS_INDEX, false, ML_MCP_TOOLS_INDEX_MAPPING_PATH),
50+
JOBS(ML_JOBS_INDEX, false, ML_JOBS_INDEX_MAPPING_PATH);
4851

4952
private final String indexName;
5053
// whether we use an alias for the index

common/src/main/java/org/opensearch/ml/common/settings/MLCommonsSettings.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,9 +345,9 @@ private MLCommonsSettings() {}
345345

346346
// Feature flag for enabling telemetry metric collection via metrics framework
347347
public static final Setting<Boolean> ML_COMMONS_METRIC_COLLECTION_ENABLED = Setting
348-
.boolSetting("plugins.ml_commons.metrics_collection_enabled", false, Setting.Property.NodeScope, Setting.Property.Dynamic);
348+
.boolSetting("plugins.ml_commons.metrics_collection_enabled", false, Setting.Property.NodeScope, Setting.Property.Final);
349349

350350
// Feature flag for enabling telemetry static metric collection job -- MLStatsJobProcessor
351351
public static final Setting<Boolean> ML_COMMONS_STATIC_METRIC_COLLECTION_ENABLED = Setting
352-
.boolSetting("plugins.ml_commons.metrics_static_collection_enabled", false, Setting.Property.NodeScope, Setting.Property.Dynamic);
352+
.boolSetting("plugins.ml_commons.metrics_static_collection_enabled", false, Setting.Property.NodeScope, Setting.Property.Final);
353353
}

common/src/main/java/org/opensearch/ml/common/settings/MLFeatureEnabledSetting.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,6 @@ public MLFeatureEnabledSetting(ClusterService clusterService, Settings settings)
8888
clusterService
8989
.getClusterSettings()
9090
.addSettingsUpdateConsumer(MLCommonsSettings.ML_COMMONS_RAG_PIPELINE_FEATURE_ENABLED, it -> isRagSearchPipelineEnabled = it);
91-
clusterService
92-
.getClusterSettings()
93-
.addSettingsUpdateConsumer(ML_COMMONS_METRIC_COLLECTION_ENABLED, it -> isMetricCollectionEnabled = it);
94-
clusterService
95-
.getClusterSettings()
96-
.addSettingsUpdateConsumer(ML_COMMONS_STATIC_METRIC_COLLECTION_ENABLED, it -> isStaticMetricCollectionEnabled = it);
9791
}
9892

9993
/**
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
{
2+
"_meta": {
3+
"schema_version": 1
4+
},
5+
"properties": {
6+
"name": {
7+
"type": "keyword"
8+
},
9+
"enabled": {
10+
"type": "boolean"
11+
},
12+
"enabled_time": {
13+
"type": "date",
14+
"format": "strict_date_time||epoch_millis"
15+
},
16+
"last_update_time": {
17+
"type": "date",
18+
"format": "strict_date_time||epoch_millis"
19+
},
20+
"schedule": {
21+
"properties": {
22+
"interval": {
23+
"properties": {
24+
"start_time": {
25+
"type": "date",
26+
"format": "strict_date_time||epoch_millis"
27+
},
28+
"period": {
29+
"type": "integer"
30+
},
31+
"unit": {
32+
"type": "keyword"
33+
}
34+
}
35+
}
36+
}
37+
},
38+
"lock_duration_seconds": {
39+
"type": "long"
40+
},
41+
"jitter": {
42+
"type": "double"
43+
},
44+
"type": {
45+
"type": "keyword"
46+
}
47+
}
48+
}

ml-algorithms/src/main/java/org/opensearch/ml/engine/indices/MLIndicesHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ public void initMLMcpToolsIndex(ActionListener<Boolean> listener) {
9292
initMLIndexIfAbsent(MLIndex.MCP_TOOLS, listener);
9393
}
9494

95+
public void initMLJobsIndex(ActionListener<Boolean> listener) {
96+
initMLIndexIfAbsent(MLIndex.JOBS, listener);
97+
}
98+
9599
public void initMLAgentIndex(ActionListener<Boolean> listener) {
96100
initMLIndexIfAbsent(MLIndex.AGENT, listener);
97101
}

plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterEventListener.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,29 @@
55

66
package org.opensearch.ml.cluster;
77

8+
import static org.opensearch.ml.common.CommonValue.TASK_POLLING_JOB_INDEX;
89
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_MONITORING_REQUEST_COUNT;
910

1011
import java.util.List;
1112
import java.util.Set;
1213
import java.util.stream.Collectors;
1314

15+
import org.opensearch.Version;
1416
import org.opensearch.cluster.ClusterChangedEvent;
1517
import org.opensearch.cluster.ClusterState;
1618
import org.opensearch.cluster.ClusterStateListener;
1719
import org.opensearch.cluster.node.DiscoveryNode;
1820
import org.opensearch.cluster.node.DiscoveryNodes;
1921
import org.opensearch.cluster.service.ClusterService;
2022
import org.opensearch.common.settings.Settings;
23+
import org.opensearch.core.action.ActionListener;
2124
import org.opensearch.ml.autoredeploy.MLModelAutoReDeployer;
25+
import org.opensearch.ml.common.settings.MLFeatureEnabledSetting;
26+
import org.opensearch.ml.engine.indices.MLIndicesHandler;
2227
import org.opensearch.ml.model.MLModelCacheHelper;
2328
import org.opensearch.ml.model.MLModelManager;
2429
import org.opensearch.ml.task.MLTaskManager;
30+
import org.opensearch.transport.client.Client;
2531

2632
import lombok.extern.log4j.Log4j2;
2733

@@ -32,22 +38,30 @@ public class MLCommonsClusterEventListener implements ClusterStateListener {
3238
private final MLModelManager mlModelManager;
3339
private final MLTaskManager mlTaskManager;
3440
private final MLModelCacheHelper modelCacheHelper;
35-
3641
private final MLModelAutoReDeployer mlModelAutoReDeployer;
42+
private final Client client;
43+
private final MLIndicesHandler mlIndicesHandler;
44+
private final MLFeatureEnabledSetting mlFeatureEnabledSetting;
3745

3846
public MLCommonsClusterEventListener(
3947
ClusterService clusterService,
4048
MLModelManager mlModelManager,
4149
MLTaskManager mlTaskManager,
4250
MLModelCacheHelper modelCacheHelper,
43-
MLModelAutoReDeployer mlModelAutoReDeployer
51+
MLModelAutoReDeployer mlModelAutoReDeployer,
52+
Client client,
53+
MLIndicesHandler mlIndicesHandler,
54+
MLFeatureEnabledSetting mlFeatureEnabledSetting
4455
) {
4556
this.clusterService = clusterService;
4657
this.clusterService.addListener(this);
4758
this.mlModelManager = mlModelManager;
4859
this.mlTaskManager = mlTaskManager;
4960
this.modelCacheHelper = modelCacheHelper;
5061
this.mlModelAutoReDeployer = mlModelAutoReDeployer;
62+
this.client = client;
63+
this.mlIndicesHandler = mlIndicesHandler;
64+
this.mlFeatureEnabledSetting = mlFeatureEnabledSetting;
5165
}
5266

5367
@Override
@@ -66,6 +80,29 @@ public void clusterChanged(ClusterChangedEvent event) {
6680
Set<String> removedNodeIds = delta.removedNodes().stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
6781
mlModelManager.removeWorkerNodes(removedNodeIds, false);
6882
} else if (delta.added()) {
83+
for (DiscoveryNode node : delta.addedNodes()) {
84+
// 3.1 introduces a new index for the job scheduler to track jobs
85+
// the statsCollectorJob needs to be run when a cluster is started with the stats settings enabled
86+
// As a result, we need to wait for a data node to come up before creating the new jobs index
87+
if (node.isDataNode() && Version.V_3_1_0.onOrAfter(node.getVersion())) {
88+
if (mlFeatureEnabledSetting.isMetricCollectionEnabled() && mlFeatureEnabledSetting.isStaticMetricCollectionEnabled()) {
89+
mlIndicesHandler.initMLJobsIndex(ActionListener.wrap(success -> {
90+
if (success) {
91+
mlTaskManager.startStatsCollectorJob();
92+
}
93+
}, e -> log.error("Failed to initialize ML jobs index", e)));
94+
}
95+
96+
if (clusterService.state().getMetadata().hasIndex(TASK_POLLING_JOB_INDEX)) {
97+
mlIndicesHandler.initMLJobsIndex(ActionListener.wrap(success -> {
98+
if (success) {
99+
mlTaskManager.startTaskPollingJob();
100+
}
101+
}, e -> log.error("Failed to initialize ML jobs index", e)));
102+
}
103+
}
104+
}
105+
69106
List<String> addedNodesIds = delta.addedNodes().stream().map(DiscoveryNode::getId).collect(Collectors.toList());
70107
mlModelAutoReDeployer.buildAutoReloadArrangement(addedNodesIds, state.getNodes().getClusterManagerNodeId());
71108
}

plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -8,28 +8,18 @@
88
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_SYNC_UP_JOB_INTERVAL_IN_SECONDS;
99
import static org.opensearch.ml.plugin.MachineLearningPlugin.GENERAL_THREAD_POOL;
1010

11-
import java.io.IOException;
12-
import java.time.Instant;
13-
import java.time.temporal.ChronoUnit;
1411
import java.util.List;
1512

16-
import org.opensearch.action.index.IndexRequest;
17-
import org.opensearch.action.support.WriteRequest;
1813
import org.opensearch.cluster.LocalNodeClusterManagerListener;
1914
import org.opensearch.cluster.service.ClusterService;
2015
import org.opensearch.common.lifecycle.LifecycleListener;
2116
import org.opensearch.common.settings.Settings;
2217
import org.opensearch.common.unit.TimeValue;
23-
import org.opensearch.common.xcontent.json.JsonXContent;
2418
import org.opensearch.core.action.ActionListener;
25-
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
2619
import org.opensearch.ml.autoredeploy.MLModelAutoReDeployer;
27-
import org.opensearch.ml.common.CommonValue;
2820
import org.opensearch.ml.common.settings.MLFeatureEnabledSetting;
2921
import org.opensearch.ml.engine.encryptor.Encryptor;
3022
import org.opensearch.ml.engine.indices.MLIndicesHandler;
31-
import org.opensearch.ml.jobs.MLJobParameter;
32-
import org.opensearch.ml.jobs.MLJobType;
3323
import org.opensearch.remote.metadata.client.SdkClient;
3424
import org.opensearch.threadpool.Scheduler;
3525
import org.opensearch.threadpool.ThreadPool;
@@ -105,40 +95,6 @@ public void onClusterManager() {
10595
TimeValue.timeValueSeconds(jobInterval),
10696
GENERAL_THREAD_POOL
10797
);
108-
startStatsCollectorJob();
109-
}
110-
111-
private void startStatsCollectorJob() {
112-
try {
113-
int intervalInMinutes = 5;
114-
Long lockDurationSeconds = 20L;
115-
116-
MLJobParameter jobParameter = new MLJobParameter(
117-
MLJobType.STATS_COLLECTOR.name(),
118-
new IntervalSchedule(Instant.now(), intervalInMinutes, ChronoUnit.MINUTES),
119-
lockDurationSeconds,
120-
null,
121-
MLJobType.STATS_COLLECTOR
122-
);
123-
124-
IndexRequest indexRequest = new IndexRequest()
125-
.index(CommonValue.ML_JOBS_INDEX)
126-
.id(MLJobType.STATS_COLLECTOR.name())
127-
.source(jobParameter.toXContent(JsonXContent.contentBuilder(), null))
128-
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
129-
130-
client
131-
.index(
132-
indexRequest,
133-
ActionListener
134-
.wrap(
135-
r -> log.info("Indexed ml stats collection job successfully"),
136-
e -> log.error("Failed to index stats collection job", e)
137-
)
138-
);
139-
} catch (IOException e) {
140-
log.error("Failed to index stats collection job", e);
141-
}
14298
}
14399

144100
private void startSyncModelRoutingCron() {

plugin/src/main/java/org/opensearch/ml/jobs/processors/MLJobProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.opensearch.jobscheduler.spi.JobExecutionContext;
1515
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
1616
import org.opensearch.jobscheduler.spi.utils.LockService;
17-
import org.opensearch.ml.common.exception.MLException;
1817
import org.opensearch.threadpool.ThreadPool;
1918
import org.opensearch.transport.client.Client;
2019

@@ -36,7 +35,8 @@ public MLJobProcessor(ClusterService clusterService, Client client, ThreadPool t
3635

3736
public void process(ScheduledJobParameter scheduledJobParameter, JobExecutionContext jobExecutionContext, boolean isProcessorEnabled) {
3837
if (!isProcessorEnabled) {
39-
throw new MLException(scheduledJobParameter.getName() + " not enabled.");
38+
log.warn("{} not enabled.", scheduledJobParameter.getName());
39+
return;
4040
}
4141

4242
process(scheduledJobParameter, jobExecutionContext);

plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import static org.opensearch.ml.common.CommonValue.ML_CONFIG_INDEX;
1010
import static org.opensearch.ml.common.CommonValue.ML_CONNECTOR_INDEX;
1111
import static org.opensearch.ml.common.CommonValue.ML_CONTROLLER_INDEX;
12+
import static org.opensearch.ml.common.CommonValue.ML_JOBS_INDEX;
1213
import static org.opensearch.ml.common.CommonValue.ML_MEMORY_MESSAGE_INDEX;
1314
import static org.opensearch.ml.common.CommonValue.ML_MEMORY_META_INDEX;
1415
import static org.opensearch.ml.common.CommonValue.ML_MODEL_GROUP_INDEX;
@@ -387,7 +388,9 @@
387388
import com.google.common.annotations.VisibleForTesting;
388389

389390
import lombok.SneakyThrows;
391+
import lombok.extern.log4j.Log4j2;
390392

393+
@Log4j2
391394
public class MachineLearningPlugin extends Plugin
392395
implements
393396
ActionPlugin,
@@ -773,7 +776,10 @@ public Collection<Object> createComponents(
773776
mlModelManager,
774777
mlTaskManager,
775778
modelCacheHelper,
776-
mlModelAutoRedeployer
779+
mlModelAutoRedeployer,
780+
client,
781+
mlIndicesHandler,
782+
mlFeatureEnabledSetting
777783
);
778784
MLCommonsClusterManagerEventListener clusterManagerEventListener = new MLCommonsClusterManagerEventListener(
779785
clusterService,
@@ -791,8 +797,11 @@ public Collection<Object> createComponents(
791797
MLJobRunner
792798
.getInstance()
793799
.initialize(clusterService, threadPool, client, sdkClient, connectorAccessControlHelper, mlFeatureEnabledSetting);
794-
MLOperationalMetricsCounter.initialize(clusterService.getClusterName().toString(), metricsRegistry, mlFeatureEnabledSetting);
795-
MLAdoptionMetricsCounter.initialize(clusterService.getClusterName().toString(), metricsRegistry, mlFeatureEnabledSetting);
800+
801+
if (mlFeatureEnabledSetting.isMetricCollectionEnabled()) {
802+
MLOperationalMetricsCounter.initialize(clusterService.getClusterName().toString(), metricsRegistry, mlFeatureEnabledSetting);
803+
MLAdoptionMetricsCounter.initialize(clusterService.getClusterName().toString(), metricsRegistry, mlFeatureEnabledSetting);
804+
}
796805

797806
mcpToolsHelper = new McpToolsHelper(client, threadPool, toolFactoryWrapper);
798807
McpAsyncServerHolder.init(mlIndicesHandler, mcpToolsHelper);
@@ -1267,6 +1276,7 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett
12671276
systemIndexDescriptors.add(new SystemIndexDescriptor(ML_MEMORY_META_INDEX, "ML Commons Memory Meta Index"));
12681277
systemIndexDescriptors.add(new SystemIndexDescriptor(ML_MEMORY_MESSAGE_INDEX, "ML Commons Memory Message Index"));
12691278
systemIndexDescriptors.add(new SystemIndexDescriptor(ML_STOP_WORDS_INDEX, "ML Commons Stop Words Index"));
1279+
systemIndexDescriptors.add(new SystemIndexDescriptor(ML_JOBS_INDEX, "ML Commons Jobs Index"));
12701280
return systemIndexDescriptors;
12711281
}
12721282

0 commit comments

Comments
 (0)