diff --git a/d2/src/main/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancer.java b/d2/src/main/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancer.java index debd3fbb95..3f94a02c71 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancer.java @@ -108,31 +108,41 @@ public class SimpleLoadBalancer implements LoadBalancer, HashRingProvider, Clien private final Random _random = new Random(); private final FailoutConfigProvider _failoutConfigProvider; private final ExecutorService _d2CallbackExecutorService; + private final Long _cacheExpiryInMS; + private final HashMap> _cachePerClusterPartition = new HashMap<>(); public SimpleLoadBalancer(LoadBalancerState state, ScheduledExecutorService executorService) { - this(state, new Stats(1000), new Stats(1000), 0, TimeUnit.SECONDS, executorService, null); + this(state, 0, TimeUnit.SECONDS, executorService, null); } public SimpleLoadBalancer(LoadBalancerState state, long timeout, TimeUnit unit, ScheduledExecutorService executor) { - this(state, new Stats(1000), new Stats(1000), timeout, unit, executor, null); + this(state, timeout, unit, executor, null); } public SimpleLoadBalancer(LoadBalancerState state, long timeout, TimeUnit unit, ScheduledExecutorService executor, - FailoutConfigProviderFactory failoutConfigProviderFactory) + FailoutConfigProviderFactory failoutConfigProviderFactory) { - this(state, new Stats(1000), new Stats(1000), timeout, unit, executor, failoutConfigProviderFactory); + this(state, timeout, unit, executor, failoutConfigProviderFactory, 0, TimeUnit.MILLISECONDS); } + public SimpleLoadBalancer(LoadBalancerState state, long timeout, TimeUnit unit, ScheduledExecutorService executor, + FailoutConfigProviderFactory failoutConfigProviderFactory, long stateUpdateCooldown, TimeUnit stateUpdateCooldownUnit) + { + this(state, new Stats(1000), new Stats(1000), timeout, unit, executor, failoutConfigProviderFactory, + stateUpdateCooldown, stateUpdateCooldownUnit); + } public SimpleLoadBalancer(LoadBalancerState state, - Stats serviceAvailableStats, - Stats serviceUnavailableStats, - long timeout, - TimeUnit unit, - ScheduledExecutorService executor, - FailoutConfigProviderFactory failoutConfigProviderFactory) + Stats serviceAvailableStats, + Stats serviceUnavailableStats, + long timeout, + TimeUnit unit, + ScheduledExecutorService executor, + FailoutConfigProviderFactory failoutConfigProviderFactory, + long clusterUpdateCooldown, + TimeUnit clusterUpdateCooldownUnit) { _state = state; _serviceUnavailableStats = serviceUnavailableStats; @@ -142,6 +152,7 @@ public SimpleLoadBalancer(LoadBalancerState state, _timeout = timeout; _unit = unit; _executor = executor; + _cacheExpiryInMS = clusterUpdateCooldownUnit.toMillis(clusterUpdateCooldown); if (failoutConfigProviderFactory != null) { _failoutConfigProvider = failoutConfigProviderFactory.create(state); @@ -1119,6 +1130,66 @@ private TrackerClient chooseTrackerClient(Request request, RequestContext reques // now try and find a tracker client for the uri TrackerClient trackerClient = null; URI targetHost = KeyMapper.TargetHostHints.getRequestContextTargetHost(requestContext); + int partitionId = getPartitionId(request, serviceName, clusterName, uris, targetHost); + + Map clientsToLoadBalance = null; + + for (LoadBalancerState.SchemeStrategyPair pair : orderedStrategies) + { + LoadBalancerStrategy strategy = pair.getStrategy(); + String scheme = pair.getScheme(); + String key = serviceName + "@" + clusterName + "@" + partitionId + "@" + scheme; + TrackerClientSubsetItem subsetItem = getPotentialClientsFromCache(key); + if (subsetItem == null) + { + subsetItem = getPotentialClients(serviceName, serviceProperties, cluster, + uriItem.getProperty(), scheme, partitionId, uriItem.getVersion()); + _cachePerClusterPartition.put(key, Pair.of(System.currentTimeMillis(), subsetItem)); + } + clientsToLoadBalance = subsetItem.getWeightedSubset(); + + trackerClient = + strategy.getTrackerClient(request, requestContext, uriItem.getVersion(), partitionId, clientsToLoadBalance, + subsetItem.shouldForceUpdate()); + + debug(_log, + "load balancer strategy for ", + serviceName, + " returned: ", + trackerClient); + + // break as soon as we find an available cluster client + if (trackerClient != null) + { + break; + } + } + + if (trackerClient == null) + { + if (clientsToLoadBalance == null || clientsToLoadBalance.isEmpty()) + { + String requestedSchemes = orderedStrategies.stream() + .map(LoadBalancerState.SchemeStrategyPair::getScheme).collect(Collectors.joining(",")); + + die(serviceName, "PEGA_1015. Service: " + serviceName + " unable to find a host to route the request" + + " in partition: " + partitionId + " cluster: " + clusterName + " scheme: [" + requestedSchemes + "]," + + " total hosts in cluster: " + uris.Uris().size() + "." + + " Check what cluster and scheme your servers are announcing to."); + } + else + { + die(serviceName, "PEGA_1016. Service: " + serviceName + " is in a bad state (high latency/high error). " + + "Dropping request. Cluster: " + clusterName + ", partitionId:" + partitionId + + " (choosable: " + clientsToLoadBalance.size() + " hosts, total in cluster: " + uris.Uris().size() + ")"); + } + } + + return trackerClient; + } + + private int getPartitionId(Request request, String serviceName, String clusterName, UriProperties uris, + URI targetHost) throws ServiceUnavailableException { int partitionId = -1; URI requestUri = request.getURI(); @@ -1169,56 +1240,20 @@ private TrackerClient chooseTrackerClient(Request request, RequestContext reques partitionId = iterator.next(); } } + return partitionId; + } - Map clientsToLoadBalance = null; - - for (LoadBalancerState.SchemeStrategyPair pair : orderedStrategies) + @Nullable + private TrackerClientSubsetItem getPotentialClientsFromCache(String key) + { + Pair p = _cachePerClusterPartition.get(key); + if (_cacheExpiryInMS == 0 || p == null || System.currentTimeMillis() > p.getLeft() + _cacheExpiryInMS) { - LoadBalancerStrategy strategy = pair.getStrategy(); - String scheme = pair.getScheme(); - - TrackerClientSubsetItem subsetItem = getPotentialClients(serviceName, serviceProperties, cluster, - uris, scheme, partitionId, uriItem.getVersion()); - clientsToLoadBalance = subsetItem.getWeightedSubset(); - - trackerClient = - strategy.getTrackerClient(request, requestContext, uriItem.getVersion(), partitionId, clientsToLoadBalance, - subsetItem.shouldForceUpdate()); - - debug(_log, - "load balancer strategy for ", - serviceName, - " returned: ", - trackerClient); - - // break as soon as we find an available cluster client - if (trackerClient != null) - { - break; - } - } - - if (trackerClient == null) + return null; + } else { - if (clientsToLoadBalance == null || clientsToLoadBalance.isEmpty()) - { - String requestedSchemes = orderedStrategies.stream() - .map(LoadBalancerState.SchemeStrategyPair::getScheme).collect(Collectors.joining(",")); - - die(serviceName, "PEGA_1015. Service: " + serviceName + " unable to find a host to route the request" - + " in partition: " + partitionId + " cluster: " + clusterName + " scheme: [" + requestedSchemes + "]," + - " total hosts in cluster: " + uris.Uris().size() + "." - + " Check what cluster and scheme your servers are announcing to."); - } - else - { - die(serviceName, "PEGA_1016. Service: " + serviceName + " is in a bad state (high latency/high error). " - + "Dropping request. Cluster: " + clusterName + ", partitionId:" + partitionId - + " (choosable: " + clientsToLoadBalance.size() + " hosts, total in cluster: " + uris.Uris().size() + ")"); - } + return p.getRight(); } - - return trackerClient; } private void die(String serviceName, String message) throws ServiceUnavailableException