Skip to content

Commit 62cf474

Browse files
committed
feat: add settings to control metric collection
Signed-off-by: Pavan Yekbote <[email protected]>
1 parent c978466 commit 62cf474

File tree

8 files changed

+94
-16
lines changed

8 files changed

+94
-16
lines changed

common/src/main/java/org/opensearch/ml/common/settings/MLCommonsSettings.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,4 +342,12 @@ private MLCommonsSettings() {}
342342
/** This setting sets the remote metadata service name */
343343
public static final Setting<String> REMOTE_METADATA_SERVICE_NAME = Setting
344344
.simpleString("plugins.ml_commons." + REMOTE_METADATA_SERVICE_NAME_KEY, Setting.Property.NodeScope, Setting.Property.Final);
345+
346+
// Feature flag for enabling telemetry metric collection via the metrics framework (disabled by default)
347+
public static final Setting<Boolean> ML_COMMONS_METRIC_COLLECTION_ENABLED = Setting
348+
.boolSetting("plugins.ml_commons.metrics_collection_enabled", false, Setting.Property.NodeScope, Setting.Property.Dynamic);
349+
350+
// Feature flag for enabling the static telemetry metric collection job, MLStatsJobProcessor (disabled by default)
351+
public static final Setting<Boolean> ML_COMMONS_STATIC_METRIC_COLLECTION_ENABLED = Setting
352+
.boolSetting("plugins.ml_commons.metrics_static_collection_enabled", false, Setting.Property.NodeScope, Setting.Property.Dynamic);
345353
}

common/src/main/java/org/opensearch/ml/common/settings/MLFeatureEnabledSetting.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,13 @@
1111
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_CONNECTOR_PRIVATE_IP_ENABLED;
1212
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_CONTROLLER_ENABLED;
1313
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_LOCAL_MODEL_ENABLED;
14+
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_METRIC_COLLECTION_ENABLED;
1415
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_MULTI_TENANCY_ENABLED;
1516
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_OFFLINE_BATCH_INFERENCE_ENABLED;
1617
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_OFFLINE_BATCH_INGESTION_ENABLED;
18+
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_RAG_PIPELINE_FEATURE_ENABLED;
1719
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_REMOTE_INFERENCE_ENABLED;
20+
import static org.opensearch.ml.common.settings.MLCommonsSettings.ML_COMMONS_STATIC_METRIC_COLLECTION_ENABLED;
1821

