Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.linkedin.d2.discovery.stores.zk.ZKPersistentConnection;
import com.linkedin.d2.discovery.stores.zk.ZooKeeper;
import com.linkedin.d2.jmx.XdsServerMetricsProvider;
import com.linkedin.d2.jmx.XdsClientOtelMetricsProvider;
import com.linkedin.d2.jmx.JmxManager;
import com.linkedin.d2.xds.XdsClientValidator;
import com.linkedin.d2.jmx.NoOpJmxManager;
Expand Down Expand Up @@ -237,6 +238,7 @@ public D2Client build()
_config.xdsChannelLoadBalancingPolicyConfig,
_config.subscribeToUriGlobCollection,
_config._xdsServerMetricsProvider,
_config._xdsClientOtelMetricsProvider,
_config.loadBalanceStreamException,
_config.xdsInitialResourceVersionsEnabled,
_config.disableDetectLiRawD2Client,
Expand Down Expand Up @@ -856,6 +858,11 @@ public D2ClientBuilder setXdsServerMetricsProvider(XdsServerMetricsProvider xdsS
return this;
}

/**
 * Sets the {@link XdsClientOtelMetricsProvider} stored on this builder's config.
 * The value is assigned as-is; no null check is performed here.
 *
 * @param xdsClientOtelMetricsProvider provider to store on the config
 * @return this builder, to allow call chaining
 */
public D2ClientBuilder setXdsClientOtelMetricsProvider(XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider) {
_config._xdsClientOtelMetricsProvider = xdsClientOtelMetricsProvider;
return this;
}

public D2ClientBuilder setLoadBalanceStreamException(boolean loadBalanceStreamException) {
_config.loadBalanceStreamException = loadBalanceStreamException;
return this;
Expand Down
143 changes: 143 additions & 0 deletions d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import com.linkedin.d2.jmx.JmxManager;
import com.linkedin.d2.jmx.NoOpXdsServerMetricsProvider;
import com.linkedin.d2.jmx.NoOpJmxManager;
import com.linkedin.d2.jmx.XdsClientOtelMetricsProvider;
import com.linkedin.d2.jmx.NoOpXdsClientOtelMetricsProvider;
import com.linkedin.r2.transport.common.TransportClientFactory;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import java.time.Duration;
Expand Down Expand Up @@ -181,6 +183,7 @@ public class D2ClientConfig

public boolean subscribeToUriGlobCollection = false;
public XdsServerMetricsProvider _xdsServerMetricsProvider = new NoOpXdsServerMetricsProvider();
public XdsClientOtelMetricsProvider _xdsClientOtelMetricsProvider = new NoOpXdsClientOtelMetricsProvider();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For public variables, we shouldn't prefix with _, that's the convention for private ones

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I followed the existing pattern used by _xdsServerMetricsProvider, which is used for recording the xdsServer latency.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but this is new code and should follow better standards. The earlier one was likely committed incorrectly.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add a docstring above this variable to explain how it is used here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

public boolean loadBalanceStreamException = false;
public boolean xdsInitialResourceVersionsEnabled = false;
public Integer xdsStreamMaxRetryBackoffSeconds = null;
Expand Down Expand Up @@ -293,6 +296,145 @@ public D2ClientConfig()
D2CalleeInfoRecorder d2CalleeInfoRecorder,
Boolean enableIndisDownstreamServicesFetcher,
Duration indisDownstreamServicesFetchTimeout)
{
this(zkHosts, xdsServer, hostName, zkSessionTimeoutInMs, zkStartupTimeoutInMs, lbWaitTimeout, lbWaitUnit,
flagFile, basePath, fsBasePath, indisFsBasePath, componentFactory, clientFactories, lbWithFacilitiesFactory,
sslContext, grpcSslContext, sslParameters, isSSLEnabled, shutdownAsynchronously, isSymlinkAware,
clientServicesConfig, d2ServicePath, useNewEphemeralStoreWatcher, healthCheckOperations, executorService,
retry, restRetryEnabled, streamRetryEnabled, retryLimit, retryUpdateIntervalMs, retryAggregatedIntervalNum,
warmUp, warmUpTimeoutSeconds, indisWarmUpTimeoutSeconds, warmUpConcurrentRequests,
indisWarmUpConcurrentRequests, downstreamServicesFetcher, indisDownstreamServicesFetcher,
backupRequestsEnabled, backupRequestsStrategyStatsConsumer,
backupRequestsLatencyNotificationInterval,
backupRequestsLatencyNotificationIntervalUnit,
enableBackupRequestsClientAsync,
backupRequestsExecutorService,
emitter,
partitionAccessorRegistry,
zooKeeperDecorator,
enableSaveUriDataOnDisk,
loadBalancerStrategyFactories,
requestTimeoutHandlerEnabled,
sslSessionValidatorFactory,
zkConnection,
startUpExecutorService,
indisStartUpExecutorService,
jmxManager,
d2JmxManagerPrefix,
zookeeperReadWindowMs,
enableRelativeLoadBalancer,
deterministicSubsettingMetadataProvider,
canaryDistributionProvider,
enableClusterFailout,
failoutConfigProviderFactory,
failoutRedirectStrategy,
serviceDiscoveryEventEmitter,
dualReadStateManager,
xdsExecutorService,
xdsStreamReadyTimeout,
dualReadNewLbExecutor,
xdsChannelLoadBalancingPolicy,
xdsChannelLoadBalancingPolicyConfig,
subscribeToUriGlobCollection,
xdsServerMetricsProvider,
new NoOpXdsClientOtelMetricsProvider(),
loadBalanceStreamException,
xdsInitialResourceVersionsEnabled,
disableDetectLiRawD2Client,
isLiRawD2Client,
xdsStreamMaxRetryBackoffSeconds,
xdsChannelKeepAliveTimeMins,
xdsMinimumJavaVersion,
actionOnPrecheckFailure,
d2CalleeInfoRecorder,
enableIndisDownstreamServicesFetcher,
indisDownstreamServicesFetchTimeout);
}

