Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.linkedin.d2.balancer.clients.FailoutRedirectStrategy;
import com.linkedin.d2.balancer.clients.DynamicClient;
import com.linkedin.d2.balancer.clients.RequestTimeoutClient;
import com.linkedin.d2.balancer.util.D2CalleeInfoRecorder;
import java.time.Duration;
import javax.annotation.Nonnull;
import com.linkedin.d2.balancer.clients.RetryClient;
import com.linkedin.d2.balancer.clusterfailout.FailoutConfigProviderFactory;
Expand Down Expand Up @@ -242,7 +244,10 @@ public D2Client build()
_config.xdsStreamMaxRetryBackoffSeconds,
_config.xdsChannelKeepAliveTimeMins,
_config.xdsMinimumJavaVersion,
_config.actionOnPrecheckFailure
_config.actionOnPrecheckFailure,
_config.d2CalleeInfoRecorder,
_config.enableIndisDownstreamServicesFetcher,
_config.indisDownstreamServicesFetchTimeout
);

final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ?
Expand Down Expand Up @@ -893,6 +898,22 @@ public D2ClientBuilder setDisableDetectLiRawD2Client(boolean disableDetectLiRawD
return this;
}

/**
 * Stores the given {@link D2CalleeInfoRecorder} on the client config.
 *
 * @param d2CalleeInfoRecorder recorder to store; the config default is {@code null},
 *                             and the warm-up load balancer skips recording when it is {@code null}
 * @return this builder, to allow call chaining
 */
public D2ClientBuilder setD2CalleeInfoRecorder(D2CalleeInfoRecorder d2CalleeInfoRecorder)
{
  this._config.d2CalleeInfoRecorder = d2CalleeInfoRecorder;
  return this;
}

/**
 * Stores the timeout used when fetching downstream services from INDIS on the client config.
 *
 * @param indisDownstreamServicesFetchTimeout timeout value to store on the config
 * @return this builder, to allow call chaining
 */
public D2ClientBuilder setIndisDownstreamServicesFetchTimeout(Duration indisDownstreamServicesFetchTimeout)
{
  this._config.indisDownstreamServicesFetchTimeout = indisDownstreamServicesFetchTimeout;
  return this;
}

/**
 * Stores on the client config whether downstream services should be fetched from INDIS for warmup.
 *
 * @param enableIndisDownstreamServicesFetcher {@code true} to enable the INDIS-based fetcher
 * @return this builder, to allow call chaining
 */
public D2ClientBuilder setEnableIndisDownstreamServicesFetcher(boolean enableIndisDownstreamServicesFetcher)
{
  this._config.enableIndisDownstreamServicesFetcher = enableIndisDownstreamServicesFetcher;
  return this;
}

private Map<String, TransportClientFactory> createDefaultTransportClientFactories()
{
final Map<String, TransportClientFactory> clientFactories = new HashMap<>();
Expand Down
17 changes: 16 additions & 1 deletion d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy;
import com.linkedin.d2.balancer.strategies.LoadBalancerStrategyFactory;
import com.linkedin.d2.balancer.subsetting.DeterministicSubsettingMetadataProvider;
import com.linkedin.d2.balancer.util.D2CalleeInfoRecorder;
import com.linkedin.d2.balancer.util.canary.CanaryDistributionProvider;
import com.linkedin.d2.balancer.util.WarmUpLoadBalancer;
import com.linkedin.d2.balancer.util.downstreams.DownstreamServicesFetcher;
Expand All @@ -45,6 +46,7 @@
import com.linkedin.d2.jmx.NoOpJmxManager;
import com.linkedin.r2.transport.common.TransportClientFactory;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -196,6 +198,13 @@ public class D2ClientConfig
* opposed to being created by the standard LinkedIn d2 client factory in the container library).
*/
public boolean isLiRawD2Client = false;
/**
 * Callee info recorder for d2 warmup. Defaults to {@code null}; it is handed to
 * {@code WarmUpLoadBalancer} by the load balancer factories, which treats {@code null}
 * as "no recorder configured".
 */
public D2CalleeInfoRecorder d2CalleeInfoRecorder = null;
/**
 * Whether to enable fetching downstream services from INDIS for warmup.
 */
public boolean enableIndisDownstreamServicesFetcher = false;
/**
 * Timeout handed to {@code IndisBasedDownstreamServicesFetcher} when
 * {@link #enableIndisDownstreamServicesFetcher} is {@code true}. Defaults to 5 seconds.
 */
