@@ -108,31 +108,41 @@ public class SimpleLoadBalancer implements LoadBalancer, HashRingProvider, Clien
private final Random _random = new Random();
private final FailoutConfigProvider _failoutConfigProvider;
private final ExecutorService _d2CallbackExecutorService;
private final Long _cacheExpiryInMS;
private final HashMap<String, Pair<Long, TrackerClientSubsetItem>> _cachePerClusterPartition = new HashMap<>();

public SimpleLoadBalancer(LoadBalancerState state, ScheduledExecutorService executorService)
{
this(state, new Stats(1000), new Stats(1000), 0, TimeUnit.SECONDS, executorService, null);
this(state, 0, TimeUnit.SECONDS, executorService, null);
}

public SimpleLoadBalancer(LoadBalancerState state, long timeout, TimeUnit unit, ScheduledExecutorService executor)
{
this(state, new Stats(1000), new Stats(1000), timeout, unit, executor, null);
this(state, timeout, unit, executor, null);
}

public SimpleLoadBalancer(LoadBalancerState state, long timeout, TimeUnit unit, ScheduledExecutorService executor,
FailoutConfigProviderFactory failoutConfigProviderFactory)
FailoutConfigProviderFactory failoutConfigProviderFactory)
{
this(state, new Stats(1000), new Stats(1000), timeout, unit, executor, failoutConfigProviderFactory);
this(state, timeout, unit, executor, failoutConfigProviderFactory, 0, TimeUnit.MILLISECONDS);
}

public SimpleLoadBalancer(LoadBalancerState state, long timeout, TimeUnit unit, ScheduledExecutorService executor,
FailoutConfigProviderFactory failoutConfigProviderFactory, long stateUpdateCooldown, TimeUnit stateUpdateCooldownUnit)
{
this(state, new Stats(1000), new Stats(1000), timeout, unit, executor, failoutConfigProviderFactory,
stateUpdateCooldown, stateUpdateCooldownUnit);
}
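
A minimal sketch of how a caller could opt into the new cooldown-backed subset cache through this overload. The buildBalancer helper, the single-thread executor, and the 5-second values are illustrative assumptions, not taken from this PR; passing 0 for the cooldown keeps the previous, uncached behavior.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative helper (not part of this PR): wires up the new overload with a
// 5-second subset-cache cooldown. A cooldown of 0 disables the cache.
static SimpleLoadBalancer buildBalancer(LoadBalancerState state,
    FailoutConfigProviderFactory failoutFactory)
{
  ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  return new SimpleLoadBalancer(state,
      5, TimeUnit.SECONDS,    // timeout and unit for load balancer operations
      executor,
      failoutFactory,         // may be null, as in the shorter overloads above
      5, TimeUnit.SECONDS);   // stateUpdateCooldown and unit -> _cacheExpiryInMS
}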

public SimpleLoadBalancer(LoadBalancerState state,
Stats serviceAvailableStats,
Stats serviceUnavailableStats,
long timeout,
TimeUnit unit,
ScheduledExecutorService executor,
FailoutConfigProviderFactory failoutConfigProviderFactory)
Stats serviceAvailableStats,
Stats serviceUnavailableStats,
long timeout,
TimeUnit unit,
ScheduledExecutorService executor,
FailoutConfigProviderFactory failoutConfigProviderFactory,
long clusterUpdateCooldown,
TimeUnit clusterUpdateCooldownUnit)
{
_state = state;
_serviceUnavailableStats = serviceUnavailableStats;
@@ -142,6 +152,7 @@ public SimpleLoadBalancer(LoadBalancerState state,
_timeout = timeout;
_unit = unit;
_executor = executor;
_cacheExpiryInMS = clusterUpdateCooldownUnit.toMillis(clusterUpdateCooldown);
if (failoutConfigProviderFactory != null)
{
_failoutConfigProvider = failoutConfigProviderFactory.create(state);
@@ -1119,6 +1130,66 @@ private TrackerClient chooseTrackerClient(Request request, RequestContext reques
// now try and find a tracker client for the uri
TrackerClient trackerClient = null;
URI targetHost = KeyMapper.TargetHostHints.getRequestContextTargetHost(requestContext);
int partitionId = getPartitionId(request, serviceName, clusterName, uris, targetHost);

Map<URI, TrackerClient> clientsToLoadBalance = null;

for (LoadBalancerState.SchemeStrategyPair pair : orderedStrategies)
{
LoadBalancerStrategy strategy = pair.getStrategy();
String scheme = pair.getScheme();
String key = serviceName + "@" + clusterName + "@" + partitionId + "@" + scheme;
TrackerClientSubsetItem subsetItem = getPotentialClientsFromCache(key);
if (subsetItem == null)
{
subsetItem = getPotentialClients(serviceName, serviceProperties, cluster,
uriItem.getProperty(), scheme, partitionId, uriItem.getVersion());
_cachePerClusterPartition.put(key, Pair.of(System.currentTimeMillis(), subsetItem));
}
clientsToLoadBalance = subsetItem.getWeightedSubset();

trackerClient =
strategy.getTrackerClient(request, requestContext, uriItem.getVersion(), partitionId, clientsToLoadBalance,
subsetItem.shouldForceUpdate());

debug(_log,
"load balancer strategy for ",
serviceName,
" returned: ",
trackerClient);

// break as soon as we find an available cluster client
if (trackerClient != null)
{
break;
}
}

if (trackerClient == null)
{
if (clientsToLoadBalance == null || clientsToLoadBalance.isEmpty())
{
String requestedSchemes = orderedStrategies.stream()
.map(LoadBalancerState.SchemeStrategyPair::getScheme).collect(Collectors.joining(","));

die(serviceName, "PEGA_1015. Service: " + serviceName + " unable to find a host to route the request"
+ " in partition: " + partitionId + " cluster: " + clusterName + " scheme: [" + requestedSchemes + "]," +
" total hosts in cluster: " + uris.Uris().size() + "."
+ " Check what cluster and scheme your servers are announcing to.");
}
else
{
die(serviceName, "PEGA_1016. Service: " + serviceName + " is in a bad state (high latency/high error). "
+ "Dropping request. Cluster: " + clusterName + ", partitionId:" + partitionId
+ " (choosable: " + clientsToLoadBalance.size() + " hosts, total in cluster: " + uris.Uris().size() + ")");
}
}

return trackerClient;
}

private int getPartitionId(Request request, String serviceName, String clusterName, UriProperties uris,
URI targetHost) throws ServiceUnavailableException {
int partitionId = -1;
URI requestUri = request.getURI();

@@ -1169,56 +1240,20 @@ private TrackerClient chooseTrackerClient(Request request, RequestContext reques
partitionId = iterator.next();
}
}
return partitionId;
}

Map<URI, TrackerClient> clientsToLoadBalance = null;

for (LoadBalancerState.SchemeStrategyPair pair : orderedStrategies)
@Nullable
private TrackerClientSubsetItem getPotentialClientsFromCache(String key)
{
Pair<Long, TrackerClientSubsetItem> p = _cachePerClusterPartition.get(key);
if (_cacheExpiryInMS == 0 || p == null || System.currentTimeMillis() > p.getLeft() + _cacheExpiryInMS)
{
LoadBalancerStrategy strategy = pair.getStrategy();
String scheme = pair.getScheme();

TrackerClientSubsetItem subsetItem = getPotentialClients(serviceName, serviceProperties, cluster,
uris, scheme, partitionId, uriItem.getVersion());
clientsToLoadBalance = subsetItem.getWeightedSubset();

trackerClient =
strategy.getTrackerClient(request, requestContext, uriItem.getVersion(), partitionId, clientsToLoadBalance,
subsetItem.shouldForceUpdate());

debug(_log,
"load balancer strategy for ",
serviceName,
" returned: ",
trackerClient);

// break as soon as we find an available cluster client
if (trackerClient != null)
{
break;
}
}

if (trackerClient == null)
return null;
} else
{
if (clientsToLoadBalance == null || clientsToLoadBalance.isEmpty())
{
String requestedSchemes = orderedStrategies.stream()
.map(LoadBalancerState.SchemeStrategyPair::getScheme).collect(Collectors.joining(","));

die(serviceName, "PEGA_1015. Service: " + serviceName + " unable to find a host to route the request"
+ " in partition: " + partitionId + " cluster: " + clusterName + " scheme: [" + requestedSchemes + "]," +
" total hosts in cluster: " + uris.Uris().size() + "."
+ " Check what cluster and scheme your servers are announcing to.");
}
else
{
die(serviceName, "PEGA_1016. Service: " + serviceName + " is in a bad state (high latency/high error). "
+ "Dropping request. Cluster: " + clusterName + ", partitionId:" + partitionId
+ " (choosable: " + clientsToLoadBalance.size() + " hosts, total in cluster: " + uris.Uris().size() + ")");
}
return p.getRight();
}

return trackerClient;
}
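
For clarity, a self-contained sketch of the expiry behavior that getPotentialClientsFromCache and the put in chooseTrackerClient implement together. The Subset type parameter and the class itself are illustrative stand-ins (not part of this PR), assuming the same commons-lang3 Pair used in the diff.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;

// Illustrative sketch of the caching pattern above: one entry per
// serviceName + "@" + clusterName + "@" + partitionId + "@" + scheme key,
// reused until the cooldown elapses. `Subset` stands in for TrackerClientSubsetItem.
final class SubsetCacheSketch<Subset>
{
  private final long _cacheExpiryInMS;
  private final Map<String, Pair<Long, Subset>> _cachePerClusterPartition = new HashMap<>();

  SubsetCacheSketch(long cooldown, TimeUnit unit)
  {
    _cacheExpiryInMS = unit.toMillis(cooldown);
  }

  // Mirrors getPotentialClientsFromCache: a zero cooldown, a missing entry,
  // or an expired entry returns null so the caller recomputes and re-puts.
  Subset get(String key)
  {
    Pair<Long, Subset> entry = _cachePerClusterPartition.get(key);
    if (_cacheExpiryInMS == 0 || entry == null
        || System.currentTimeMillis() > entry.getLeft() + _cacheExpiryInMS)
    {
      return null;
    }
    return entry.getRight();
  }

  // Mirrors the put in chooseTrackerClient: store the subset with its creation time.
  void put(String key, Subset subset)
  {
    _cachePerClusterPartition.put(key, Pair.of(System.currentTimeMillis(), subset));
  }
}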

private void die(String serviceName, String message) throws ServiceUnavailableException