D2ClientConfig(String zkHosts,
String xdsServer,
String hostName,
long zkSessionTimeoutInMs,
long zkStartupTimeoutInMs,
long lbWaitTimeout,
TimeUnit lbWaitUnit,
String flagFile,
String basePath,
String fsBasePath,
String indisFsBasePath,
ComponentFactory componentFactory,
Map<String, TransportClientFactory> clientFactories,
LoadBalancerWithFacilitiesFactory lbWithFacilitiesFactory,
SSLContext sslContext,
SslContext grpcSslContext,
SSLParameters sslParameters,
boolean isSSLEnabled,
boolean shutdownAsynchronously,
boolean isSymlinkAware,
Map<String, Map<String, Object>> clientServicesConfig,
String d2ServicePath,
boolean useNewEphemeralStoreWatcher,
HealthCheckOperations healthCheckOperations,
ScheduledExecutorService executorService,
boolean retry,
boolean restRetryEnabled,
boolean streamRetryEnabled,
int retryLimit,
long retryUpdateIntervalMs,
int retryAggregatedIntervalNum,
boolean warmUp,
int warmUpTimeoutSeconds,
int indisWarmUpTimeoutSeconds,
int warmUpConcurrentRequests,
int indisWarmUpConcurrentRequests,
DownstreamServicesFetcher downstreamServicesFetcher,
DownstreamServicesFetcher indisDownstreamServicesFetcher,
boolean backupRequestsEnabled,
BackupRequestsStrategyStatsConsumer backupRequestsStrategyStatsConsumer,
long backupRequestsLatencyNotificationInterval,
TimeUnit backupRequestsLatencyNotificationIntervalUnit,
boolean enableBackupRequestsClientAsync,
ScheduledExecutorService backupRequestsExecutorService,
EventEmitter emitter,
PartitionAccessorRegistry partitionAccessorRegistry,
Function<ZooKeeper, ZooKeeper> zooKeeperDecorator,
boolean enableSaveUriDataOnDisk,
Map<String, LoadBalancerStrategyFactory<? extends LoadBalancerStrategy>> loadBalancerStrategyFactories,
boolean requestTimeoutHandlerEnabled,
SslSessionValidatorFactory sslSessionValidatorFactory,
ZKPersistentConnection zkConnection,
ScheduledExecutorService startUpExecutorService,
ScheduledExecutorService indisStartUpExecutorService,
JmxManager jmxManager,
String d2JmxManagerPrefix,
int zookeeperReadWindowMs,
boolean enableRelativeLoadBalancer,
DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider,
CanaryDistributionProvider canaryDistributionProvider,
boolean enableClusterFailout,
FailoutConfigProviderFactory failoutConfigProviderFactory,
FailoutRedirectStrategy failoutRedirectStrategy,
ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter,
DualReadStateManager dualReadStateManager,
ScheduledExecutorService xdsExecutorService,
Long xdsStreamReadyTimeout,
ExecutorService dualReadNewLbExecutor,
String xdsChannelLoadBalancingPolicy,
Map<String, ?> xdsChannelLoadBalancingPolicyConfig,
boolean subscribeToUriGlobCollection,
XdsServerMetricsProvider xdsServerMetricsProvider,
XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider,
boolean loadBalanceStreamException,
boolean xdsInitialResourceVersionsEnabled,
boolean disableDetectLiRawD2Client,
boolean isLiRawD2Client,
Integer xdsStreamMaxRetryBackoffSeconds,
Long xdsChannelKeepAliveTimeMins,
String xdsMinimumJavaVersion,
XdsClientValidator.ActionOnPrecheckFailure actionOnPrecheckFailure,
D2CalleeInfoRecorder d2CalleeInfoRecorder,
Boolean enableIndisDownstreamServicesFetcher,
Duration indisDownstreamServicesFetchTimeout)
{
this.zkHosts = zkHosts;
this.xdsServer = xdsServer;
Expand Down Expand Up @@ -367,6 +509,7 @@ public D2ClientConfig()
this.xdsChannelKeepAliveTimeMins = xdsChannelKeepAliveTimeMins;
this.subscribeToUriGlobCollection = subscribeToUriGlobCollection;
this._xdsServerMetricsProvider = xdsServerMetricsProvider;
this._xdsClientOtelMetricsProvider = xdsClientOtelMetricsProvider;
this.loadBalanceStreamException = loadBalanceStreamException;
this.xdsInitialResourceVersionsEnabled = xdsInitialResourceVersionsEnabled;
this.disableDetectLiRawD2Client = disableDetectLiRawD2Client;
Expand Down
5 changes: 5 additions & 0 deletions d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ public void registerXdsClientJmx(XdsClientJmx xdsClientJmx)
_log.warn("Setting XdsClientJmx for Non-XDS source type: {}", _discoverySourceType);
}
final String jmxName = String.format("%s-XdsClientJmx", getGlobalPrefix(null));