public Duration indisDownstreamServicesFetchTimeout = Duration.ofSeconds(5);

public D2ClientConfig()
{
Expand Down Expand Up @@ -280,7 +289,10 @@ public D2ClientConfig()
Integer xdsStreamMaxRetryBackoffSeconds,
Long xdsChannelKeepAliveTimeMins,
String xdsMinimumJavaVersion,
XdsClientValidator.ActionOnPrecheckFailure actionOnPrecheckFailure)
XdsClientValidator.ActionOnPrecheckFailure actionOnPrecheckFailure,
D2CalleeInfoRecorder d2CalleeInfoRecorder,
Boolean enableIndisDownstreamServicesFetcher,
Duration indisDownstreamServicesFetchTimeout)
{
this.zkHosts = zkHosts;
this.xdsServer = xdsServer;
Expand Down Expand Up @@ -362,5 +374,8 @@ public D2ClientConfig()
this.xdsStreamMaxRetryBackoffSeconds = xdsStreamMaxRetryBackoffSeconds;
this.xdsMinimumJavaVersion = xdsMinimumJavaVersion;
this.actionOnPrecheckFailure = actionOnPrecheckFailure;
this.d2CalleeInfoRecorder = d2CalleeInfoRecorder;
this.indisDownstreamServicesFetchTimeout = indisDownstreamServicesFetchTimeout;
this.enableIndisDownstreamServicesFetcher = enableIndisDownstreamServicesFetcher;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public LoadBalancerWithFacilities create(D2ClientConfig config)
{
balancer = new WarmUpLoadBalancer(balancer, lastSeenLoadBalancer, config.startUpExecutorService, config.fsBasePath,
config.d2ServicePath, config.downstreamServicesFetcher, config.warmUpTimeoutSeconds,
config.warmUpConcurrentRequests, config.dualReadStateManager, false);
config.warmUpConcurrentRequests, config.dualReadStateManager, false, config.d2CalleeInfoRecorder);
}

return balancer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public LoadBalancerWithFacilities create(D2ClientConfig config)
{
balancer = new WarmUpLoadBalancer(balancer, zkfsLoadBalancer, config.startUpExecutorService, config.fsBasePath,
config.d2ServicePath, config.downstreamServicesFetcher, config.warmUpTimeoutSeconds,
config.warmUpConcurrentRequests, config.dualReadStateManager, false);
config.warmUpConcurrentRequests, config.dualReadStateManager, false, config.d2CalleeInfoRecorder);
}
return balancer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public class SimpleLoadBalancer implements LoadBalancer, HashRingProvider, Clien
private static final String HOST_OVERRIDE_LIST = "HOST_OVERRIDE_LIST";
private static final Logger _log =
LoggerFactory.getLogger(SimpleLoadBalancer.class);
private static final String D2_SCHEME_NAME = "d2";
public static final String D2_SCHEME_NAME = "d2";

private final LoadBalancerState _state;
private final Stats _serviceUnavailableStats;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.linkedin.d2.balancer.util;

import javax.annotation.Nonnull;


