Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -65,7 +66,7 @@

/**
* Implementation of {@link LoadBalancerStrategy} with additional supports partitioning of services whereas
* the the prior implementations do not.
* the prior implementations do not.
*
* @author David Hoa ([email protected])
* @author Oby Sumampouw ([email protected])
Expand Down Expand Up @@ -116,26 +117,6 @@ public String getName()
return DEGRADER_STRATEGY_NAME;
}

private List<DegraderTrackerClient> castToDegraderTrackerClients(Map<URI, TrackerClient> trackerClients)
{
List<DegraderTrackerClient> degraderTrackerClients = new ArrayList<>(trackerClients.size());
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memory allocation followed by O(N) operations.


for (TrackerClient trackerClient: trackerClients.values())
{
if (trackerClient instanceof DegraderTrackerClient)
{
degraderTrackerClients.add((DegraderTrackerClient) trackerClient);
}
else
{
warn(_log,
"Client passed to DegraderV3 not an instance of DegraderTrackerClient, will not load balance to it.",
trackerClient);
}
}

return degraderTrackerClients;
}

@Override
public TrackerClient getTrackerClient(Request request,
Expand Down Expand Up @@ -165,17 +146,19 @@ public TrackerClient getTrackerClient(Request request,

if (trackerClients == null || trackerClients.size() == 0)
{
warn(_log,
"getTrackerClient called with null/empty trackerClients, so returning null");

warn(_log, "getTrackerClient called with null/empty trackerClients, so returning null");
return null;
}

List<DegraderTrackerClient> degraderTrackerClients = castToDegraderTrackerClients(trackerClients);
Collection<TrackerClient> trackerClientList = trackerClients.values();
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

castToDegraderTrackerClients return an empty list if trackerClients cannot be cast to DegradedTrackerClient. This is to mimic that behavior.

if (!(trackerClientList.iterator().next() instanceof DegraderTrackerClient))
{
trackerClients = Collections.emptyMap();
}

// only one thread will be allowed to enter updatePartitionState for any partition
TimingContextUtil.markTiming(requestContext, TIMING_KEY);
checkUpdatePartitionState(clusterGenerationId, partitionId, degraderTrackerClients, shouldForceUpdate);
checkUpdatePartitionState(clusterGenerationId, partitionId, trackerClients, shouldForceUpdate);
TimingContextUtil.markTiming(requestContext, TIMING_KEY);

Ring<URI> ring = _state.getRing(partitionId);
Expand All @@ -184,19 +167,19 @@ public TrackerClient getTrackerClient(Request request,
Set<URI> excludedUris = ExcludedHostHints.getRequestContextExcludedHosts(requestContext);
if (excludedUris == null)
{
excludedUris = new HashSet<>();
excludedUris = Collections.emptySet();
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid another unnecessary memory allocation.

}

//no valid target host header was found in the request
DegraderTrackerClient client;
if (targetHostUri == null)
{
client = findValidClientFromRing(request, ring, degraderTrackerClients, excludedUris, requestContext);
client = (DegraderTrackerClient) findValidClientFromRing(request, ring, trackerClients, excludedUris, requestContext);
}
else
{
debug(_log, "Degrader honoring target host header in request, skipping hashing. URI: ", targetHostUri);
client = searchClientFromUri(targetHostUri, degraderTrackerClients);
client = searchClientFromUri(targetHostUri, trackerClients);
if (client == null)
{
warn(_log, "No client found for ", targetHostUri, ". Target host specified is no longer part of cluster");
Expand Down Expand Up @@ -237,7 +220,8 @@ public TrackerClient getTrackerClient(Request request,
return client;
}

private DegraderTrackerClient findValidClientFromRing(Request request, Ring<URI> ring, List<DegraderTrackerClient> trackerClients, Set<URI> excludedUris, RequestContext requestContext)
private TrackerClient findValidClientFromRing(Request request, Ring<URI> ring, Map<URI, TrackerClient> trackerClients,
Set<URI> excludedUris, RequestContext requestContext)
{
// Compute the hash code
int hashCode = _hashFunction.hash(request);
Expand All @@ -247,17 +231,10 @@ private DegraderTrackerClient findValidClientFromRing(Request request, Ring<URI>
warn(_log, "Can not find hash ring to use");
}

Map<URI, DegraderTrackerClient> trackerClientMap = new HashMap<>(trackerClients.size());
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another memory allocation followed by O(N) complexity.


for (DegraderTrackerClient trackerClient : trackerClients)
{
trackerClientMap.put(trackerClient.getUri(), trackerClient);
}

// we operate only on URIs to ensure that we never hold on to an old tracker client
// that the cluster manager has removed
URI mostWantedURI = ring.get(hashCode);
DegraderTrackerClient client = trackerClientMap.get(mostWantedURI);
TrackerClient client = trackerClients.get(mostWantedURI);

if (client != null && !excludedUris.contains(mostWantedURI))
{
Expand All @@ -280,7 +257,7 @@ private DegraderTrackerClient findValidClientFromRing(Request request, Ring<URI>
while (iterator.hasNext())
{
targetHostUri = iterator.next();
client = trackerClientMap.get(targetHostUri);
client = trackerClients.get(targetHostUri);

if (targetHostUri != mostWantedURI && !excludedUris.contains(targetHostUri) && client != null)
{
Expand Down Expand Up @@ -318,7 +295,7 @@ else if (excludedUris.contains(targetHostUri))
* @param shouldForceUpdate
*/
private void checkUpdatePartitionState(long clusterGenerationId, int partitionId,
List<DegraderTrackerClient> trackerClients, boolean shouldForceUpdate)
Map<URI, TrackerClient> trackerClients, boolean shouldForceUpdate)
{
DegraderLoadBalancerStrategyConfig config = getConfig();
final Partition partition = _state.getPartition(partitionId);
Expand Down Expand Up @@ -387,24 +364,20 @@ else if(shouldUpdatePartition(clusterGenerationId, partition.getState(), config,
}
}

private DegraderTrackerClient searchClientFromUri(URI uri, List<DegraderTrackerClient> trackerClients)
private DegraderTrackerClient searchClientFromUri(URI uri, Map<URI, TrackerClient> trackerClients)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When it is a map, we do the search in O(1) time rather than O(N).

{
for (DegraderTrackerClient trackerClient : trackerClients) {
if (trackerClient.getUri().equals(uri)) {
return trackerClient;
}
}
return null;
return (DegraderTrackerClient) trackerClients.get(uri);
}

private void updatePartitionState(long clusterGenerationId, Partition partition, List<DegraderTrackerClient> trackerClients, DegraderLoadBalancerStrategyConfig config)
private void updatePartitionState(long clusterGenerationId, Partition partition,
Map<URI, TrackerClient> trackerClients, DegraderLoadBalancerStrategyConfig config)
{
PartitionDegraderLoadBalancerState partitionState = partition.getState();

List<DegraderTrackerClientUpdater> clientUpdaters = new ArrayList<>();
for (DegraderTrackerClient client: trackerClients)
for (TrackerClient client: trackerClients.values())
{
clientUpdaters.add(new DegraderTrackerClientUpdater(client, partition.getId()));
clientUpdaters.add(new DegraderTrackerClientUpdater((DegraderTrackerClient) client, partition.getId()));
}

boolean quarantineEnabled = _state.isQuarantineEnabled();
Expand Down Expand Up @@ -1049,7 +1022,7 @@ public static void overrideMinCallCount(int partitionId, double newOverrideDropR
* @return True if we should update, and false otherwise.
*/
protected static boolean shouldUpdatePartition(long clusterGenerationId, PartitionDegraderLoadBalancerState partitionState,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we create a new method and call the new one from the old? This is a protected method and we'd like backwards compatibility.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kept the old one and created the overloaded version of it. The old one does the conversion (inefficient but still accessible).

DegraderLoadBalancerStrategyConfig config, boolean updateEnabled, boolean shouldForceUpdate, List<DegraderTrackerClient> trackerClients)
DegraderLoadBalancerStrategyConfig config, boolean updateEnabled, boolean shouldForceUpdate, Map<URI, TrackerClient> trackerClients)
{
boolean trackerClientInconsistency = trackerClients.size() != partitionState.getPointsMap().size();
return updateEnabled
Expand Down Expand Up @@ -1115,7 +1088,7 @@ public Ring<URI> getRing(long clusterGenerationId, int partitionId, Map<URI, Tra
return new DelegatingRingFactory<URI>(_config).createRing(Collections.emptyMap(), Collections.emptyMap());
}

checkUpdatePartitionState(clusterGenerationId, partitionId, castToDegraderTrackerClients(trackerClients), shouldForceUpdate);
checkUpdatePartitionState(clusterGenerationId, partitionId, trackerClients, shouldForceUpdate);
return _state.getRing(partitionId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -77,6 +79,7 @@
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;

import javax.sound.midi.Track;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -1560,6 +1563,13 @@ public void TestGetTrackerClients()
assertTrue(i == numberOfClients);
}

private static Map<URI, TrackerClient> createTrackerClientMap(List<URI> uris) throws URISyntaxException{
Map<URI, TrackerClient> clients = new HashMap<>();
for (URI uri : uris) {
clients.put(uri, getClient(uri));
}
return clients;
}

@Test(groups = { "small", "back-end" })
public void testshouldUpdatePartition() throws URISyntaxException
Expand All @@ -1570,17 +1580,16 @@ public void testshouldUpdatePartition() throws URISyntaxException
myConfig.put(PropertyKeys.HTTP_LB_STRATEGY_PROPERTIES_UPDATE_INTERVAL_MS, 5000L);
myConfig.put(PropertyKeys.HTTP_LB_STRATEGY_PROPERTIES_MAX_CLUSTER_LATENCY_WITHOUT_DEGRADING, 100d);
DegraderLoadBalancerStrategyV3 strategy = getStrategy(myConfig);
List<DegraderTrackerClient> clients = new ArrayList<>();
Map<URI, Integer> pointsMap = new HashMap<>();
long clusterCallCount = 15;
RingFactory<URI> ringFactory = new DelegatingRingFactory<>(new DegraderLoadBalancerStrategyConfig(1L));

URI uri1 = URI.create("http://test.linkedin.com:3242/fdsaf");
URI uri2 = URI.create("http://test.linkedin.com:3243/fdsaf");
clients.add(getClient(uri1));
clients.add(getClient(uri2));
pointsMap.put(uri1, 1);
pointsMap.put(uri2, 1);
List<URI> uris = Arrays.asList(uri1, uri2);

Map<URI, TrackerClient> clients = createTrackerClientMap(uris);
pointsMap.put(uris.get(0), 1);
pointsMap.put(uris.get(1), 1);

// state is default initialized, new cluster generation
assertTrue(DegraderLoadBalancerStrategyV3.shouldUpdatePartition(0,
Expand Down Expand Up @@ -1695,17 +1704,17 @@ public void testshouldUpdatePartitionOnlyAtInterval() throws URISyntaxException
myConfig.put(PropertyKeys.HTTP_LB_STRATEGY_PROPERTIES_MAX_CLUSTER_LATENCY_WITHOUT_DEGRADING, 100d);
myConfig.put(PropertyKeys.HTTP_LB_STRATEGY_PROPERTIES_UPDATE_ONLY_AT_INTERVAL, true);
DegraderLoadBalancerStrategyV3 strategy = getStrategy(myConfig);
List<DegraderTrackerClient> clients = new ArrayList<>();
Map<URI, Integer> pointsMap = new HashMap<>();
long clusterCallCount = 15;
RingFactory<URI> ringFactory = new DelegatingRingFactory<>(new DegraderLoadBalancerStrategyConfig(1L));

URI uri1 = URI.create("http://test.linkedin.com:3242/fdsaf");
URI uri2 = URI.create("http://test.linkedin.com:3243/fdsaf");
clients.add(getClient(uri1));
clients.add(getClient(uri2));
pointsMap.put(uri1, 1);
pointsMap.put(uri2, 1);
List<URI> uris = Arrays.asList(uri1, uri2);

Map<URI, TrackerClient> clients = createTrackerClientMap(uris);
pointsMap.put(uris.get(0), 1);
pointsMap.put(uris.get(1), 1);

PartitionDegraderLoadBalancerState current =
strategy.getState().getPartitionState(DEFAULT_PARTITION_ID);
Expand Down
Loading