// Get the client name from global prefix
String clientName = getGlobalPrefix(null);
Comment on lines 308 to 309

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I've also noticed is that getGlobalPrefix is already called on line 308. Can we just call that method once and reuse the result?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

xdsClientJmx.setClientName(clientName);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it need to be set here? Do we need to set it anywhere else?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I set the client name because I want to use it in the XdsClientImpl file to record the server latency for this specific client. The clientName is one of the dimensions used in Otel for this metric.

_jmxManager.registerXdsClientJmxBean(jmxName, xdsClientJmx);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.linkedin.d2.jmx;

/**
* No-Op implementation of XdsClientOtelMetricsProvider.
* Used when OpenTelemetry metrics are disabled.
*/
public class NoOpXdsClientOtelMetricsProvider implements XdsClientOtelMetricsProvider {

@Override

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add docstring {@inheritDoc} for all of these methods

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

public void recordConnectionLost(String clientName) {
// No-op
}

/**
 * {@inheritDoc}
 *
 * <p>This implementation does nothing; {@code clientName} is ignored.
 */
@Override
public void recordConnectionClosed(String clientName) {
// No-op
}

/**
 * {@inheritDoc}
 *
 * <p>This implementation does nothing; {@code clientName} is ignored.
 */
@Override
public void recordReconnection(String clientName) {
// No-op
}

/**
 * {@inheritDoc}
 *
 * <p>This implementation does nothing; {@code clientName} is ignored.
 */
@Override
public void recordRequestSent(String clientName) {
// No-op
}

/**
 * {@inheritDoc}
 *
 * <p>This implementation does nothing; {@code clientName} is ignored.
 */
@Override
public void recordResponseReceived(String clientName) {
// No-op
}

/**
 * {@inheritDoc}
 *
 * <p>This implementation does nothing; both {@code clientName} and {@code count} are ignored.
 */
@Override
public void recordInitialResourceVersionSent(String clientName, int count) {
// No-op
}

/**
 * {@inheritDoc}
 *
 * <p>This implementation does nothing; {@code clientName} is ignored.
 */
@Override
public void recordResourceNotFound(String clientName) {
// No-op
}

/**
 * {@inheritDoc}
 *
 * <p>This implementation does nothing; {@code clientName} is ignored.
 */
@Override
public void recordResourceInvalid(String clientName) {
// No-op
}

/**
 * {@inheritDoc}
 *
 * <p>This implementation does nothing; both {@code clientName} and {@code latencyMs} are ignored.
 */
@Override
public void recordServerLatency(String clientName, long latencyMs) {
// No-op
}

/**
 * {@inheritDoc}
 *
 * <p>This implementation does nothing; both {@code clientName} and {@code isConnected} are ignored.
 */
@Override
public void updateConnectionState(String clientName, boolean isConnected) {
// No-op
}

/**
 * {@inheritDoc}
 *
 * <p>This implementation does nothing; both {@code clientName} and {@code waitTimeMs} are ignored.
 */
@Override
public void updateActiveInitialWaitTime(String clientName, long waitTimeMs) {
// No-op
}
}
38 changes: 35 additions & 3 deletions d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,39 @@ public class XdsClientJmx implements XdsClientJmxMBean
private final AtomicInteger _resourceNotFoundCount = new AtomicInteger();
private final AtomicInteger _resourceInvalidCount = new AtomicInteger();
private final XdsServerMetricsProvider _xdsServerMetricsProvider;
private final XdsClientOtelMetricsProvider _xdsClientOtelMetricsProvider;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: unnecessary extra endline here. Additionally, we really should create a common util that exposes "-" in case future sensors will also need to reference it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the extra endline and created a new file, D2JmxConstants, which exposes "-".

