diff --git a/d2/src/main/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerStrategyV3.java b/d2/src/main/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerStrategyV3.java index a53a32ab11..3afc0e6d19 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerStrategyV3.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerStrategyV3.java @@ -46,6 +46,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -65,7 +66,7 @@ /** * Implementation of {@link LoadBalancerStrategy} with additional supports partitioning of services whereas - * the the prior implementations do not. + * the prior implementations do not. * * @author David Hoa (dhoa@linkedin.com) * @author Oby Sumampouw (osumampouw@linkedin.com) @@ -116,26 +117,6 @@ public String getName() return DEGRADER_STRATEGY_NAME; } - private List castToDegraderTrackerClients(Map trackerClients) - { - List degraderTrackerClients = new ArrayList<>(trackerClients.size()); - - for (TrackerClient trackerClient: trackerClients.values()) - { - if (trackerClient instanceof DegraderTrackerClient) - { - degraderTrackerClients.add((DegraderTrackerClient) trackerClient); - } - else - { - warn(_log, - "Client passed to DegraderV3 not an instance of DegraderTrackerClient, will not load balance to it.", - trackerClient); - } - } - - return degraderTrackerClients; - } @Override public TrackerClient getTrackerClient(Request request, @@ -165,17 +146,19 @@ public TrackerClient getTrackerClient(Request request, if (trackerClients == null || trackerClients.size() == 0) { - warn(_log, - "getTrackerClient called with null/empty trackerClients, so returning null"); - + warn(_log, "getTrackerClient called with null/empty trackerClients, so returning null"); return null; } - List degraderTrackerClients = castToDegraderTrackerClients(trackerClients); + Collection trackerClientList = trackerClients.values(); + if (!(trackerClientList.iterator().next() instanceof DegraderTrackerClient)) + { + trackerClients = Collections.emptyMap(); + } // only one thread will be allowed to enter updatePartitionState for any partition TimingContextUtil.markTiming(requestContext, TIMING_KEY); - checkUpdatePartitionState(clusterGenerationId, partitionId, degraderTrackerClients, shouldForceUpdate); + checkUpdatePartitionState(clusterGenerationId, partitionId, trackerClients, shouldForceUpdate); TimingContextUtil.markTiming(requestContext, TIMING_KEY); Ring ring = _state.getRing(partitionId); @@ -184,19 +167,19 @@ public TrackerClient getTrackerClient(Request request, Set excludedUris = ExcludedHostHints.getRequestContextExcludedHosts(requestContext); if (excludedUris == null) { - excludedUris = new HashSet<>(); + excludedUris = Collections.emptySet(); } //no valid target host header was found in the request DegraderTrackerClient client; if (targetHostUri == null) { - client = findValidClientFromRing(request, ring, degraderTrackerClients, excludedUris, requestContext); + client = (DegraderTrackerClient) findValidClientFromRing(request, ring, trackerClients, excludedUris, requestContext); } else { debug(_log, "Degrader honoring target host header in request, skipping hashing. URI: ", targetHostUri); - client = searchClientFromUri(targetHostUri, degraderTrackerClients); + client = searchClientFromUri(targetHostUri, trackerClients); if (client == null) { warn(_log, "No client found for ", targetHostUri, ". Target host specified is no longer part of cluster"); @@ -237,7 +220,8 @@ public TrackerClient getTrackerClient(Request request, return client; } - private DegraderTrackerClient findValidClientFromRing(Request request, Ring ring, List trackerClients, Set excludedUris, RequestContext requestContext) + private TrackerClient findValidClientFromRing(Request request, Ring ring, Map trackerClients, + Set excludedUris, RequestContext requestContext) { // Compute the hash code int hashCode = _hashFunction.hash(request); @@ -247,17 +231,10 @@ private DegraderTrackerClient findValidClientFromRing(Request request, Ring warn(_log, "Can not find hash ring to use"); } - Map trackerClientMap = new HashMap<>(trackerClients.size()); - - for (DegraderTrackerClient trackerClient : trackerClients) - { - trackerClientMap.put(trackerClient.getUri(), trackerClient); - } - // we operate only on URIs to ensure that we never hold on to an old tracker client // that the cluster manager has removed URI mostWantedURI = ring.get(hashCode); - DegraderTrackerClient client = trackerClientMap.get(mostWantedURI); + TrackerClient client = trackerClients.get(mostWantedURI); if (client != null && !excludedUris.contains(mostWantedURI)) { @@ -280,7 +257,7 @@ private DegraderTrackerClient findValidClientFromRing(Request request, Ring while (iterator.hasNext()) { targetHostUri = iterator.next(); - client = trackerClientMap.get(targetHostUri); + client = trackerClients.get(targetHostUri); if (targetHostUri != mostWantedURI && !excludedUris.contains(targetHostUri) && client != null) { @@ -318,7 +295,7 @@ else if (excludedUris.contains(targetHostUri)) * @param shouldForceUpdate */ private void checkUpdatePartitionState(long clusterGenerationId, int partitionId, - List trackerClients, boolean shouldForceUpdate) + Map trackerClients, boolean shouldForceUpdate) { DegraderLoadBalancerStrategyConfig config = getConfig(); final Partition partition = _state.getPartition(partitionId); @@ -387,24 +364,20 @@ else if(shouldUpdatePartition(clusterGenerationId, partition.getState(), config, } } - private DegraderTrackerClient searchClientFromUri(URI uri, List trackerClients) + private DegraderTrackerClient searchClientFromUri(URI uri, Map trackerClients) { - for (DegraderTrackerClient trackerClient : trackerClients) { - if (trackerClient.getUri().equals(uri)) { - return trackerClient; - } - } - return null; + return (DegraderTrackerClient) trackerClients.get(uri); } - private void updatePartitionState(long clusterGenerationId, Partition partition, List trackerClients, DegraderLoadBalancerStrategyConfig config) + private void updatePartitionState(long clusterGenerationId, Partition partition, + Map trackerClients, DegraderLoadBalancerStrategyConfig config) { PartitionDegraderLoadBalancerState partitionState = partition.getState(); List clientUpdaters = new ArrayList<>(); - for (DegraderTrackerClient client: trackerClients) + for (TrackerClient client: trackerClients.values()) { - clientUpdaters.add(new DegraderTrackerClientUpdater(client, partition.getId())); + clientUpdaters.add(new DegraderTrackerClientUpdater((DegraderTrackerClient) client, partition.getId())); } boolean quarantineEnabled = _state.isQuarantineEnabled(); @@ -1050,6 +1023,18 @@ public static void overrideMinCallCount(int partitionId, double newOverrideDropR */ protected static boolean shouldUpdatePartition(long clusterGenerationId, PartitionDegraderLoadBalancerState partitionState, DegraderLoadBalancerStrategyConfig config, boolean updateEnabled, boolean shouldForceUpdate, List trackerClients) + { + Map trackerClientMap = new HashMap<>(trackerClients.size()); + + for (TrackerClient trackerClient : trackerClients) + { + trackerClientMap.put(trackerClient.getUri(), trackerClient); + } + return shouldUpdatePartition(clusterGenerationId, partitionState, config, updateEnabled, shouldForceUpdate, trackerClientMap); + } + + protected static boolean shouldUpdatePartition(long clusterGenerationId, PartitionDegraderLoadBalancerState partitionState, + DegraderLoadBalancerStrategyConfig config, boolean updateEnabled, boolean shouldForceUpdate, Map trackerClients) { boolean trackerClientInconsistency = trackerClients.size() != partitionState.getPointsMap().size(); return updateEnabled @@ -1115,7 +1100,7 @@ public Ring getRing(long clusterGenerationId, int partitionId, Map(_config).createRing(Collections.emptyMap(), Collections.emptyMap()); } - checkUpdatePartitionState(clusterGenerationId, partitionId, castToDegraderTrackerClients(trackerClients), shouldForceUpdate); + checkUpdatePartitionState(clusterGenerationId, partitionId, trackerClients, shouldForceUpdate); return _state.getRing(partitionId); } diff --git a/d2/src/test/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerTest.java b/d2/src/test/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerTest.java index 64c84f6e04..d071339992 100644 --- a/d2/src/test/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerTest.java +++ b/d2/src/test/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerTest.java @@ -57,6 +57,8 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -77,6 +79,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nonnull; +import javax.sound.midi.Track; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -1560,6 +1563,13 @@ public void TestGetTrackerClients() assertTrue(i == numberOfClients); } + private static Map createTrackerClientMap(List uris) throws URISyntaxException{ + Map clients = new HashMap<>(); + for (URI uri : uris) { + clients.put(uri, getClient(uri)); + } + return clients; + } @Test(groups = { "small", "back-end" }) public void testshouldUpdatePartition() throws URISyntaxException @@ -1570,17 +1580,16 @@ public void testshouldUpdatePartition() throws URISyntaxException myConfig.put(PropertyKeys.HTTP_LB_STRATEGY_PROPERTIES_UPDATE_INTERVAL_MS, 5000L); myConfig.put(PropertyKeys.HTTP_LB_STRATEGY_PROPERTIES_MAX_CLUSTER_LATENCY_WITHOUT_DEGRADING, 100d); DegraderLoadBalancerStrategyV3 strategy = getStrategy(myConfig); - List clients = new ArrayList<>(); Map pointsMap = new HashMap<>(); long clusterCallCount = 15; RingFactory ringFactory = new DelegatingRingFactory<>(new DegraderLoadBalancerStrategyConfig(1L)); - URI uri1 = URI.create("http://test.linkedin.com:3242/fdsaf"); URI uri2 = URI.create("http://test.linkedin.com:3243/fdsaf"); - clients.add(getClient(uri1)); - clients.add(getClient(uri2)); - pointsMap.put(uri1, 1); - pointsMap.put(uri2, 1); + List uris = Arrays.asList(uri1, uri2); + + Map clients = createTrackerClientMap(uris); + pointsMap.put(uris.get(0), 1); + pointsMap.put(uris.get(1), 1); // state is default initialized, new cluster generation assertTrue(DegraderLoadBalancerStrategyV3.shouldUpdatePartition(0, @@ -1695,17 +1704,17 @@ public void testshouldUpdatePartitionOnlyAtInterval() throws URISyntaxException myConfig.put(PropertyKeys.HTTP_LB_STRATEGY_PROPERTIES_MAX_CLUSTER_LATENCY_WITHOUT_DEGRADING, 100d); myConfig.put(PropertyKeys.HTTP_LB_STRATEGY_PROPERTIES_UPDATE_ONLY_AT_INTERVAL, true); DegraderLoadBalancerStrategyV3 strategy = getStrategy(myConfig); - List clients = new ArrayList<>(); Map pointsMap = new HashMap<>(); long clusterCallCount = 15; RingFactory ringFactory = new DelegatingRingFactory<>(new DegraderLoadBalancerStrategyConfig(1L)); URI uri1 = URI.create("http://test.linkedin.com:3242/fdsaf"); URI uri2 = URI.create("http://test.linkedin.com:3243/fdsaf"); - clients.add(getClient(uri1)); - clients.add(getClient(uri2)); - pointsMap.put(uri1, 1); - pointsMap.put(uri2, 1); + List uris = Arrays.asList(uri1, uri2); + + Map clients = createTrackerClientMap(uris); + pointsMap.put(uris.get(0), 1); + pointsMap.put(uris.get(1), 1); PartitionDegraderLoadBalancerState current = strategy.getState().getPartitionState(DEFAULT_PARTITION_ID);