1922
import java.util.ArrayList;
2023
import java.util.List;
@@ -42,6 +45,11 @@ public class MLFeatureEnabledSetting {
4245

4346
private volatile Boolean isRagSearchPipelineEnabled;
4447

48+
// when false, blocks every metric push: metric counters skip incrementing
49+
private volatile Boolean isMetricCollectionEnabled;
50+
// when false, blocks the static metric push: the stats collector job (MLStatsJobProcessor) is rejected as not enabled
51+
private volatile Boolean isStaticMetricCollectionEnabled;
52+
4553
private final List<SettingsChangeListener> listeners = new ArrayList<>();
4654

4755
public MLFeatureEnabledSetting(ClusterService clusterService, Settings settings) {
@@ -53,6 +61,9 @@ public MLFeatureEnabledSetting(ClusterService clusterService, Settings settings)
5361
isBatchIngestionEnabled = ML_COMMONS_OFFLINE_BATCH_INGESTION_ENABLED.get(settings);
5462
isBatchInferenceEnabled = ML_COMMONS_OFFLINE_BATCH_INFERENCE_ENABLED.get(settings);
5563
isMultiTenancyEnabled = ML_COMMONS_MULTI_TENANCY_ENABLED.get(settings);
64+
isRagSearchPipelineEnabled = ML_COMMONS_RAG_PIPELINE_FEATURE_ENABLED.get(settings);
65+
isMetricCollectionEnabled = ML_COMMONS_METRIC_COLLECTION_ENABLED.get(settings);
66+
isStaticMetricCollectionEnabled = ML_COMMONS_STATIC_METRIC_COLLECTION_ENABLED.get(settings);
5667

5768
clusterService
5869
.getClusterSettings()
@@ -74,6 +85,12 @@ public MLFeatureEnabledSetting(ClusterService clusterService, Settings settings)
7485
clusterService
7586
.getClusterSettings()
7687
.addSettingsUpdateConsumer(MLCommonsSettings.ML_COMMONS_RAG_PIPELINE_FEATURE_ENABLED, it -> isRagSearchPipelineEnabled = it);
88+
clusterService
89+
.getClusterSettings()
90+
.addSettingsUpdateConsumer(ML_COMMONS_METRIC_COLLECTION_ENABLED, it -> isMetricCollectionEnabled = it);
91+
clusterService
92+
.getClusterSettings()
93+
.addSettingsUpdateConsumer(ML_COMMONS_STATIC_METRIC_COLLECTION_ENABLED, it -> isStaticMetricCollectionEnabled = it);
7794
}
7895

7996
/**
@@ -148,6 +165,14 @@ public boolean isRagSearchPipelineEnabled() {
148165
return isRagSearchPipelineEnabled;
149166
}
150167

168+
public boolean isMetricCollectionEnabled() {
169+
return isMetricCollectionEnabled;
170+
}
171+
172+
public boolean isStaticMetricCollectionEnabled() {
173+
return isStaticMetricCollectionEnabled;
174+
}
175+
151176
@VisibleForTesting
152177
public void notifyMultiTenancyListeners(boolean isEnabled) {
153178
for (SettingsChangeListener listener : listeners) {

plugin/src/main/java/org/opensearch/ml/jobs/MLJobRunner.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.opensearch.jobscheduler.spi.JobExecutionContext;
1010
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
1111
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
12+
import org.opensearch.ml.common.settings.MLFeatureEnabledSetting;
1213
import org.opensearch.ml.helper.ConnectorAccessControlHelper;
1314
import org.opensearch.ml.jobs.processors.MLBatchTaskUpdateProcessor;
1415
import org.opensearch.ml.jobs.processors.MLStatsJobProcessor;
@@ -52,6 +53,9 @@ public static MLJobRunner getInstance() {
5253
@Setter
5354
private ConnectorAccessControlHelper connectorAccessControlHelper;
5455

56+
@Setter
57+
private MLFeatureEnabledSetting mlFeatureEnabledSetting;
58+
5559
private boolean initialized;
5660

5761
private MLJobRunner() {
@@ -63,14 +67,16 @@ public void initialize(
6367
final ThreadPool threadPool,
6468
final Client client,
6569
final SdkClient sdkClient,
66-
final ConnectorAccessControlHelper connectorAccessControlHelper
70+
final ConnectorAccessControlHelper connectorAccessControlHelper,
71+
final MLFeatureEnabledSetting mlFeatureEnabledSetting
6772
) {
6873
this.clusterService = clusterService;
6974
this.threadPool = threadPool;
7075
this.client = client;
7176
this.sdkClient = sdkClient;
7277
this.connectorAccessControlHelper = connectorAccessControlHelper;
7378
this.initialized = true;
79+
this.mlFeatureEnabledSetting = mlFeatureEnabledSetting;
7480
}
7581

7682
@Override
@@ -84,7 +90,7 @@ public void runJob(ScheduledJobParameter scheduledJobParameter, JobExecutionCont
8490
case STATS_COLLECTOR:
8591
MLStatsJobProcessor
8692
.getInstance(clusterService, client, threadPool, connectorAccessControlHelper, sdkClient)
87-
.process(jobParameter, jobExecutionContext);
93+
.process(jobParameter, jobExecutionContext, mlFeatureEnabledSetting.isStaticMetricCollectionEnabled());
8894
break;
8995
case BATCH_TASK_UPDATE:
9096
MLBatchTaskUpdateProcessor.getInstance(clusterService, client, threadPool).process(jobParameter, jobExecutionContext);

plugin/src/main/java/org/opensearch/ml/jobs/processors/MLJobProcessor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.opensearch.jobscheduler.spi.JobExecutionContext;
1515
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
1616
import org.opensearch.jobscheduler.spi.utils.LockService;
17+
import org.opensearch.ml.common.exception.MLException;
1718
import org.opensearch.threadpool.ThreadPool;
1819
import org.opensearch.transport.client.Client;
1920

@@ -33,6 +34,14 @@ public MLJobProcessor(ClusterService clusterService, Client client, ThreadPool t
3334

3435
public abstract void run();
3536

37+
public void process(ScheduledJobParameter scheduledJobParameter, JobExecutionContext jobExecutionContext, boolean isProcessorEnabled) {
38+
if (!isProcessorEnabled) {
39+
throw new MLException(scheduledJobParameter.getName() + " not enabled.");
40+
}
41+
42+
process(scheduledJobParameter, jobExecutionContext);
43+
}
44+
3645
public void process(ScheduledJobParameter scheduledJobParameter, JobExecutionContext jobExecutionContext) {
3746
final LockService lockService = jobExecutionContext.getLockService();
3847

plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -764,11 +764,11 @@ public Collection<Object> createComponents(
764764
mlFeatureEnabledSetting
765765
);
766766

767-
MLJobRunner.getInstance().initialize(clusterService, threadPool, client, sdkClient, connectorAccessControlHelper);
768-
769-
// todo: add setting
770-
MLOperationalMetricsCounter.initialize(clusterService.getClusterName().toString(), metricsRegistry);
771-
MLAdoptionMetricsCounter.initialize(clusterService.getClusterName().toString(), metricsRegistry);
767+
MLJobRunner
768+
.getInstance()
769+
.initialize(clusterService, threadPool, client, sdkClient, connectorAccessControlHelper, mlFeatureEnabledSetting);
770+
MLOperationalMetricsCounter.initialize(clusterService.getClusterName().toString(), metricsRegistry, mlFeatureEnabledSetting);
771+
MLAdoptionMetricsCounter.initialize(clusterService.getClusterName().toString(), metricsRegistry, mlFeatureEnabledSetting);
772772

773773
return ImmutableList
774774
.of(

plugin/src/main/java/org/opensearch/ml/stats/otel/counters/AbstractMLMetricsCounter.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,27 +9,43 @@
99
import java.util.concurrent.ConcurrentHashMap;
1010
import java.util.stream.Stream;
1111

12+
import org.opensearch.ml.common.settings.MLFeatureEnabledSetting;
1213
import org.opensearch.telemetry.metrics.Counter;
1314
import org.opensearch.telemetry.metrics.MetricsRegistry;
1415
import org.opensearch.telemetry.metrics.tags.Tags;
1516

17+
import lombok.extern.log4j.Log4j2;
18+
19+
@Log4j2
1620
public abstract class AbstractMLMetricsCounter<T extends Enum<T>> {
1721
private static final String PREFIX = "ml.commons.";
1822
private static final String UNIT = "1";
1923
private static final String CLUSTER_NAME_TAG = "cluster_name";
2024

25+
private final MLFeatureEnabledSetting mlFeatureEnabledSetting;
26+
2127
protected final String clusterName;
2228
protected final MetricsRegistry metricsRegistry;
2329
protected final Map<T, Counter> metricCounterMap;
2430

25-
protected AbstractMLMetricsCounter(String clusterName, MetricsRegistry metricsRegistry, Class<T> metricClass) {
31+
protected AbstractMLMetricsCounter(
32+
String clusterName,
33+
MetricsRegistry metricsRegistry,
34+
Class<T> metricClass,
35+
MLFeatureEnabledSetting mlFeatureEnabledSetting
36+
) {
2637
this.clusterName = clusterName;
2738
this.metricsRegistry = metricsRegistry;
2839
this.metricCounterMap = new ConcurrentHashMap<>();
40+
this.mlFeatureEnabledSetting = mlFeatureEnabledSetting;
2941
Stream.of(metricClass.getEnumConstants()).forEach(metric -> metricCounterMap.computeIfAbsent(metric, this::createMetricCounter));
3042
}
3143

3244
public void incrementCounter(T metric, Tags customTags) {
45+
if (!mlFeatureEnabledSetting.isMetricCollectionEnabled()) {
46+
return;
47+
}
48+
3349
Counter counter = metricCounterMap.computeIfAbsent(metric, this::createMetricCounter);
3450
Tags metricsTags = (customTags == null ? Tags.create() : customTags).addTag(CLUSTER_NAME_TAG, clusterName);
3551
counter.add(1, metricsTags);

plugin/src/main/java/org/opensearch/ml/stats/otel/counters/MLAdoptionMetricsCounter.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,24 @@
55

66
package org.opensearch.ml.stats.otel.counters;
77

8+
import org.opensearch.ml.common.settings.MLFeatureEnabledSetting;
89
import org.opensearch.ml.stats.otel.metrics.AdoptionMetric;
910
import org.opensearch.telemetry.metrics.MetricsRegistry;
1011

1112
public class MLAdoptionMetricsCounter extends AbstractMLMetricsCounter<AdoptionMetric> {
1213

1314
private static MLAdoptionMetricsCounter instance;
1415

15-
private MLAdoptionMetricsCounter(String clusterName, MetricsRegistry metricsRegistry) {
16-
super(clusterName, metricsRegistry, AdoptionMetric.class);
16+
private MLAdoptionMetricsCounter(String clusterName, MetricsRegistry metricsRegistry, MLFeatureEnabledSetting mlFeatureEnabledSetting) {
17+
super(clusterName, metricsRegistry, AdoptionMetric.class, mlFeatureEnabledSetting);
1718
}
1819

19-
public static synchronized void initialize(String clusterName, MetricsRegistry metricsRegistry) {
20-
instance = new MLAdoptionMetricsCounter(clusterName, metricsRegistry);
20+
public static synchronized void initialize(
21+
String clusterName,
22+
MetricsRegistry metricsRegistry,
23+
MLFeatureEnabledSetting mlFeatureEnabledSetting
24+
) {
25+
instance = new MLAdoptionMetricsCounter(clusterName, metricsRegistry, mlFeatureEnabledSetting);
2126
}
2227

2328
public static synchronized MLAdoptionMetricsCounter getInstance() {

plugin/src/main/java/org/opensearch/ml/stats/otel/counters/MLOperationalMetricsCounter.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,28 @@
55

66
package org.opensearch.ml.stats.otel.counters;
77

8+
import org.opensearch.ml.common.settings.MLFeatureEnabledSetting;
89
import org.opensearch.ml.stats.otel.metrics.OperationalMetric;
910
import org.opensearch.telemetry.metrics.MetricsRegistry;
1011

1112
public class MLOperationalMetricsCounter extends AbstractMLMetricsCounter<OperationalMetric> {
1213

1314
private static MLOperationalMetricsCounter instance;
1415

15-
private MLOperationalMetricsCounter(String clusterName, MetricsRegistry metricsRegistry) {
16-
super(clusterName, metricsRegistry, OperationalMetric.class);
16+
private MLOperationalMetricsCounter(
17+
String clusterName,
18+
MetricsRegistry metricsRegistry,
19+
MLFeatureEnabledSetting mlFeatureEnabledSetting
20+
) {
21+
super(clusterName, metricsRegistry, OperationalMetric.class, mlFeatureEnabledSetting);
1722
}
1823

19-
public static synchronized void initialize(String clusterName, MetricsRegistry metricsRegistry) {
20-
instance = new MLOperationalMetricsCounter(clusterName, metricsRegistry);
24+
public static synchronized void initialize(
25+
String clusterName,
26+
MetricsRegistry metricsRegistry,
27+
MLFeatureEnabledSetting mlFeatureEnabledSetting
28+
) {
29+
instance = new MLOperationalMetricsCounter(clusterName, metricsRegistry, mlFeatureEnabledSetting);
2130
}
2231

2332
public static synchronized MLOperationalMetricsCounter getInstance() {

0 commit comments

Comments
 (0)