/**
* Records the callee service information and periodically sends out the recorded information to a persistence medium.
*/
public interface D2CalleeInfoRecorder {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface should've been added here and re-used in grpc infra. However, now I'll add an adapter internally to re-use that one for its implementation.

/**
* Records a callee service name.
* @param serviceName the callee service name to record
*/
void record(String serviceName);

/**
 * Returns the application name. {@code WarmUpLoadBalancer} passes this value to
 * {@code DownstreamServicesFetcher#getServiceNames} when a recorder is configured.
 * NOTE(review): presumably the name of the calling (caller-side) application — confirm with implementers.
 * @return the app name, never {@code null}
 */
@Nonnull
String getAppName();

/**
 * Returns the application instance ID. {@code WarmUpLoadBalancer} passes this value to
 * {@code DownstreamServicesFetcher#getServiceNames} when a recorder is configured.
 * NOTE(review): the exact format of the instance ID is not defined here — confirm with implementers.
 * @return the app instance ID, never {@code null}
 */
@Nonnull
String getAppInstanceID();

/**
 * Returns the scope. {@code WarmUpLoadBalancer} passes this value to
 * {@code DownstreamServicesFetcher#getServiceNames} when a recorder is configured.
 * NOTE(review): the meaning of "scope" is not defined in this interface — confirm with implementers.
 * @return the scope, never {@code null}
 */
@Nonnull
String getScope();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.SuccessCallback;
import com.linkedin.common.util.None;
import com.linkedin.d2.balancer.LoadBalancerWithFacilities;
import com.linkedin.d2.balancer.LoadBalancerWithFacilitiesDelegator;
Expand Down Expand Up @@ -49,9 +50,13 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.d2.balancer.simple.SimpleLoadBalancer.*;


/**
* The WarmUpLoadBalancer warms up the internal {@link SimpleLoadBalancer} services/cluster list
* before the client is announced as "started".
Expand Down Expand Up @@ -79,6 +84,7 @@ public class WarmUpLoadBalancer extends LoadBalancerWithFacilitiesDelegator {
private final DualReadStateManager _dualReadStateManager;
private final boolean _isIndis; // whether warming up for Indis (false means warming up for ZK)
private final String _printName; // name of this warmup load balancer, depending on whether it is for Indis or not.
// Optional recorder; null when none was supplied. When present, its app name, instance ID and scope are
// passed to the downstream services fetcher, and service names of d2-scheme requests are recorded in getClient.
@Nullable private final D2CalleeInfoRecorder _d2CalleeInfoRecorder;
private volatile boolean _shuttingDown = false;
private long _allStartTime;
private List<String> _servicesToWarmUp = null;
Expand All @@ -103,14 +109,32 @@ public WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService ser
DownstreamServicesFetcher downstreamServicesFetcher, int warmUpTimeoutSeconds, int concurrentRequests,
DualReadStateManager dualReadStateManager, boolean isIndis) {
this(balancer, serviceWarmupper, executorService, d2FsDirPath, d2ServicePath, downstreamServicesFetcher,
warmUpTimeoutSeconds * 1000, concurrentRequests, dualReadStateManager, isIndis, null);
warmUpTimeoutSeconds, concurrentRequests, dualReadStateManager, isIndis, (D2CalleeInfoRecorder) null);
}

/**
 * Creates a warm-up load balancer with an optional callee info recorder.
 *
 * Converts {@code warmUpTimeoutSeconds} to milliseconds and delegates to the millisecond-based
 * constructor, passing {@code null} for the test time supplier.
 *
 * @param warmUpTimeoutSeconds warm-up timeout, in seconds
 * @param concurrentRequests value forwarded unchanged as the concurrent requests setting
 * @param d2CalleeInfoRecorder recorder forwarded to the delegate constructor; may be {@code null}
 */
@VisibleForTesting
public WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService serviceWarmupper,
    ScheduledExecutorService executorService, String d2FsDirPath, String d2ServicePath,
    DownstreamServicesFetcher downstreamServicesFetcher, int warmUpTimeoutSeconds, int concurrentRequests,
    DualReadStateManager dualReadStateManager, boolean isIndis, D2CalleeInfoRecorder d2CalleeInfoRecorder)
{
  this(
      balancer,
      serviceWarmupper,
      executorService,
      d2FsDirPath,
      d2ServicePath,
      downstreamServicesFetcher,
      warmUpTimeoutSeconds * 1000, // seconds -> milliseconds
      concurrentRequests,
      dualReadStateManager,
      isIndis,
      null, // no test time supplier
      d2CalleeInfoRecorder);
}

@VisibleForTesting
WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService serviceWarmupper,
ScheduledExecutorService executorService, String d2FsDirPath, String d2ServicePath,
DownstreamServicesFetcher downstreamServicesFetcher, int warmUpTimeoutMillis, int concurrentRequests,
DualReadStateManager dualReadStateManager, boolean isIndis, Supplier<Long> timeSupplierForTest)
DualReadStateManager dualReadStateManager, boolean isIndis, Supplier<Long> timeSupplierForTest) {
this(balancer, serviceWarmupper, executorService, d2FsDirPath, d2ServicePath, downstreamServicesFetcher,
warmUpTimeoutMillis, concurrentRequests, dualReadStateManager, isIndis, timeSupplierForTest, null);
}

private WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService serviceWarmupper,
ScheduledExecutorService executorService, String d2FsDirPath, String d2ServicePath,
DownstreamServicesFetcher downstreamServicesFetcher, int warmUpTimeoutMillis, int concurrentRequests,
DualReadStateManager dualReadStateManager, boolean isIndis, Supplier<Long> timeSupplierForTest,
@Nullable D2CalleeInfoRecorder d2CalleeInfoRecorder)
{
super(balancer);
_serviceWarmupper = serviceWarmupper;
Expand All @@ -129,6 +153,7 @@ public WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService ser
{
_timeSupplier = timeSupplierForTest;
}
_d2CalleeInfoRecorder = d2CalleeInfoRecorder;
}

@Override
Expand Down Expand Up @@ -176,7 +201,7 @@ private void prepareWarmUp(Callback<None> callback)
final AtomicBoolean hasTimedOut = new AtomicBoolean(false);

try {
_downstreamServicesFetcher.getServiceNames(serviceNames -> {
SuccessCallback<List<String>> serviceNamesCallback = serviceNames -> {
// The downstreamServicesFetcher is the core group of the services that will be used during the lifecycle
_usedServices.addAll(serviceNames);

Expand Down Expand Up @@ -240,7 +265,13 @@ public void onSuccess(ServiceProperties result) {
{
callback.onSuccess(None.none());
}
});
};
if (_d2CalleeInfoRecorder != null) {
_downstreamServicesFetcher.getServiceNames(_d2CalleeInfoRecorder.getAppName(),
_d2CalleeInfoRecorder.getAppInstanceID(), _d2CalleeInfoRecorder.getScope(), serviceNamesCallback);
} else {
_downstreamServicesFetcher.getServiceNames(serviceNamesCallback);
}
}
catch (Exception e)
{
Expand Down Expand Up @@ -446,6 +477,10 @@ public TransportClient getClient(Request request, RequestContext requestContext)
// the call fails, we still *intend* to use serviceName, so it should be in _usedServices.
String serviceName = LoadBalancerUtil.getServiceNameFromUri(request.getURI());
_usedServices.add(serviceName);
if (_d2CalleeInfoRecorder != null && D2_SCHEME_NAME.equalsIgnoreCase(request.getURI().getScheme()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious, what is the reason for checking the scheme?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this decorator LB is only ever used for d2:// requests. Currently, the recording of warmup service names doesn't happen here. Those are fetched from the d2 data stored on disk, which happens asynchronously via the property buses. I just wanna be sure that we don't record any unneeded information here.

{
_d2CalleeInfoRecorder.record(serviceName);
}
return _loadBalancer.getClient(request, requestContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.linkedin.d2.balancer.LoadBalancerWithFacilities;
import com.linkedin.d2.balancer.LoadBalancerWithFacilitiesFactory;
import com.linkedin.d2.balancer.util.WarmUpLoadBalancer;
import com.linkedin.d2.balancer.util.downstreams.DownstreamServicesFetcher;
import com.linkedin.d2.balancer.util.downstreams.IndisBasedDownstreamServicesFetcher;
import com.linkedin.d2.jmx.D2ClientJmxManager;
import com.linkedin.d2.xds.Node;
import com.linkedin.d2.xds.XdsChannelFactory;
Expand Down Expand Up @@ -94,9 +96,13 @@ public LoadBalancerWithFacilities create(D2ClientConfig config)

if (config.warmUp)
{
DownstreamServicesFetcher downstreamServicesFetcher = config.enableIndisDownstreamServicesFetcher
? new IndisBasedDownstreamServicesFetcher(xdsClient, config.indisDownstreamServicesFetchTimeout,
config.xdsExecutorService, config.indisDownstreamServicesFetcher)
: config.indisDownstreamServicesFetcher;
balancer = new WarmUpLoadBalancer(balancer, xdsLoadBalancer, config.indisStartUpExecutorService, config.indisFsBasePath,
config.d2ServicePath, config.indisDownstreamServicesFetcher, config.indisWarmUpTimeoutSeconds,
config.indisWarmUpConcurrentRequests, config.dualReadStateManager, true);
// Argument order must match the WarmUpLoadBalancer constructor: warm-up timeout (seconds) first,
// then concurrent requests. Both are ints, so a swap compiles silently but exchanges the two settings.
config.d2ServicePath, downstreamServicesFetcher, config.indisWarmUpTimeoutSeconds,
config.indisWarmUpConcurrentRequests, config.dualReadStateManager, true, config.d2CalleeInfoRecorder);
}

return balancer;
Expand Down