@@ -1294,6 +1294,58 @@ int getClusterCountFromCache(String clusterName, String scheme, int partitionId)
return -1;
}

@Override
public int getClusterCountAcrossPartitions(String clusterName, String scheme) throws ServiceUnavailableException
Contributor
It seems to me that all partitions should be handled independently. As in, this method should return Map<Integer, Integer>. The overall host count might not be a good indicator of the traffic multipliers. The same server host can register with multiple partitions at the service owners' discretion. It's possible that some partitions have 10x the number of hosts in the dark cluster while others have only 1x.
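For illustration, a minimal sketch of the per-partition variant suggested here, assuming UriProperties exposes the per-URI partition map via getPartitionDesc(); the helper name clusterCountPerPartition is hypothetical and not part of this PR:

// Hypothetical sketch, not part of this PR: count hosts per partition for a given scheme.
static Map<Integer, Integer> clusterCountPerPartition(UriProperties uriProperties, String scheme)
{
  Map<Integer, Integer> countsByPartition = new HashMap<>();
  for (Map.Entry<URI, Map<Integer, PartitionData>> entry : uriProperties.getPartitionDesc().entrySet())
  {
    if (scheme.equalsIgnoreCase(entry.getKey().getScheme()))
    {
      // a host announced to several partitions is counted once in each of them
      for (Integer partitionId : entry.getValue().keySet())
      {
        countsByPartition.merge(partitionId, 1, Integer::sum);
      }
    }
  }
  return countsByPartition;
}

A caller such as the dark cluster strategies could then compare dark and source host counts partition by partition instead of relying on a single aggregate ratio.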

{
FutureCallback<Integer> clusterCountFutureCallback = new FutureCallback<>();

_state.listenToCluster(clusterName, (type, name) ->
{
if (_state.getUriProperties(clusterName) != null && _state.getUriProperties(clusterName).getProperty() != null)
{
Set<URI> allUris = _state.getUriProperties(clusterName).getProperty().Uris();
int count = (int) allUris.stream().filter(uri -> scheme.equalsIgnoreCase(uri.getScheme())).count();
clusterCountFutureCallback.onSuccess(count);
}
else
{
// there won't be a UriProperties if there are no Uris announced. Return zero in this case.
clusterCountFutureCallback.onSuccess(0);
}
});

try
{
return clusterCountFutureCallback.get(_timeout, _unit);
}
catch (ExecutionException | TimeoutException | IllegalStateException | InterruptedException e)
{
if (e instanceof TimeoutException || e.getCause() instanceof TimeoutException)
{
int clusterCount = getClusterCountAcrossPartitionsFromCache(clusterName, scheme);
if (clusterCount >= 0)
{
return clusterCount;
}
}
die("ClusterInfo",
"PEGA_1017, unable to retrieve cluster count across partitions for cluster: " + clusterName + ", scheme: " + scheme
+ ", exception: " + e);
return -1;
}
}

@VisibleForTesting
int getClusterCountAcrossPartitionsFromCache(String clusterName, String scheme)
{
if (_state.getUriProperties(clusterName) != null && _state.getUriProperties(clusterName).getProperty() != null)
{
Set<URI> allUris = _state.getUriProperties(clusterName).getProperty().Uris();
return (int) allUris.stream().filter(uri -> scheme.equalsIgnoreCase(uri.getScheme())).count();
}
return -1;
}

@Override
public DarkClusterConfigMap getDarkClusterConfigMap(String clusterName) throws ServiceUnavailableException
{
@@ -43,6 +43,31 @@ public interface ClusterInfoProvider
*/
int getClusterCount(String clusterName, String scheme, int partitionId) throws ServiceUnavailableException;

/**
 * Obtain the d2 cluster host count across all partitions for the HTTPS scheme.
 * The default implementation delegates to the scheme-specific overload using HTTPS.
 *
 * @return the number of hosts announced to the cluster across all partitions, counting only HTTPS URIs
 */
default int getClusterCountAcrossPartitions(String clusterName) throws ServiceUnavailableException
{
return getClusterCountAcrossPartitions(clusterName, PropertyKeys.HTTPS_SCHEME);
}

/**
 * Obtain the d2 cluster host count across all partitions for the given scheme.
 *
 * Note: The default implementation throws {@link UnsupportedOperationException} so that existing
 * implementations of this interface continue to compile without changes. Implementations that can
 * compute counts across all partitions should override this method.
 *
 * @return the number of hosts announced to the cluster across all partitions for the given scheme
 */
default int getClusterCountAcrossPartitions(String clusterName, String scheme) throws ServiceUnavailableException
{
throw new UnsupportedOperationException("getClusterCountAcrossPartitions(String, String) is not supported");
}

/**
* Helpful utility method for default behavior
*/
@@ -212,6 +212,11 @@ public int getClusterCount(String clusterName, String scheme, int partitionId) t
return _clusterInfoProvider.getClusterCount(clusterName, scheme, partitionId);
}

@Override
public int getClusterCountAcrossPartitions(String clusterName, String scheme) throws ServiceUnavailableException
{
  return _clusterInfoProvider.getClusterCountAcrossPartitions(clusterName, scheme);
}

@Override
public DarkClusterConfigMap getDarkClusterConfigMap(String clusterName)
throws ServiceUnavailableException
@@ -571,6 +571,63 @@ public void testGetClusterCountTimeout() throws Exception
verify(loadBalancer).getClusterCountFromCache(CLUSTER1_NAME, PropertyKeys.HTTP_SCHEME, partitionId);
}

@Test
public void testGetClusterCountAcrossPartitions() throws Exception
{
MockStore<ServiceProperties> serviceRegistry = new MockStore<>();
MockStore<ClusterProperties> clusterRegistry = new MockStore<>();
MockStore<UriProperties> uriRegistry = new MockStore<>();

SimpleLoadBalancerState state =
new SimpleLoadBalancerState(new SynchronousExecutorService(), uriRegistry, clusterRegistry, serviceRegistry,
new HashMap<>(), new HashMap<>());
SimpleLoadBalancer loadBalancer = spy(new SimpleLoadBalancer(state, 5, TimeUnit.SECONDS, _d2Executor));

// two HTTPS URIs in different partitions and one HTTP URI; across-partitions HTTPS should be 2
Map<URI, Map<Integer, PartitionData>> uriData = new HashMap<>();
Map<Integer, PartitionData> p0 = new HashMap<>();
p0.put(0, new PartitionData(1d));
Map<Integer, PartitionData> p1 = new HashMap<>();
p1.put(1, new PartitionData(1d));
uriData.put(URI.create("https://h1:1"), p0);
uriData.put(URI.create("https://h2:1"), p1);
uriData.put(URI.create("http://h3:1"), p0);

clusterRegistry.put(CLUSTER1_NAME, new ClusterProperties(CLUSTER1_NAME));
uriRegistry.put(CLUSTER1_NAME, new UriProperties(CLUSTER1_NAME, uriData));

assertEquals(loadBalancer.getClusterCountAcrossPartitions(CLUSTER1_NAME, PropertyKeys.HTTPS_SCHEME), 2);
assertEquals(loadBalancer.getClusterCountAcrossPartitions(CLUSTER1_NAME, PropertyKeys.HTTP_SCHEME), 1);
}

@Test
public void testGetClusterCountAcrossPartitionsTimeoutFallsBackToCache() throws Exception
{
MockStore<ServiceProperties> serviceRegistry = new MockStore<>();
MockStore<ClusterProperties> clusterRegistry = new MockStore<>();
MockStore<UriProperties> uriRegistry = new MockStore<>();

SimpleLoadBalancerState state =
spy(new SimpleLoadBalancerState(new SynchronousExecutorService(), uriRegistry, clusterRegistry, serviceRegistry,
new HashMap<>(), new HashMap<>()));
doAnswer(invocation -> { Thread.sleep(10); return null; }).when(state).listenToCluster(any(), any());
SimpleLoadBalancer loadBalancer = spy(new SimpleLoadBalancer(state, 1, TimeUnit.MILLISECONDS, _d2Executor));

Map<URI, Map<Integer, PartitionData>> uriData = new HashMap<>();
Map<Integer, PartitionData> p0 = new HashMap<>();
p0.put(0, new PartitionData(1d));
Map<Integer, PartitionData> p1 = new HashMap<>();
p1.put(1, new PartitionData(1d));
uriData.put(URI.create("https://h1:1"), p0);
uriData.put(URI.create("https://h2:1"), p1);

when(state.getUriProperties(CLUSTER1_NAME)).thenReturn(new LoadBalancerStateItem<>(new UriProperties(CLUSTER1_NAME, uriData), 1, 1));

int result = loadBalancer.getClusterCountAcrossPartitions(CLUSTER1_NAME, PropertyKeys.HTTPS_SCHEME);
verify(loadBalancer).getClusterCountAcrossPartitionsFromCache(CLUSTER1_NAME, PropertyKeys.HTTPS_SCHEME);
assertEquals(result, 2);
}

@Test
public void testGetDarkClusterConfigMapTimeout() throws Exception
{
@@ -35,6 +35,7 @@ public class MockClusterInfoProvider implements ClusterInfoProvider
Map<String, DarkClusterConfigMap> lookupMap = new HashMap<>();
List<LoadBalancerClusterListener> clusterListeners = new ArrayList<>();
Map<String, Integer> clusterHttpsCount = new HashMap<>();
Map<String, Integer> clusterHttpsCountAcrossPartitions = new HashMap<>();

@Override
public int getClusterCount(String clusterName, String scheme, int partitionId)
@@ -50,6 +51,13 @@ public int getHttpsClusterCount(String clusterName)
return clusterHttpsCount.getOrDefault(clusterName, 1);
}

@Override
public int getClusterCountAcrossPartitions(String clusterName)
throws ServiceUnavailableException
{
return clusterHttpsCountAcrossPartitions.getOrDefault(clusterName, getHttpsClusterCount(clusterName));
}

@Override
public DarkClusterConfigMap getDarkClusterConfigMap(String clusterName)
throws ServiceUnavailableException
@@ -125,4 +133,9 @@ void putHttpsClusterCount(String clusterName, Integer httpsCount)
// overwrites if anything is already there for this clusterName
clusterHttpsCount.put(clusterName, httpsCount);
}

void putHttpsClusterCountAcrossPartitions(String clusterName, Integer httpsCount)
{
clusterHttpsCountAcrossPartitions.put(clusterName, httpsCount);
}
}
@@ -128,8 +128,8 @@ private float getSendRate()
{
// Only support https for now. http support can be added later if truly needed, but would be non-ideal
// because potentially both dark and source would have to be configured.
int numDarkClusterInstances = _clusterInfoProvider.getHttpsClusterCount(_darkClusterName);
int numSourceClusterInstances = _clusterInfoProvider.getHttpsClusterCount(_originalClusterName);
int numDarkClusterInstances = _clusterInfoProvider.getClusterCountAcrossPartitions(_darkClusterName);
int numSourceClusterInstances = _clusterInfoProvider.getClusterCountAcrossPartitions(_originalClusterName);
if (numSourceClusterInstances != 0)
{
return (numDarkClusterInstances * _darkClusterPerHostQps) / numSourceClusterInstances;
@@ -109,8 +109,8 @@ private int getNumDuplicateRequests(RequestContext requestContext)
{
// Only support https for now. http support can be added later if truly needed, but would be non-ideal
// because potentially both dark and source would have to be configured.
int numDarkClusterInstances = _clusterInfoProvider.getHttpsClusterCount(_darkClusterName);
int numSourceClusterInstances = _clusterInfoProvider.getHttpsClusterCount(_originalClusterName);
int numDarkClusterInstances = _clusterInfoProvider.getClusterCountAcrossPartitions(_darkClusterName);
int numSourceClusterInstances = _clusterInfoProvider.getClusterCountAcrossPartitions(_originalClusterName);
float randomNumber;
if (requestContext.getLocalAttr(RANDOM_NUMBER_KEY) == null)
{
@@ -112,8 +112,8 @@ private int getNumDuplicateRequests()
{
// Only support https for now. http support can be added later if truly needed, but would be non-ideal
// because potentially both dark and source would have to be configured.
int numDarkClusterInstances = _clusterInfoProvider.getHttpsClusterCount(_darkClusterName);
int numSourceClusterInstances = _clusterInfoProvider.getHttpsClusterCount(_originalClusterName);
int numDarkClusterInstances = _clusterInfoProvider.getClusterCountAcrossPartitions(_darkClusterName);
int numSourceClusterInstances = _clusterInfoProvider.getClusterCountAcrossPartitions(_originalClusterName);
if (numSourceClusterInstances != 0)
{
float avgNumDarkRequests = (numDarkClusterInstances * _multiplier) / numSourceClusterInstances;