diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java index 88dba60f21..2ac56bb0f5 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java @@ -49,6 +49,7 @@ import com.linkedin.d2.balancer.zkfs.ZKFSTogglingLoadBalancerFactoryImpl; import com.linkedin.d2.balancer.zkfs.ZKFSUtil; import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter; +import com.linkedin.d2.jmx.LoadBalancerStateOtelMetricsProvider; import com.linkedin.d2.discovery.stores.zk.ZKPersistentConnection; import com.linkedin.d2.discovery.stores.zk.ZooKeeper; import com.linkedin.d2.jmx.XdsServerMetricsProvider; @@ -237,6 +238,7 @@ public D2Client build() _config.xdsChannelLoadBalancingPolicyConfig, _config.subscribeToUriGlobCollection, _config._xdsServerMetricsProvider, + _config._loadBalancerStateOtelMetricsProvider, _config.loadBalanceStreamException, _config.xdsInitialResourceVersionsEnabled, _config.disableDetectLiRawD2Client, @@ -339,6 +341,12 @@ else if (_config.restRetryEnabled || _config.streamRetryEnabled) return d2Client; } + public D2ClientBuilder setLoadBalancerStateOtelMetricsProvider(LoadBalancerStateOtelMetricsProvider provider) + { + _config._loadBalancerStateOtelMetricsProvider = provider; + return this; + } + /** * Check if the d2 client builder is to build a LI raw D2 client. When LI container D2ClientFactory is used, it sets * hostName and d2JmxManagerPrefix with LI-specific values (app name, machine name, etc) at runtime. All LI raw D2 diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java index d0ffa9e826..f0e6633144 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java @@ -44,6 +44,8 @@ import com.linkedin.d2.jmx.JmxManager; import com.linkedin.d2.jmx.NoOpXdsServerMetricsProvider; import com.linkedin.d2.jmx.NoOpJmxManager; +import com.linkedin.d2.jmx.LoadBalancerStateOtelMetricsProvider; +import com.linkedin.d2.jmx.NoOpLoadBalancerStateOtelMetricsProvider; import com.linkedin.r2.transport.common.TransportClientFactory; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; import java.time.Duration; @@ -181,6 +183,7 @@ public class D2ClientConfig public boolean subscribeToUriGlobCollection = false; public XdsServerMetricsProvider _xdsServerMetricsProvider = new NoOpXdsServerMetricsProvider(); + public LoadBalancerStateOtelMetricsProvider _loadBalancerStateOtelMetricsProvider = new NoOpLoadBalancerStateOtelMetricsProvider(); public boolean loadBalanceStreamException = false; public boolean xdsInitialResourceVersionsEnabled = false; public Integer xdsStreamMaxRetryBackoffSeconds = null; @@ -282,6 +285,7 @@ public D2ClientConfig() Map xdsChannelLoadBalancingPolicyConfig, boolean subscribeToUriGlobCollection, XdsServerMetricsProvider xdsServerMetricsProvider, + LoadBalancerStateOtelMetricsProvider loadBalancerStateOtelMetricsProvider, boolean loadBalanceStreamException, boolean xdsInitialResourceVersionsEnabled, boolean disableDetectLiRawD2Client, @@ -294,7 +298,7 @@ public D2ClientConfig() Boolean enableIndisDownstreamServicesFetcher, Duration indisDownstreamServicesFetchTimeout) { - this.zkHosts = zkHosts; + this.zkHosts = zkHosts; this.xdsServer = xdsServer; this.hostName = hostName; this.zkSessionTimeoutInMs = zkSessionTimeoutInMs; @@ -366,7 +370,8 @@ public D2ClientConfig() this.xdsChannelLoadBalancingPolicyConfig = xdsChannelLoadBalancingPolicyConfig; this.xdsChannelKeepAliveTimeMins = xdsChannelKeepAliveTimeMins; this.subscribeToUriGlobCollection = subscribeToUriGlobCollection; - this._xdsServerMetricsProvider = xdsServerMetricsProvider; + this._xdsServerMetricsProvider = xdsServerMetricsProvider; + this._loadBalancerStateOtelMetricsProvider = loadBalancerStateOtelMetricsProvider == null ? new NoOpLoadBalancerStateOtelMetricsProvider() : loadBalancerStateOtelMetricsProvider; this.loadBalanceStreamException = loadBalanceStreamException; this.xdsInitialResourceVersionsEnabled = xdsInitialResourceVersionsEnabled; this.disableDetectLiRawD2Client = disableDetectLiRawD2Client; diff --git a/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java index fcaa9d2578..5acc52212a 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java @@ -66,8 +66,8 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) logAppProps(LOG); } - D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager, - D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager); + D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager, + D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager, config._loadBalancerStateOtelMetricsProvider); // init connection ZKConnectionBuilder zkConnectionBuilder = new ZKConnectionBuilder(config.zkHosts); diff --git a/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java index 332fd7d4aa..867fd37aa8 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java @@ -80,8 +80,8 @@ private ZKFSLoadBalancer.TogglingLoadBalancerFactory createLoadBalancerFactory(D loadBalancerComponentFactory = config.componentFactory; } - D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager, - D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager); + D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager, + D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager, config._loadBalancerStateOtelMetricsProvider); return new ZKFSTogglingLoadBalancerFactoryImpl(loadBalancerComponentFactory, config.lbWaitTimeout, diff --git a/d2/src/main/java/com/linkedin/d2/balancer/zkfs/ZKFSTogglingLoadBalancerFactoryImpl.java b/d2/src/main/java/com/linkedin/d2/balancer/zkfs/ZKFSTogglingLoadBalancerFactoryImpl.java index 3306521a34..d79ab9d64b 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/zkfs/ZKFSTogglingLoadBalancerFactoryImpl.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/zkfs/ZKFSTogglingLoadBalancerFactoryImpl.java @@ -164,7 +164,7 @@ public ZKFSTogglingLoadBalancerFactoryImpl(ComponentFactory factory, new PartitionAccessorRegistryImpl(), false, validationStrings -> null, - new D2ClientJmxManager("notSpecified", new NoOpJmxManager()), + new D2ClientJmxManager("notSpecified", new NoOpJmxManager(), D2ClientJmxManager.DiscoverySourceType.ZK, null, null), ZooKeeperEphemeralStore.DEFAULT_READ_WINDOW_MS); } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java index 7795d8c80f..9ab2db2d79 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java @@ -29,6 +29,8 @@ import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState; import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState.SimpleLoadBalancerStateListener; import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy; +import com.linkedin.d2.jmx.LoadBalancerStateOtelMetricsProvider; +import com.linkedin.d2.jmx.NoOpLoadBalancerStateOtelMetricsProvider; import com.linkedin.d2.discovery.stores.file.FileStore; import com.linkedin.d2.discovery.stores.zk.ZooKeeperEphemeralStore; import com.linkedin.d2.discovery.stores.zk.ZooKeeperPermanentStore; @@ -72,6 +74,7 @@ When dual read state manager is null, only one discovery source is working (coul private final String _secondaryPrefixForLbPropertyJmxName; private final D2ClientJmxDualReadModeWatcherManager _watcherManager; + private final LoadBalancerStateOtelMetricsProvider _loadBalancerStateOtelMetricsProvider; public enum DiscoverySourceType @@ -94,19 +97,21 @@ public String getPrintName() public D2ClientJmxManager(String prefix, @Nonnull JmxManager jmxManager) { - this(prefix, jmxManager, DiscoverySourceType.ZK, null); + this(prefix, jmxManager, DiscoverySourceType.ZK, null, null); } public D2ClientJmxManager(String prefix, @Nonnull JmxManager jmxManager, @Nonnull DiscoverySourceType discoverySourceType, - @Nullable DualReadStateManager dualReadStateManager) + @Nullable DualReadStateManager dualReadStateManager, + @Nullable LoadBalancerStateOtelMetricsProvider loadBalancerStateOtelMetricsProvider) { ArgumentUtil.ensureNotNull(jmxManager,"jmxManager"); _primaryGlobalPrefix = prefix; _jmxManager = jmxManager; _discoverySourceType = discoverySourceType; _dualReadStateManager = dualReadStateManager; + _loadBalancerStateOtelMetricsProvider = loadBalancerStateOtelMetricsProvider == null ? new NoOpLoadBalancerStateOtelMetricsProvider() : loadBalancerStateOtelMetricsProvider; _secondaryGlobalPrefix = String.format("%s-%s", _primaryGlobalPrefix, _discoverySourceType.getPrintName()); _secondaryPrefixForLbPropertyJmxName = String.format("%s-", _discoverySourceType.getPrintName()); _watcherManager = _dualReadStateManager == null ? new NoOpD2ClientJmxDualReadModeWatcherManagerImpl() @@ -317,8 +322,14 @@ private void doRegisterLoadBalancer(SimpleLoadBalancer balancer, @Nullable DualR private void doRegisterLoadBalancerState(SimpleLoadBalancerState state, @Nullable DualReadModeProvider.DualReadMode mode) { - final String jmxName = String.format("%s-LoadBalancerState", getGlobalPrefix(mode)); - _jmxManager.registerLoadBalancerState(jmxName, state); + final String jmxName = String.format("%s-LoadBalancerState", getGlobalPrefix(mode)); + // First call the existing API that tests expect (registerLoadBalancerState with the state) + _jmxManager.registerLoadBalancerState(jmxName, state); + // Then create the JMX bean with OTel provider and set client name before replacing the registration + SimpleLoadBalancerStateJmx bean = new SimpleLoadBalancerStateJmx(state, _loadBalancerStateOtelMetricsProvider); + String clientName = getGlobalPrefix(mode); + bean.setClientName(clientName); + _jmxManager.checkReg(bean, jmxName); } private void doRegisterUriFileStore(FileStore uriStore, @Nullable DualReadModeProvider.DualReadMode mode) diff --git a/d2/src/main/java/com/linkedin/d2/jmx/LoadBalancerStateOtelMetricsProvider.java b/d2/src/main/java/com/linkedin/d2/jmx/LoadBalancerStateOtelMetricsProvider.java new file mode 100644 index 0000000000..bd61851376 --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/LoadBalancerStateOtelMetricsProvider.java @@ -0,0 +1,24 @@ +package com.linkedin.d2.jmx; + +/** + * Interface for OpenTelemetry metrics collection for LoadBalancerState sensor. + */ +public interface LoadBalancerStateOtelMetricsProvider { + + /** + * Records both regular and symlink cluster counts for a client. + * + * @param clientName the client name + * @param regularClusterCount regular cluster count + * @param symlinkClusterCount symlink cluster count + */ + void recordClusterCount(String clientName, long regularClusterCount, long symlinkClusterCount); + + /** + * Records service count for a client. + * + * @param clientName the client name + * @param serviceCount service count + */ + void recordServiceCount(String clientName, long serviceCount); +} diff --git a/d2/src/main/java/com/linkedin/d2/jmx/NoOpLoadBalancerStateOtelMetricsProvider.java b/d2/src/main/java/com/linkedin/d2/jmx/NoOpLoadBalancerStateOtelMetricsProvider.java new file mode 100644 index 0000000000..f81c5e3cb4 --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/NoOpLoadBalancerStateOtelMetricsProvider.java @@ -0,0 +1,18 @@ +package com.linkedin.d2.jmx; + +/** + * No-Op implementation of {@link LoadBalancerStateOtelMetricsProvider}. + * Used when OpenTelemetry metrics are disabled. + */ +public class NoOpLoadBalancerStateOtelMetricsProvider implements LoadBalancerStateOtelMetricsProvider { + + @Override + public void recordClusterCount(String clientName, long regularClusterCount, long symlinkClusterCount) { + // no-op + } + + @Override + public void recordServiceCount(String clientName, long serviceCount) { + // no-op + } +} diff --git a/d2/src/main/java/com/linkedin/d2/jmx/SimpleLoadBalancerStateJmx.java b/d2/src/main/java/com/linkedin/d2/jmx/SimpleLoadBalancerStateJmx.java index b826388676..e857b9ef1a 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/SimpleLoadBalancerStateJmx.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/SimpleLoadBalancerStateJmx.java @@ -30,22 +30,46 @@ public class SimpleLoadBalancerStateJmx implements SimpleLoadBalancerStateJmxMBean { private final SimpleLoadBalancerState _state; + private final LoadBalancerStateOtelMetricsProvider _otelMetricsProvider; + private String _clientName = "-"; + @Deprecated public SimpleLoadBalancerStateJmx(SimpleLoadBalancerState state) + { + this(state, new NoOpLoadBalancerStateOtelMetricsProvider()); + } + + public SimpleLoadBalancerStateJmx(SimpleLoadBalancerState state, LoadBalancerStateOtelMetricsProvider otelMetricsProvider) { _state = state; + _otelMetricsProvider = otelMetricsProvider == null ? new NoOpLoadBalancerStateOtelMetricsProvider() : otelMetricsProvider; + } + + public void setClientName(String clientName) + { + _clientName = clientName == null ? "-" : clientName; + } + + public String getClientName() + { + return _clientName; } @Override public int getClusterCount() { - return _state.getClusterCount(); + int regular = _state.getClusterCount(); + long symlink = _state.getClusters().stream().filter(SymlinkUtil::isSymlinkNodeOrPath).count(); + _otelMetricsProvider.recordClusterCount(_clientName, (long) regular, symlink); + return regular; } @Override public long getSymlinkClusterCount() { - return _state.getClusters().stream().filter(SymlinkUtil::isSymlinkNodeOrPath).count(); + long symlink = _state.getClusters().stream().filter(SymlinkUtil::isSymlinkNodeOrPath).count(); + _otelMetricsProvider.recordClusterCount(_clientName, (long) _state.getClusterCount(), symlink); + return symlink; } @Override @@ -63,7 +87,9 @@ public int getListenerCount() @Override public int getServiceCount() { - return _state.getServiceCount(); + int count = _state.getServiceCount(); + _otelMetricsProvider.recordServiceCount(_clientName, (long) count); + return count; } @Override diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java index 4c77c81433..48bfdfc7e3 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java @@ -50,7 +50,7 @@ public boolean isIndisOnly() public LoadBalancerWithFacilities create(D2ClientConfig config) { D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager, - D2ClientJmxManager.DiscoverySourceType.XDS, config.dualReadStateManager); + D2ClientJmxManager.DiscoverySourceType.XDS, config.dualReadStateManager, config._loadBalancerStateOtelMetricsProvider); if (config.dualReadStateManager != null) { diff --git a/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java b/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java index 7f5747bad5..623400a878 100644 --- a/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java +++ b/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java @@ -157,11 +157,11 @@ D2ClientJmxManager getD2ClientJmxManager(String prefix, D2ClientJmxManager.Disco { if (sourceType == null) { // default to ZK source type, null dualReadStateManager - _d2ClientJmxManager = new D2ClientJmxManager(prefix, _jmxManager); + _d2ClientJmxManager = new D2ClientJmxManager(prefix, _jmxManager, D2ClientJmxManager.DiscoverySourceType.ZK, null, null); } else { - _d2ClientJmxManager = new D2ClientJmxManager(prefix, _jmxManager, sourceType, isDualReadLB ? _dualReadStateManager : null); + _d2ClientJmxManager = new D2ClientJmxManager(prefix, _jmxManager, sourceType, isDualReadLB ? _dualReadStateManager : null, null); } return _d2ClientJmxManager; } diff --git a/d2/src/test/java/com/linkedin/d2/jmx/D2LoadBalancerJmxTest.java b/d2/src/test/java/com/linkedin/d2/jmx/D2LoadBalancerJmxTest.java index 437fff4a1f..f70acb9a0e 100644 --- a/d2/src/test/java/com/linkedin/d2/jmx/D2LoadBalancerJmxTest.java +++ b/d2/src/test/java/com/linkedin/d2/jmx/D2LoadBalancerJmxTest.java @@ -114,7 +114,7 @@ private D2Client getD2Client(LoadBalancerWithFacilitiesFactory lbWithFacilitiesF private void testD2ClientJmxManagerRegisteringStrategies() { JmxManager mockJmxManager = mock(JmxManager.class); - D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(DUMMY_STRING, mockJmxManager); + D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(DUMMY_STRING, mockJmxManager, D2ClientJmxManager.DiscoverySourceType.ZK, null, null); SimpleLoadBalancerState simpleLoadBalancerState = mock(SimpleLoadBalancerState.class); d2ClientJmxManager.setSimpleLoadBalancerState(simpleLoadBalancerState); diff --git a/d2/src/test/java/com/linkedin/d2/jmx/TestSimpleLoadBalancerStateJmx.java b/d2/src/test/java/com/linkedin/d2/jmx/TestSimpleLoadBalancerStateJmx.java new file mode 100644 index 0000000000..7968cbaf0b --- /dev/null +++ b/d2/src/test/java/com/linkedin/d2/jmx/TestSimpleLoadBalancerStateJmx.java @@ -0,0 +1,62 @@ +package com.linkedin.d2.jmx; + +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; + +import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState; +import java.util.Arrays; +import java.util.HashSet; +import org.testng.annotations.Test; + +/** + * Unit tests for {@link SimpleLoadBalancerStateJmx}. + */ +public class TestSimpleLoadBalancerStateJmx +{ + @Test + public void testDefaultNoOpProviderDoesNotThrow() + { + SimpleLoadBalancerState state = mock(SimpleLoadBalancerState.class); + when(state.getClusterCount()).thenReturn(3); + when(state.getClusters()).thenReturn(new HashSet<>(Arrays.asList("a","$b"))); + when(state.getServiceCount()).thenReturn(5); + + SimpleLoadBalancerStateJmx jmx = new SimpleLoadBalancerStateJmx(state); + + assertEquals(jmx.getClusterCount(), 3); + assertEquals(jmx.getSymlinkClusterCount(), 1L); + assertEquals(jmx.getServiceCount(), 5); + } + + @Test + public void testWithMockProviderReceivesCallbacksAndClientName() + { + SimpleLoadBalancerState state = mock(SimpleLoadBalancerState.class); + when(state.getClusterCount()).thenReturn(2); + when(state.getClusters()).thenReturn(new HashSet<>(Arrays.asList("c1","$c2"))); + when(state.getServiceCount()).thenReturn(7); + + LoadBalancerStateOtelMetricsProvider provider = mock(LoadBalancerStateOtelMetricsProvider.class); + + SimpleLoadBalancerStateJmx jmx = new SimpleLoadBalancerStateJmx(state, provider); + + // default client name is "-" + jmx.getClusterCount(); + jmx.getSymlinkClusterCount(); + jmx.getServiceCount(); + + // cluster count method is called twice (both getters call recordClusterCount) + verify(provider, times(2)).recordClusterCount("-", 2L, 1L); + verify(provider, times(1)).recordServiceCount("-", 7L); + + // set client name and verify subsequent calls use it + jmx.setClientName("MyClient"); + // call both cluster getters and the service getter once — matches implementation + jmx.getClusterCount(); + jmx.getSymlinkClusterCount(); + jmx.getServiceCount(); + + verify(provider, times(2)).recordClusterCount("MyClient", 2L, 1L); + verify(provider, times(1)).recordServiceCount("MyClient", 7L); + } +}