diff --git a/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterEventListener.java b/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterEventListener.java index f883d1f3b3..e28660dbfe 100644 --- a/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterEventListener.java +++ b/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterEventListener.java @@ -5,7 +5,7 @@ package org.opensearch.ml.cluster; -import static org.opensearch.ml.common.CommonValue.TASK_POLLING_JOB_INDEX; +import static org.opensearch.ml.common.CommonValue.ML_JOBS_INDEX; import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_MONITORING_REQUEST_COUNT; import java.util.List; @@ -39,6 +39,7 @@ public class MLCommonsClusterEventListener implements ClusterStateListener { private final MLModelAutoReDeployer mlModelAutoReDeployer; private final Client client; private final MLFeatureEnabledSetting mlFeatureEnabledSetting; + private boolean startedStatsJob; public MLCommonsClusterEventListener( ClusterService clusterService, @@ -90,12 +91,13 @@ public void clusterChanged(ClusterChangedEvent event) { */ for (DiscoveryNode node : state.nodes()) { if (node.isDataNode() && node.getVersion().onOrAfter(Version.V_3_1_0)) { - if (mlFeatureEnabledSetting.isMetricCollectionEnabled() && mlFeatureEnabledSetting.isStaticMetricCollectionEnabled()) { + if (mlFeatureEnabledSetting.isMetricCollectionEnabled() + && mlFeatureEnabledSetting.isStaticMetricCollectionEnabled() + && !clusterService.state().getMetadata().hasIndex(ML_JOBS_INDEX) + && !this.startedStatsJob) { mlTaskManager.indexStatsCollectorJob(true); - } - - if (clusterService.state().getMetadata().hasIndex(TASK_POLLING_JOB_INDEX)) { - mlTaskManager.startTaskPollingJob(); + // using this variable in case if same node has a cluster state change event and the state is not updated yet + this.startedStatsJob = true; } break; diff --git a/plugin/src/main/java/org/opensearch/ml/task/MLTaskManager.java b/plugin/src/main/java/org/opensearch/ml/task/MLTaskManager.java index f5bba724dc..1752ae5d29 100644 --- a/plugin/src/main/java/org/opensearch/ml/task/MLTaskManager.java +++ b/plugin/src/main/java/org/opensearch/ml/task/MLTaskManager.java @@ -577,6 +577,11 @@ public void onStaticMetricCollectionEnabledChanged(boolean isEnabled) { } public void indexStatsCollectorJob(boolean enabled) { + if (this.statsCollectorJobStarted && enabled) { + log.debug("Stats collector job already in desired state: {}", enabled); + return; + } + try { MLJobParameter jobParameter = new MLJobParameter( MLJobType.STATS_COLLECTOR.name(), @@ -593,7 +598,10 @@ public void indexStatsCollectorJob(boolean enabled) { .source(jobParameter.toXContent(JsonXContent.contentBuilder(), null)) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - indexJob(indexRequest, MLJobType.STATS_COLLECTOR, () -> {}); + indexJob(indexRequest, MLJobType.STATS_COLLECTOR, () -> { + this.statsCollectorJobStarted = enabled; + log.debug("Stats collector job {} successfully", enabled ? "started" : "stopped"); + }); } catch (IOException e) { log.error("Failed to index stats collection job", e); } diff --git a/plugin/src/test/java/org/opensearch/ml/cluster/MLCommonsClusterEventListenerTests.java b/plugin/src/test/java/org/opensearch/ml/cluster/MLCommonsClusterEventListenerTests.java index f2d8fdffcb..0de509d1ed 100644 --- a/plugin/src/test/java/org/opensearch/ml/cluster/MLCommonsClusterEventListenerTests.java +++ b/plugin/src/test/java/org/opensearch/ml/cluster/MLCommonsClusterEventListenerTests.java @@ -10,7 +10,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.opensearch.ml.common.CommonValue.TASK_POLLING_JOB_INDEX; +import static org.opensearch.ml.common.CommonValue.ML_JOBS_INDEX; import java.util.Collections; @@ -82,36 +82,34 @@ public void testClusterChanged_WithV31DataNode_MetricCollectionEnabled() { listener.clusterChanged(event); verify(mlTaskManager).indexStatsCollectorJob(true); - verify(mlTaskManager, never()).startTaskPollingJob(); } - public void testClusterChanged_WithV31DataNode_TaskPollingIndexExists() { - DiscoveryNode dataNode = createDataNode(Version.V_3_1_0); - setupClusterState(dataNode, true); + public void testClusterChanged_WithPreV31DataNode_NoJobsStarted() { + DiscoveryNode dataNode = createDataNode(Version.V_3_0_0); + setupClusterState(dataNode, false); - when(mlFeatureEnabledSetting.isMetricCollectionEnabled()).thenReturn(false); + when(mlFeatureEnabledSetting.isMetricCollectionEnabled()).thenReturn(true); + when(mlFeatureEnabledSetting.isStaticMetricCollectionEnabled()).thenReturn(true); listener.clusterChanged(event); verify(mlTaskManager, never()).indexStatsCollectorJob(anyBoolean()); - verify(mlTaskManager).startTaskPollingJob(); } - public void testClusterChanged_WithPreV31DataNode_NoJobsStarted() { - DiscoveryNode dataNode = createDataNode(Version.V_3_0_0); - setupClusterState(dataNode, true); + public void testClusterChanged_WithPostV31DataNode_JobsStarted() { + DiscoveryNode dataNode = createDataNode(Version.V_3_2_0); + setupClusterState(dataNode, false); when(mlFeatureEnabledSetting.isMetricCollectionEnabled()).thenReturn(true); when(mlFeatureEnabledSetting.isStaticMetricCollectionEnabled()).thenReturn(true); listener.clusterChanged(event); - verify(mlTaskManager, never()).indexStatsCollectorJob(anyBoolean()); - verify(mlTaskManager, never()).startTaskPollingJob(); + verify(mlTaskManager).indexStatsCollectorJob(true); } - public void testClusterChanged_WithPostV31DataNode_JobsStarted() { - DiscoveryNode dataNode = createDataNode(Version.V_3_2_0); + public void testClusterChanged_IndexAlreadyPresent_JobNotStarted() { + DiscoveryNode dataNode = createDataNode(Version.V_3_1_0); setupClusterState(dataNode, true); when(mlFeatureEnabledSetting.isMetricCollectionEnabled()).thenReturn(true); @@ -119,8 +117,7 @@ public void testClusterChanged_WithPostV31DataNode_JobsStarted() { listener.clusterChanged(event); - verify(mlTaskManager).indexStatsCollectorJob(true); - verify(mlTaskManager).startTaskPollingJob(); + verify(mlTaskManager, never()).indexStatsCollectorJob(anyBoolean()); } private DiscoveryNode createDataNode(Version version) { @@ -134,7 +131,7 @@ private DiscoveryNode createDataNode(Version version) { ); } - private void setupClusterState(DiscoveryNode node, boolean hasTaskPollingIndex) { + private void setupClusterState(DiscoveryNode node, boolean hasMLJobsIndex) { DiscoveryNodes nodes = DiscoveryNodes.builder().add(node).build(); when(event.state()).thenReturn(clusterState); @@ -143,7 +140,7 @@ private void setupClusterState(DiscoveryNode node, boolean hasTaskPollingIndex) when(clusterState.nodes()).thenReturn(nodes); when(clusterState.getMetadata()).thenReturn(metadata); when(clusterService.state()).thenReturn(clusterState); - when(metadata.hasIndex(TASK_POLLING_JOB_INDEX)).thenReturn(hasTaskPollingIndex); + when(metadata.hasIndex(ML_JOBS_INDEX)).thenReturn(hasMLJobsIndex); when(metadata.settings()).thenReturn(org.opensearch.common.settings.Settings.EMPTY); } }