private String _clientName = "-";

@Nullable private XdsClientImpl _xdsClient = null;

@Deprecated
public XdsClientJmx()
{
this(new NoOpXdsServerMetricsProvider());
this(new NoOpXdsServerMetricsProvider(), null);
}

/**
 * Creates the bean with the given server metrics provider and no explicit OTel metrics provider.
 * Delegates to the two-argument constructor, passing {@code null} for the OTel provider; that
 * constructor substitutes a {@link NoOpXdsClientOtelMetricsProvider} for a null argument.
 *
 * @param xdsServerMetricsProvider server metrics provider; forwarded unchanged to the two-argument constructor
 */
public XdsClientJmx(XdsServerMetricsProvider xdsServerMetricsProvider)
{
this(xdsServerMetricsProvider, null);
}

/**
 * Creates the bean with the given metrics providers. A {@code null} argument is replaced by the
 * corresponding no-op implementation, so both provider fields are always non-null afterwards.
 *
 * @param xdsServerMetricsProvider server metrics provider, or {@code null} to use
 *                                 {@link NoOpXdsServerMetricsProvider}
 * @param xdsClientOtelMetricsProvider OTel client metrics provider, or {@code null} to use
 *                                     {@link NoOpXdsClientOtelMetricsProvider}
 */
public XdsClientJmx(XdsServerMetricsProvider xdsServerMetricsProvider,
    XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider)
{
  // Fall back to the no-op server metrics provider when none is supplied.
  if (xdsServerMetricsProvider != null)
  {
    _xdsServerMetricsProvider = xdsServerMetricsProvider;
  }
  else
  {
    _xdsServerMetricsProvider = new NoOpXdsServerMetricsProvider();
  }

  // Same fallback for the OTel client metrics provider.
  if (xdsClientOtelMetricsProvider != null)
  {
    _xdsClientOtelMetricsProvider = xdsClientOtelMetricsProvider;
  }
  else
  {
    _xdsClientOtelMetricsProvider = new NoOpXdsClientOtelMetricsProvider();
  }
}

// Method to set client name (called from D2ClientJmxManager)
public void setClientName(String clientName) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Is there a reason for this to be able to be set multiple times for a single instance? If not, we should have it be a one-way set. i.e. via atomic string or something.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used atomic string.

_clientName = clientName;
}

