Skip to content

Commit c8f73b9

Browse files
pyek-botgithub-actions[bot]
authored andcommitted
[Job Scheduler] Add additional checks for initializing the stats job collector (#4362)
* fix: add additional checks for initializing the stats job collector to minimize jvm usage Signed-off-by: Pavan Yekbote <[email protected]> * fix: spotless apply and add debug logs Signed-off-by: Pavan Yekbote <[email protected]> * fix: tests Signed-off-by: Pavan Yekbote <[email protected]> * fix: spotless Signed-off-by: Pavan Yekbote <[email protected]> --------- Signed-off-by: Pavan Yekbote <[email protected]> (cherry picked from commit 5ac2984)
1 parent f65674c commit c8f73b9

File tree

3 files changed

+32
-25
lines changed

3 files changed

+32
-25
lines changed

plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterEventListener.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
package org.opensearch.ml.cluster;
77

8-
import static org.opensearch.ml.common.CommonValue.TASK_POLLING_JOB_INDEX;
8+
import static org.opensearch.ml.common.CommonValue.ML_JOBS_INDEX;
99
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_MONITORING_REQUEST_COUNT;
1010

1111
import java.util.List;
@@ -39,6 +39,7 @@ public class MLCommonsClusterEventListener implements ClusterStateListener {
3939
private final MLModelAutoReDeployer mlModelAutoReDeployer;
4040
private final Client client;
4141
private final MLFeatureEnabledSetting mlFeatureEnabledSetting;
42+
private boolean startedStatsJob;
4243

4344
public MLCommonsClusterEventListener(
4445
ClusterService clusterService,
@@ -90,12 +91,13 @@ public void clusterChanged(ClusterChangedEvent event) {
9091
*/
9192
for (DiscoveryNode node : state.nodes()) {
9293
if (node.isDataNode() && node.getVersion().onOrAfter(Version.V_3_1_0)) {
93-
if (mlFeatureEnabledSetting.isMetricCollectionEnabled() && mlFeatureEnabledSetting.isStaticMetricCollectionEnabled()) {
94+
if (mlFeatureEnabledSetting.isMetricCollectionEnabled()
95+
&& mlFeatureEnabledSetting.isStaticMetricCollectionEnabled()
96+
&& !clusterService.state().getMetadata().hasIndex(ML_JOBS_INDEX)
97+
&& !this.startedStatsJob) {
9498
mlTaskManager.indexStatsCollectorJob(true);
95-
}
96-
97-
if (clusterService.state().getMetadata().hasIndex(TASK_POLLING_JOB_INDEX)) {
98-
mlTaskManager.startTaskPollingJob();
99+
// using this variable in case if same node has a cluster state change event and the state is not updated yet
100+
this.startedStatsJob = true;
99101
}
100102

101103
break;

plugin/src/main/java/org/opensearch/ml/task/MLTaskManager.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -577,6 +577,11 @@ public void onStaticMetricCollectionEnabledChanged(boolean isEnabled) {
577577
}
578578

579579
public void indexStatsCollectorJob(boolean enabled) {
580+
if (this.statsCollectorJobStarted && enabled) {
581+
log.debug("Stats collector job already in desired state: {}", enabled);
582+
return;
583+
}
584+
580585
try {
581586
MLJobParameter jobParameter = new MLJobParameter(
582587
MLJobType.STATS_COLLECTOR.name(),
@@ -593,7 +598,10 @@ public void indexStatsCollectorJob(boolean enabled) {
593598
.source(jobParameter.toXContent(JsonXContent.contentBuilder(), null))
594599
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
595600

596-
indexJob(indexRequest, MLJobType.STATS_COLLECTOR, () -> {});
601+
indexJob(indexRequest, MLJobType.STATS_COLLECTOR, () -> {
602+
this.statsCollectorJobStarted = enabled;
603+
log.debug("Stats collector job {} successfully", enabled ? "started" : "stopped");
604+
});
597605
} catch (IOException e) {
598606
log.error("Failed to index stats collection job", e);
599607
}

plugin/src/test/java/org/opensearch/ml/cluster/MLCommonsClusterEventListenerTests.java

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import static org.mockito.Mockito.never;
1111
import static org.mockito.Mockito.verify;
1212
import static org.mockito.Mockito.when;
13-
import static org.opensearch.ml.common.CommonValue.TASK_POLLING_JOB_INDEX;
13+
import static org.opensearch.ml.common.CommonValue.ML_JOBS_INDEX;
1414

1515
import java.util.Collections;
1616

@@ -82,45 +82,42 @@ public void testClusterChanged_WithV31DataNode_MetricCollectionEnabled() {
8282
listener.clusterChanged(event);
8383

8484
verify(mlTaskManager).indexStatsCollectorJob(true);
85-
verify(mlTaskManager, never()).startTaskPollingJob();
8685
}
8786

88-
public void testClusterChanged_WithV31DataNode_TaskPollingIndexExists() {
89-
DiscoveryNode dataNode = createDataNode(Version.V_3_1_0);
90-
setupClusterState(dataNode, true);
87+
public void testClusterChanged_WithPreV31DataNode_NoJobsStarted() {
88+
DiscoveryNode dataNode = createDataNode(Version.V_3_0_0);
89+
setupClusterState(dataNode, false);
9190

92-
when(mlFeatureEnabledSetting.isMetricCollectionEnabled()).thenReturn(false);
91+
when(mlFeatureEnabledSetting.isMetricCollectionEnabled()).thenReturn(true);
92+
when(mlFeatureEnabledSetting.isStaticMetricCollectionEnabled()).thenReturn(true);
9393

9494
listener.clusterChanged(event);
9595

9696
verify(mlTaskManager, never()).indexStatsCollectorJob(anyBoolean());
97-
verify(mlTaskManager).startTaskPollingJob();
9897
}
9998

100-
public void testClusterChanged_WithPreV31DataNode_NoJobsStarted() {
101-
DiscoveryNode dataNode = createDataNode(Version.V_3_0_0);
102-
setupClusterState(dataNode, true);
99+
public void testClusterChanged_WithPostV31DataNode_JobsStarted() {
100+
DiscoveryNode dataNode = createDataNode(Version.V_3_2_0);
101+
setupClusterState(dataNode, false);
103102

104103
when(mlFeatureEnabledSetting.isMetricCollectionEnabled()).thenReturn(true);
105104
when(mlFeatureEnabledSetting.isStaticMetricCollectionEnabled()).thenReturn(true);
106105

107106
listener.clusterChanged(event);
108107

109-
verify(mlTaskManager, never()).indexStatsCollectorJob(anyBoolean());
110-
verify(mlTaskManager, never()).startTaskPollingJob();
108+
verify(mlTaskManager).indexStatsCollectorJob(true);
111109
}
112110

113-
public void testClusterChanged_WithPostV31DataNode_JobsStarted() {
114-
DiscoveryNode dataNode = createDataNode(Version.V_3_2_0);
111+
public void testClusterChanged_IndexAlreadyPresent_JobNotStarted() {
112+
DiscoveryNode dataNode = createDataNode(Version.V_3_1_0);
115113
setupClusterState(dataNode, true);
116114

117115
when(mlFeatureEnabledSetting.isMetricCollectionEnabled()).thenReturn(true);
118116
when(mlFeatureEnabledSetting.isStaticMetricCollectionEnabled()).thenReturn(true);
119117

120118
listener.clusterChanged(event);
121119

122-
verify(mlTaskManager).indexStatsCollectorJob(true);
123-
verify(mlTaskManager).startTaskPollingJob();
120+
verify(mlTaskManager, never()).indexStatsCollectorJob(anyBoolean());
124121
}
125122

126123
private DiscoveryNode createDataNode(Version version) {
@@ -134,7 +131,7 @@ private DiscoveryNode createDataNode(Version version) {
134131
);
135132
}
136133

137-
private void setupClusterState(DiscoveryNode node, boolean hasTaskPollingIndex) {
134+
private void setupClusterState(DiscoveryNode node, boolean hasMLJobsIndex) {
138135
DiscoveryNodes nodes = DiscoveryNodes.builder().add(node).build();
139136

140137
when(event.state()).thenReturn(clusterState);
@@ -143,7 +140,7 @@ private void setupClusterState(DiscoveryNode node, boolean hasTaskPollingIndex)
143140
when(clusterState.nodes()).thenReturn(nodes);
144141
when(clusterState.getMetadata()).thenReturn(metadata);
145142
when(clusterService.state()).thenReturn(clusterState);
146-
when(metadata.hasIndex(TASK_POLLING_JOB_INDEX)).thenReturn(hasTaskPollingIndex);
143+
when(metadata.hasIndex(ML_JOBS_INDEX)).thenReturn(hasMLJobsIndex);
147144
when(metadata.settings()).thenReturn(org.opensearch.common.settings.Settings.EMPTY);
148145
}
149146
}

0 commit comments

Comments
 (0)