Skip to content

Commit

Permalink
Refactored to reuse duplicate code
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Chandani <[email protected]>
  • Loading branch information
Gaurav614 committed Apr 16, 2024
1 parent 8153a69 commit 90659b5
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -574,11 +574,7 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
If we do not have any custom allocator set then we will be using ShardsBatchGatewayAllocator
Currently AllocationService will not run any custom Allocator that implements allocateAllUnassignedShards
*/
ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME);
allocator.allocateAllUnassignedShards(allocation, true);
allocator.afterPrimariesBeforeReplicas(allocation);
// Replicas Assignment
allocator.allocateAllUnassignedShards(allocation, false);
allocateAllUnassignedShards(allocation);
return;
}
logger.warn("Falling back to single shard assignment since batch mode disable or multiple custom allocators set");
Expand All @@ -604,6 +600,14 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
}
}

private void allocateAllUnassignedShards(RoutingAllocation allocation) {
ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME);
allocator.allocateAllUnassignedShards(allocation, true);
allocator.afterPrimariesBeforeReplicas(allocation);
// Replicas Assignment
allocator.allocateAllUnassignedShards(allocation, false);
}

private void disassociateDeadNodes(RoutingAllocation allocation) {
for (Iterator<RoutingNode> it = allocation.routingNodes().mutableIterator(); it.hasNext();) {
RoutingNode node = it.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,48 +480,12 @@ protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatc
List<ShardRouting> inEligibleShards,
RoutingAllocation allocation
) {
ShardRouting shardRouting = eligibleShards.iterator().hasNext() ? eligibleShards.iterator().next() : null;
shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext()
? inEligibleShards.iterator().next()
: shardRouting;
if (shardRouting == null) {
return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap());
}
String batchId = getBatchId(shardRouting, shardRouting.primary());
if (batchId == null) {
logger.debug("Shard {} has no batch id", shardRouting);
throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching");
}

if (batchIdToStartedShardBatch.containsKey(batchId) == false) {
logger.debug("Batch {} has no started shard batch", batchId);
throw new IllegalStateException("Batch " + batchId + " has no started shard batch");
}

ShardsBatch shardsBatch = batchIdToStartedShardBatch.get(batchId);
// remove in eligible shards which allocator is not responsible for
inEligibleShards.forEach(sr -> safelyRemoveShardFromBatch(sr, sr.primary()));

if (shardsBatch.getBatchedShards().isEmpty() && eligibleShards.isEmpty()) {
logger.debug("Batch {} is empty", batchId);
return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap());
}

Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();

for (ShardId shardId : shardsBatch.asyncBatch.shardAttributesMap.keySet()) {
shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId));
}
AsyncShardBatchFetch<? extends BaseNodeResponse, ?> asyncFetcher = shardsBatch.getAsyncFetcher();
AsyncShardFetch.FetchResult<? extends BaseNodeResponse> shardBatchState = asyncFetcher.fetchData(
allocation.nodes(),
shardToIgnoreNodes
);

if (shardBatchState.hasData()) {
shardBatchState.processAllocation(allocation);
}
return (AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch>) shardBatchState;
return (AsyncShardFetch.FetchResult<
TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch>) fetchDataAndCleanIneligibleShards(
eligibleShards,
inEligibleShards,
allocation
);
}

}
Expand All @@ -534,46 +498,12 @@ protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadataBatch.
List<ShardRouting> inEligibleShards,
RoutingAllocation allocation
) {
// get batch id for anyone given shard. We are assuming all shards will have same batchId
ShardRouting shardRouting = eligibleShards.iterator().hasNext() ? eligibleShards.iterator().next() : null;
shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext()
? inEligibleShards.iterator().next()
: shardRouting;
if (shardRouting == null) {
return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap());
}
String batchId = getBatchId(shardRouting, shardRouting.primary());
if (batchId == null) {
logger.debug("Shard {} has no batch id", shardRouting);
throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching");
}

if (batchIdToStoreShardBatch.containsKey(batchId) == false) {
logger.debug("Batch {} has no store shard batch", batchId);
throw new IllegalStateException("Batch " + batchId + " has no shard store batch");
}

ShardsBatch shardsBatch = batchIdToStoreShardBatch.get(batchId);
// remove in eligible shards which allocator is not responsible for
inEligibleShards.forEach(sr -> safelyRemoveShardFromBatch(sr, sr.primary()));

if (shardsBatch.getBatchedShards().isEmpty() && eligibleShards.isEmpty()) {
logger.debug("Batch {} is empty", batchId);
return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap());
}
Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();
for (ShardId shardId : shardsBatch.asyncBatch.shardAttributesMap.keySet()) {
shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId));
}
AsyncShardBatchFetch<? extends BaseNodeResponse, ?> asyncFetcher = shardsBatch.getAsyncFetcher();
AsyncShardFetch.FetchResult<? extends BaseNodeResponse> shardBatchStores = asyncFetcher.fetchData(
allocation.nodes(),
shardToIgnoreNodes
);
if (shardBatchStores.hasData()) {
shardBatchStores.processAllocation(allocation);
}
return (AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch>) shardBatchStores;
return (AsyncShardFetch.FetchResult<
TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch>) fetchDataAndCleanIneligibleShards(
eligibleShards,
inEligibleShards,
allocation
);
}

@Override
Expand All @@ -583,6 +513,52 @@ protected boolean hasInitiatedFetching(ShardRouting shard) {
}
}

AsyncShardFetch.FetchResult<? extends BaseNodeResponse> fetchDataAndCleanIneligibleShards(
List<ShardRouting> eligibleShards,
List<ShardRouting> inEligibleShards,
RoutingAllocation allocation
) {
// get batch id for anyone given shard. We are assuming all shards will have same batchId
ShardRouting shardRouting = eligibleShards.iterator().hasNext() ? eligibleShards.iterator().next() : null;
shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting;
if (shardRouting == null) {
return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap());
}
String batchId = getBatchId(shardRouting, shardRouting.primary());
if (batchId == null) {
logger.debug("Shard {} has no batch id", shardRouting);
throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching");
}
ConcurrentMap<String, ShardsBatch> batches = shardRouting.primary() ? batchIdToStartedShardBatch : batchIdToStoreShardBatch;
if (batches.containsKey(batchId) == false) {
logger.debug("Batch {} has no shards batch", batchId);
throw new IllegalStateException("Batch " + batchId + " has no shards batch");
}

ShardsBatch shardsBatch = batches.get(batchId);
// remove in eligible shards which allocator is not responsible for
inEligibleShards.forEach(sr -> safelyRemoveShardFromBatch(sr, sr.primary()));

if (shardsBatch.getBatchedShards().isEmpty() && eligibleShards.isEmpty()) {
logger.debug("Batch {} is empty", batchId);
return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap());
}
Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();
for (ShardId shardId : shardsBatch.asyncBatch.shardAttributesMap.keySet()) {
shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId));
}
AsyncShardBatchFetch<? extends BaseNodeResponse, ?> asyncFetcher = shardsBatch.getAsyncFetcher();
AsyncShardFetch.FetchResult<? extends BaseNodeResponse> fetchResult = asyncFetcher.fetchData(
allocation.nodes(),
shardToIgnoreNodes
);
if (fetchResult.hasData()) {
fetchResult.processAllocation(allocation);
}

return fetchResult;
}

/**
* Holds information about a batch of shards to be allocated.
* Async fetcher is used to fetch the data for the batch.
Expand Down

0 comments on commit 90659b5

Please sign in to comment.