/**
 * Returns the client name currently held by this bean.
 *
 * @return the value of {@code _clientName}: whatever was last assigned via
 *         {@code setClientName}, or the initial placeholder {@code "-"} if it was never set
 */
public String getClientName() {
return _clientName;
}
Comment on lines 71 to 82

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing docstring

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.


public void setXdsClient(XdsClientImpl xdsClient)
Expand Down Expand Up @@ -146,55 +167,66 @@ public int isDisconnected()
@Override
public long getActiveInitialWaitTimeMillis()
{
long waitTime = -1;
if (_xdsClient != null)
{
return _xdsClient.getActiveInitialWaitTimeMillis();
waitTime = _xdsClient.getActiveInitialWaitTimeMillis();
_xdsClientOtelMetricsProvider.updateActiveInitialWaitTime(_clientName, waitTime);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

technically, can't all these usages of _clientName pass a null or empty _clientName if it's not set or improperly set?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

}
return -1;
return waitTime;
}

/**
 * Increments the connection-lost counter by one, then reports the event to the OTel metrics
 * provider via {@code recordConnectionLost}, passing the current client name.
 */
public void incrementConnectionLostCount()
{
_connectionLostCount.incrementAndGet();
_xdsClientOtelMetricsProvider.recordConnectionLost(_clientName);
}

/**
 * Increments the connection-closed counter by one, then reports the event to the OTel metrics
 * provider via {@code recordConnectionClosed}, passing the current client name.
 */
public void incrementConnectionClosedCount()
{
_connectionClosedCount.incrementAndGet();
_xdsClientOtelMetricsProvider.recordConnectionClosed(_clientName);
}

/**
 * Increments the reconnection counter by one, then reports the event to the OTel metrics
 * provider via {@code recordReconnection}, passing the current client name.
 */
public void incrementReconnectionCount()
{
_reconnectionCount.incrementAndGet();
_xdsClientOtelMetricsProvider.recordReconnection(_clientName);
}

/**
 * Increments the request-sent counter by one, then reports the event to the OTel metrics
 * provider via {@code recordRequestSent}, passing the current client name.
 */
public void incrementRequestSentCount()
{
// NOTE(review): the field name "_resquestSentCount" looks like a typo of "_requestSentCount";
// it is declared outside this method, so it is left unchanged here.
_resquestSentCount.incrementAndGet();
_xdsClientOtelMetricsProvider.recordRequestSent(_clientName);
}

/**
 * Adds {@code delta} to the IRV-sent counter, then forwards the same {@code delta} to the OTel
 * metrics provider via {@code recordInitialResourceVersionSent}, passing the current client name.
 *
 * @param delta amount added to the counter and reported to the provider
 */
public void addToIrvSentCount(int delta)
{
_irvSentCount.addAndGet(delta);
_xdsClientOtelMetricsProvider.recordInitialResourceVersionSent(_clientName, delta);
}

/**
 * Increments the response-received counter by one, then reports the event to the OTel metrics
 * provider via {@code recordResponseReceived}, passing the current client name.
 */
public void incrementResponseReceivedCount()
{
_responseReceivedCount.incrementAndGet();
_xdsClientOtelMetricsProvider.recordResponseReceived(_clientName);
}

/**
 * Stores the new connection state in {@code _isConnected}, then forwards it to the OTel metrics
 * provider via {@code updateConnectionState}, passing the current client name.
 *
 * @param connected the connection state to store and report
 */
public void setIsConnected(boolean connected)
{
// The previous value returned by getAndSet is discarded.
_isConnected.getAndSet(connected);
_xdsClientOtelMetricsProvider.updateConnectionState(_clientName, connected);
}

/**
 * Increments the resource-not-found counter by one, then reports the event to the OTel metrics
 * provider via {@code recordResourceNotFound}, passing the current client name.
 */
public void incrementResourceNotFoundCount()
{
_resourceNotFoundCount.incrementAndGet();
_xdsClientOtelMetricsProvider.recordResourceNotFound(_clientName);
}

/**
 * Increments the resource-invalid counter by one, then reports the event to the OTel metrics
 * provider via {@code recordResourceInvalid}, passing the current client name.
 */
public void incrementResourceInvalidCount()
{
_resourceInvalidCount.incrementAndGet();
_xdsClientOtelMetricsProvider.recordResourceInvalid(_clientName);
}
}
Loading