From 61a6f1e5fd0e318402da60271f6e624e06cf3362 Mon Sep 17 00:00:00 2001 From: adiraj_linkedin Date: Thu, 27 Nov 2025 16:15:24 +0530 Subject: [PATCH] Added otel support for clusterInfo sensor --- .../com/linkedin/d2/jmx/ClusterInfoJmx.java | 24 ++++++++++++++--- .../jmx/ClusterInfoOtelMetricsProvider.java | 17 ++++++++++++ .../java/com/linkedin/d2/jmx/JmxManager.java | 9 ++++++- .../NoOpClusterInfoOtelMetricsProvider.java | 10 +++++++ .../linkedin/d2/jmx/ClusterInfoJmxTest.java | 26 +++++++++++++++++++ 5 files changed, 81 insertions(+), 5 deletions(-) create mode 100644 d2/src/main/java/com/linkedin/d2/jmx/ClusterInfoOtelMetricsProvider.java create mode 100644 d2/src/main/java/com/linkedin/d2/jmx/NoOpClusterInfoOtelMetricsProvider.java diff --git a/d2/src/main/java/com/linkedin/d2/jmx/ClusterInfoJmx.java b/d2/src/main/java/com/linkedin/d2/jmx/ClusterInfoJmx.java index 2f52c3255d..dee95a496e 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/ClusterInfoJmx.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/ClusterInfoJmx.java @@ -18,14 +18,22 @@ import com.linkedin.d2.balancer.LoadBalancerStateItem; import com.linkedin.d2.balancer.simple.ClusterInfoItem; +import com.linkedin.d2.jmx.ClusterInfoOtelMetricsProvider; public class ClusterInfoJmx implements ClusterInfoJmxMBean { private final ClusterInfoItem _clusterInfoItem; + private final ClusterInfoOtelMetricsProvider _clusterInfoOtelMetricsProvider; - public ClusterInfoJmx(ClusterInfoItem clusterInfoItem) { + public ClusterInfoJmx(ClusterInfoItem clusterInfoItem) + { + this(clusterInfoItem, new NoOpClusterInfoOtelMetricsProvider()); + } + + public ClusterInfoJmx(ClusterInfoItem clusterInfoItem, ClusterInfoOtelMetricsProvider clusterInfoOtelMetricsProvider) { _clusterInfoItem = clusterInfoItem; + _clusterInfoOtelMetricsProvider = clusterInfoOtelMetricsProvider; } @Override @@ -37,11 +45,19 @@ public ClusterInfoItem getClusterInfoItem() @Override public int getCanaryDistributionPolicy() { + String clusterName = _clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName(); + switch (_clusterInfoItem.getClusterPropertiesItem().getDistribution()) { - case STABLE: return 0; - case CANARY: return 1; - default: return -1; + case STABLE: + _clusterInfoOtelMetricsProvider.recordCanaryDistributionPolicy(clusterName, 0); + return 0; + case CANARY: + _clusterInfoOtelMetricsProvider.recordCanaryDistributionPolicy(clusterName, 1); + return 1; + default: + _clusterInfoOtelMetricsProvider.recordCanaryDistributionPolicy(clusterName, -1); + return -1; } } } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/ClusterInfoOtelMetricsProvider.java b/d2/src/main/java/com/linkedin/d2/jmx/ClusterInfoOtelMetricsProvider.java new file mode 100644 index 0000000000..caace24930 --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/ClusterInfoOtelMetricsProvider.java @@ -0,0 +1,17 @@ +package com.linkedin.d2.jmx; + +/** + * Interface for OpenTelemetry metrics collection for ClusterInfo. + */ +public interface ClusterInfoOtelMetricsProvider { + + /** + * Records the current canary distribution policy for the cluster + * (0=STABLE, 1=CANARY, -1=UNSPECIFIED) + * + * @param clusterName the name of the cluster + * @param policy the canary distribution policy + */ + void recordCanaryDistributionPolicy(String clusterName, int policy); + +} diff --git a/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java b/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java index c4b9eef977..744eb82bbf 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java @@ -50,10 +50,17 @@ public class JmxManager private final MBeanServer _server; private final Set _registeredNames = new HashSet<>(); + private ClusterInfoOtelMetricsProvider _clusterInfoOtelMetricsProvider; public JmxManager() + { + this(new NoOpClusterInfoOtelMetricsProvider()); + } + + public JmxManager(ClusterInfoOtelMetricsProvider clusterInfoOtelMetricsProvider) { _server = ManagementFactory.getPlatformMBeanServer(); + _clusterInfoOtelMetricsProvider = clusterInfoOtelMetricsProvider; } MBeanServer getMBeanServer() @@ -118,7 +125,7 @@ public synchronized JmxManager registerServicePropertiesJmxBean(String name, Ser public synchronized JmxManager registerClusterInfo(String name, ClusterInfoItem clusterInfo) { - checkReg(new ClusterInfoJmx(clusterInfo), name); + checkReg(new ClusterInfoJmx(clusterInfo, _clusterInfoOtelMetricsProvider), name); return this; } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/NoOpClusterInfoOtelMetricsProvider.java b/d2/src/main/java/com/linkedin/d2/jmx/NoOpClusterInfoOtelMetricsProvider.java new file mode 100644 index 0000000000..c2f306e07d --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/NoOpClusterInfoOtelMetricsProvider.java @@ -0,0 +1,10 @@ +package com.linkedin.d2.jmx; + +public class NoOpClusterInfoOtelMetricsProvider implements ClusterInfoOtelMetricsProvider +{ + @Override + public void recordCanaryDistributionPolicy(String clusterName, int policy) + { + // No-op + } +} diff --git a/d2/src/test/java/com/linkedin/d2/jmx/ClusterInfoJmxTest.java b/d2/src/test/java/com/linkedin/d2/jmx/ClusterInfoJmxTest.java index ecdbf00462..d77db8e601 100644 --- a/d2/src/test/java/com/linkedin/d2/jmx/ClusterInfoJmxTest.java +++ b/d2/src/test/java/com/linkedin/d2/jmx/ClusterInfoJmxTest.java @@ -31,6 +31,8 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import static org.mockito.Mockito.*; + public class ClusterInfoJmxTest { @@ -71,4 +73,28 @@ public int getPartitionId(URI uri) { ); Assert.assertEquals(clusterInfoJmx.getCanaryDistributionPolicy(), expectedValue); } + + @Test(dataProvider = "getCanaryDistributionPoliciesTestData") + public void testGetCanaryDistributionPolicyWithMetricsProvider(CanaryDistributionProvider.Distribution distribution, int expectedValue) + { + ClusterInfoOtelMetricsProvider mockProvider = mock(ClusterInfoOtelMetricsProvider.class); + ClusterInfoItem clusterInfoItem = new ClusterInfoItem(_mockedSimpleBalancerState, new ClusterProperties("Foo"), new PartitionAccessor() { + @Override + public int getMaxPartitionId() { + return 0; + } + + @Override + public int getPartitionId(URI uri) { + return 0; + } + }, distribution); + + ClusterInfoJmx clusterInfoJmx = new ClusterInfoJmx(clusterInfoItem, mockProvider); + + int actualValue = clusterInfoJmx.getCanaryDistributionPolicy(); + + Assert.assertEquals(actualValue, expectedValue); + verify(mockProvider, times(1)).recordCanaryDistributionPolicy("Foo", expectedValue); + } }