diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java index 62c59ab8e4..88dba60f21 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java @@ -26,6 +26,8 @@ import com.linkedin.d2.balancer.clients.FailoutRedirectStrategy; import com.linkedin.d2.balancer.clients.DynamicClient; import com.linkedin.d2.balancer.clients.RequestTimeoutClient; +import com.linkedin.d2.balancer.util.D2CalleeInfoRecorder; +import java.time.Duration; import javax.annotation.Nonnull; import com.linkedin.d2.balancer.clients.RetryClient; import com.linkedin.d2.balancer.clusterfailout.FailoutConfigProviderFactory; @@ -242,7 +244,10 @@ public D2Client build() _config.xdsStreamMaxRetryBackoffSeconds, _config.xdsChannelKeepAliveTimeMins, _config.xdsMinimumJavaVersion, - _config.actionOnPrecheckFailure + _config.actionOnPrecheckFailure, + _config.d2CalleeInfoRecorder, + _config.enableIndisDownstreamServicesFetcher, + _config.indisDownstreamServicesFetchTimeout ); final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ? @@ -893,6 +898,22 @@ public D2ClientBuilder setDisableDetectLiRawD2Client(boolean disableDetectLiRawD return this; } + public D2ClientBuilder setD2CalleeInfoRecorder(D2CalleeInfoRecorder d2CalleeInfoRecorder) + { + _config.d2CalleeInfoRecorder = d2CalleeInfoRecorder; + return this; + } + + public D2ClientBuilder setIndisDownstreamServicesFetchTimeout(Duration indisDownstreamServicesFetchTimeout) { + _config.indisDownstreamServicesFetchTimeout = indisDownstreamServicesFetchTimeout; + return this; + } + + public D2ClientBuilder setEnableIndisDownstreamServicesFetcher(boolean enableIndisDownstreamServicesFetcher) { + _config.enableIndisDownstreamServicesFetcher = enableIndisDownstreamServicesFetcher; + return this; + } + private Map createDefaultTransportClientFactories() { final Map clientFactories = new HashMap<>(); diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java index 126980a3b4..d0ffa9e826 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java @@ -25,6 +25,7 @@ import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy; import com.linkedin.d2.balancer.strategies.LoadBalancerStrategyFactory; import com.linkedin.d2.balancer.subsetting.DeterministicSubsettingMetadataProvider; +import com.linkedin.d2.balancer.util.D2CalleeInfoRecorder; import com.linkedin.d2.balancer.util.canary.CanaryDistributionProvider; import com.linkedin.d2.balancer.util.WarmUpLoadBalancer; import com.linkedin.d2.balancer.util.downstreams.DownstreamServicesFetcher; @@ -45,6 +46,7 @@ import com.linkedin.d2.jmx.NoOpJmxManager; import com.linkedin.r2.transport.common.TransportClientFactory; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; +import java.time.Duration; import java.util.Collections; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -196,6 +198,13 @@ public class D2ClientConfig * apposed to created by standard LinkedIn d2 client factory in container library). */ public boolean isLiRawD2Client = false; + // Callee info recorder for d2 warmup + public D2CalleeInfoRecorder d2CalleeInfoRecorder = null; + /** + * Whether to enable fetching downstream services from INDIS for warmup. + */ + public boolean enableIndisDownstreamServicesFetcher = false; + public Duration indisDownstreamServicesFetchTimeout = Duration.ofSeconds(5); public D2ClientConfig() { @@ -280,7 +289,10 @@ public D2ClientConfig() Integer xdsStreamMaxRetryBackoffSeconds, Long xdsChannelKeepAliveTimeMins, String xdsMinimumJavaVersion, - XdsClientValidator.ActionOnPrecheckFailure actionOnPrecheckFailure) + XdsClientValidator.ActionOnPrecheckFailure actionOnPrecheckFailure, + D2CalleeInfoRecorder d2CalleeInfoRecorder, + Boolean enableIndisDownstreamServicesFetcher, + Duration indisDownstreamServicesFetchTimeout) { this.zkHosts = zkHosts; this.xdsServer = xdsServer; @@ -362,5 +374,8 @@ public D2ClientConfig() this.xdsStreamMaxRetryBackoffSeconds = xdsStreamMaxRetryBackoffSeconds; this.xdsMinimumJavaVersion = xdsMinimumJavaVersion; this.actionOnPrecheckFailure = actionOnPrecheckFailure; + this.d2CalleeInfoRecorder = d2CalleeInfoRecorder; + this.indisDownstreamServicesFetchTimeout = indisDownstreamServicesFetchTimeout; + this.enableIndisDownstreamServicesFetcher = enableIndisDownstreamServicesFetcher; } } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java index 55411a9292..fcaa9d2578 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java @@ -125,7 +125,7 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) { balancer = new WarmUpLoadBalancer(balancer, lastSeenLoadBalancer, config.startUpExecutorService, config.fsBasePath, config.d2ServicePath, config.downstreamServicesFetcher, config.warmUpTimeoutSeconds, - config.warmUpConcurrentRequests, config.dualReadStateManager, false); + config.warmUpConcurrentRequests, config.dualReadStateManager, false, config.d2CalleeInfoRecorder); } return balancer; diff --git a/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java index 472c2ea32d..332fd7d4aa 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java @@ -62,7 +62,7 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) { balancer = new WarmUpLoadBalancer(balancer, zkfsLoadBalancer, config.startUpExecutorService, config.fsBasePath, config.d2ServicePath, config.downstreamServicesFetcher, config.warmUpTimeoutSeconds, - config.warmUpConcurrentRequests, config.dualReadStateManager, false); + config.warmUpConcurrentRequests, config.dualReadStateManager, false, config.d2CalleeInfoRecorder); } return balancer; } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancer.java b/d2/src/main/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancer.java index debd3fbb95..12c187c505 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancer.java @@ -95,7 +95,7 @@ public class SimpleLoadBalancer implements LoadBalancer, HashRingProvider, Clien private static final String HOST_OVERRIDE_LIST = "HOST_OVERRIDE_LIST"; private static final Logger _log = LoggerFactory.getLogger(SimpleLoadBalancer.class); - private static final String D2_SCHEME_NAME = "d2"; + public static final String D2_SCHEME_NAME = "d2"; private final LoadBalancerState _state; private final Stats _serviceUnavailableStats; diff --git a/d2/src/main/java/com/linkedin/d2/balancer/util/D2CalleeInfoRecorder.java b/d2/src/main/java/com/linkedin/d2/balancer/util/D2CalleeInfoRecorder.java new file mode 100644 index 0000000000..62a792c668 --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/balancer/util/D2CalleeInfoRecorder.java @@ -0,0 +1,24 @@ +package com.linkedin.d2.balancer.util; + +import javax.annotation.Nonnull; + + +/** + * Records the callee service information and periodically sends out the recorded information to a persistence medium. + */ +public interface D2CalleeInfoRecorder { + /** + * Records a callee service name. + * @param serviceName the callee service name to record + */ + void record(String serviceName); + + @Nonnull + String getAppName(); + + @Nonnull + String getAppInstanceID(); + + @Nonnull + String getScope(); +} \ No newline at end of file diff --git a/d2/src/main/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancer.java b/d2/src/main/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancer.java index e51723c8db..b8a1d889e1 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancer.java @@ -18,6 +18,7 @@ import com.google.common.annotations.VisibleForTesting; import com.linkedin.common.callback.Callback; +import com.linkedin.common.callback.SuccessCallback; import com.linkedin.common.util.None; import com.linkedin.d2.balancer.LoadBalancerWithFacilities; import com.linkedin.d2.balancer.LoadBalancerWithFacilitiesDelegator; @@ -49,9 +50,13 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.linkedin.d2.balancer.simple.SimpleLoadBalancer.*; + + /** * The WarmUpLoadBalancer warms up the internal {@link SimpleLoadBalancer} services/cluster list * before the client is announced as "started". @@ -79,6 +84,7 @@ public class WarmUpLoadBalancer extends LoadBalancerWithFacilitiesDelegator { private final DualReadStateManager _dualReadStateManager; private final boolean _isIndis; // whether warming up for Indis (false means warming up for ZK) private final String _printName; // name of this warmup load balancer based on it's indis or not. + @Nullable private final D2CalleeInfoRecorder _d2CalleeInfoRecorder; private volatile boolean _shuttingDown = false; private long _allStartTime; private List _servicesToWarmUp = null; @@ -103,14 +109,32 @@ public WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService ser DownstreamServicesFetcher downstreamServicesFetcher, int warmUpTimeoutSeconds, int concurrentRequests, DualReadStateManager dualReadStateManager, boolean isIndis) { this(balancer, serviceWarmupper, executorService, d2FsDirPath, d2ServicePath, downstreamServicesFetcher, - warmUpTimeoutSeconds * 1000, concurrentRequests, dualReadStateManager, isIndis, null); + warmUpTimeoutSeconds, concurrentRequests, dualReadStateManager, isIndis, (D2CalleeInfoRecorder) null); } - @VisibleForTesting + public WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService serviceWarmupper, + ScheduledExecutorService executorService, String d2FsDirPath, String d2ServicePath, + DownstreamServicesFetcher downstreamServicesFetcher, int warmUpTimeoutSeconds, int concurrentRequests, + DualReadStateManager dualReadStateManager, boolean isIndis, D2CalleeInfoRecorder d2CalleeInfoRecorder) { + this(balancer, serviceWarmupper, executorService, d2FsDirPath, d2ServicePath, downstreamServicesFetcher, + warmUpTimeoutSeconds * 1000, concurrentRequests, dualReadStateManager, isIndis, null, + d2CalleeInfoRecorder); + } + + @VisibleForTesting WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService serviceWarmupper, ScheduledExecutorService executorService, String d2FsDirPath, String d2ServicePath, DownstreamServicesFetcher downstreamServicesFetcher, int warmUpTimeoutMillis, int concurrentRequests, - DualReadStateManager dualReadStateManager, boolean isIndis, Supplier timeSupplierForTest) + DualReadStateManager dualReadStateManager, boolean isIndis, Supplier timeSupplierForTest) { + this(balancer, serviceWarmupper, executorService, d2FsDirPath, d2ServicePath, downstreamServicesFetcher, + warmUpTimeoutMillis, concurrentRequests, dualReadStateManager, isIndis, timeSupplierForTest, null); + } + + private WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService serviceWarmupper, + ScheduledExecutorService executorService, String d2FsDirPath, String d2ServicePath, + DownstreamServicesFetcher downstreamServicesFetcher, int warmUpTimeoutMillis, int concurrentRequests, + DualReadStateManager dualReadStateManager, boolean isIndis, Supplier timeSupplierForTest, + @Nullable D2CalleeInfoRecorder d2CalleeInfoRecorder) { super(balancer); _serviceWarmupper = serviceWarmupper; @@ -129,6 +153,7 @@ public WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService ser { _timeSupplier = timeSupplierForTest; } + _d2CalleeInfoRecorder = d2CalleeInfoRecorder; } @Override @@ -176,7 +201,7 @@ private void prepareWarmUp(Callback callback) final AtomicBoolean hasTimedOut = new AtomicBoolean(false); try { - _downstreamServicesFetcher.getServiceNames(serviceNames -> { + SuccessCallback> serviceNamesCallback = serviceNames -> { // The downstreamServicesFetcher is the core group of the services that will be used during the lifecycle _usedServices.addAll(serviceNames); @@ -240,7 +265,13 @@ public void onSuccess(ServiceProperties result) { { callback.onSuccess(None.none()); } - }); + }; + if (_d2CalleeInfoRecorder != null) { + _downstreamServicesFetcher.getServiceNames(_d2CalleeInfoRecorder.getAppName(), + _d2CalleeInfoRecorder.getAppInstanceID(), _d2CalleeInfoRecorder.getScope(), serviceNamesCallback); + } else { + _downstreamServicesFetcher.getServiceNames(serviceNamesCallback); + } } catch (Exception e) { @@ -446,6 +477,10 @@ public TransportClient getClient(Request request, RequestContext requestContext) // the call fails, we still *intend* to use serviceName, so it should be in _usedServices. String serviceName = LoadBalancerUtil.getServiceNameFromUri(request.getURI()); _usedServices.add(serviceName); + if (_d2CalleeInfoRecorder != null && D2_SCHEME_NAME.equalsIgnoreCase(request.getURI().getScheme())) + { + _d2CalleeInfoRecorder.record(serviceName); + } return _loadBalancer.getClient(request, requestContext); } } diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java index 2c8a1fdf8f..4c77c81433 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java @@ -20,6 +20,8 @@ import com.linkedin.d2.balancer.LoadBalancerWithFacilities; import com.linkedin.d2.balancer.LoadBalancerWithFacilitiesFactory; import com.linkedin.d2.balancer.util.WarmUpLoadBalancer; +import com.linkedin.d2.balancer.util.downstreams.DownstreamServicesFetcher; +import com.linkedin.d2.balancer.util.downstreams.IndisBasedDownstreamServicesFetcher; import com.linkedin.d2.jmx.D2ClientJmxManager; import com.linkedin.d2.xds.Node; import com.linkedin.d2.xds.XdsChannelFactory; @@ -94,9 +96,13 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) if (config.warmUp) { + DownstreamServicesFetcher downstreamServicesFetcher = config.enableIndisDownstreamServicesFetcher + ? new IndisBasedDownstreamServicesFetcher(xdsClient, config.indisDownstreamServicesFetchTimeout, + config.xdsExecutorService, config.indisDownstreamServicesFetcher) + : config.indisDownstreamServicesFetcher; balancer = new WarmUpLoadBalancer(balancer, xdsLoadBalancer, config.indisStartUpExecutorService, config.indisFsBasePath, - config.d2ServicePath, config.indisDownstreamServicesFetcher, config.indisWarmUpTimeoutSeconds, - config.indisWarmUpConcurrentRequests, config.dualReadStateManager, true); + config.d2ServicePath, downstreamServicesFetcher, config.indisWarmUpConcurrentRequests, config.indisWarmUpTimeoutSeconds, + config.dualReadStateManager, true, config.d2CalleeInfoRecorder); } return balancer;