From 90659b59fef049e82818624c859b670e177ea217 Mon Sep 17 00:00:00 2001 From: Gaurav Chandani Date: Tue, 16 Apr 2024 21:09:59 +0530 Subject: [PATCH] Refactored to reuse duplicate code Signed-off-by: Gaurav Chandani --- .../routing/allocation/AllocationService.java | 14 +- .../gateway/ShardsBatchGatewayAllocator.java | 140 ++++++++---------- 2 files changed, 67 insertions(+), 87 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index 822497521e2f3..d6b364887b560 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -574,11 +574,7 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { If we do not have any custom allocator set then we will be using ShardsBatchGatewayAllocator Currently AllocationService will not run any custom Allocator that implements allocateAllUnassignedShards */ - ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME); - allocator.allocateAllUnassignedShards(allocation, true); - allocator.afterPrimariesBeforeReplicas(allocation); - // Replicas Assignment - allocator.allocateAllUnassignedShards(allocation, false); + allocateAllUnassignedShards(allocation); return; } logger.warn("Falling back to single shard assignment since batch mode disable or multiple custom allocators set"); @@ -604,6 +600,14 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { } } + private void allocateAllUnassignedShards(RoutingAllocation allocation) { + ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME); + allocator.allocateAllUnassignedShards(allocation, true); + allocator.afterPrimariesBeforeReplicas(allocation); + // Replicas Assignment + allocator.allocateAllUnassignedShards(allocation, false); + } + private void disassociateDeadNodes(RoutingAllocation allocation) { for (Iterator it = allocation.routingNodes().mutableIterator(); it.hasNext();) { RoutingNode node = it.next(); diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index e5367883d9434..7683ab6ed75dc 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -480,48 +480,12 @@ protected AsyncShardFetch.FetchResult inEligibleShards, RoutingAllocation allocation ) { - ShardRouting shardRouting = eligibleShards.iterator().hasNext() ? eligibleShards.iterator().next() : null; - shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() - ? inEligibleShards.iterator().next() - : shardRouting; - if (shardRouting == null) { - return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap()); - } - String batchId = getBatchId(shardRouting, shardRouting.primary()); - if (batchId == null) { - logger.debug("Shard {} has no batch id", shardRouting); - throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching"); - } - - if (batchIdToStartedShardBatch.containsKey(batchId) == false) { - logger.debug("Batch {} has no started shard batch", batchId); - throw new IllegalStateException("Batch " + batchId + " has no started shard batch"); - } - - ShardsBatch shardsBatch = batchIdToStartedShardBatch.get(batchId); - // remove in eligible shards which allocator is not responsible for - inEligibleShards.forEach(sr -> safelyRemoveShardFromBatch(sr, sr.primary())); - - if (shardsBatch.getBatchedShards().isEmpty() && eligibleShards.isEmpty()) { - logger.debug("Batch {} is empty", batchId); - return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap()); - } - - Map> shardToIgnoreNodes = new HashMap<>(); - - for (ShardId shardId : shardsBatch.asyncBatch.shardAttributesMap.keySet()) { - shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId)); - } - AsyncShardBatchFetch asyncFetcher = shardsBatch.getAsyncFetcher(); - AsyncShardFetch.FetchResult shardBatchState = asyncFetcher.fetchData( - allocation.nodes(), - shardToIgnoreNodes - ); - - if (shardBatchState.hasData()) { - shardBatchState.processAllocation(allocation); - } - return (AsyncShardFetch.FetchResult) shardBatchState; + return (AsyncShardFetch.FetchResult< + TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch>) fetchDataAndCleanIneligibleShards( + eligibleShards, + inEligibleShards, + allocation + ); } } @@ -534,46 +498,12 @@ protected AsyncShardFetch.FetchResult inEligibleShards, RoutingAllocation allocation ) { - // get batch id for anyone given shard. We are assuming all shards will have same batchId - ShardRouting shardRouting = eligibleShards.iterator().hasNext() ? eligibleShards.iterator().next() : null; - shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() - ? inEligibleShards.iterator().next() - : shardRouting; - if (shardRouting == null) { - return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap()); - } - String batchId = getBatchId(shardRouting, shardRouting.primary()); - if (batchId == null) { - logger.debug("Shard {} has no batch id", shardRouting); - throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching"); - } - - if (batchIdToStoreShardBatch.containsKey(batchId) == false) { - logger.debug("Batch {} has no store shard batch", batchId); - throw new IllegalStateException("Batch " + batchId + " has no shard store batch"); - } - - ShardsBatch shardsBatch = batchIdToStoreShardBatch.get(batchId); - // remove in eligible shards which allocator is not responsible for - inEligibleShards.forEach(sr -> safelyRemoveShardFromBatch(sr, sr.primary())); - - if (shardsBatch.getBatchedShards().isEmpty() && eligibleShards.isEmpty()) { - logger.debug("Batch {} is empty", batchId); - return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap()); - } - Map> shardToIgnoreNodes = new HashMap<>(); - for (ShardId shardId : shardsBatch.asyncBatch.shardAttributesMap.keySet()) { - shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId)); - } - AsyncShardBatchFetch asyncFetcher = shardsBatch.getAsyncFetcher(); - AsyncShardFetch.FetchResult shardBatchStores = asyncFetcher.fetchData( - allocation.nodes(), - shardToIgnoreNodes - ); - if (shardBatchStores.hasData()) { - shardBatchStores.processAllocation(allocation); - } - return (AsyncShardFetch.FetchResult) shardBatchStores; + return (AsyncShardFetch.FetchResult< + TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch>) fetchDataAndCleanIneligibleShards( + eligibleShards, + inEligibleShards, + allocation + ); } @Override @@ -583,6 +513,52 @@ protected boolean hasInitiatedFetching(ShardRouting shard) { } } + AsyncShardFetch.FetchResult fetchDataAndCleanIneligibleShards( + List eligibleShards, + List inEligibleShards, + RoutingAllocation allocation + ) { + // get batch id for anyone given shard. We are assuming all shards will have same batchId + ShardRouting shardRouting = eligibleShards.iterator().hasNext() ? eligibleShards.iterator().next() : null; + shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting; + if (shardRouting == null) { + return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap()); + } + String batchId = getBatchId(shardRouting, shardRouting.primary()); + if (batchId == null) { + logger.debug("Shard {} has no batch id", shardRouting); + throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching"); + } + ConcurrentMap batches = shardRouting.primary() ? batchIdToStartedShardBatch : batchIdToStoreShardBatch; + if (batches.containsKey(batchId) == false) { + logger.debug("Batch {} has no shards batch", batchId); + throw new IllegalStateException("Batch " + batchId + " has no shards batch"); + } + + ShardsBatch shardsBatch = batches.get(batchId); + // remove in eligible shards which allocator is not responsible for + inEligibleShards.forEach(sr -> safelyRemoveShardFromBatch(sr, sr.primary())); + + if (shardsBatch.getBatchedShards().isEmpty() && eligibleShards.isEmpty()) { + logger.debug("Batch {} is empty", batchId); + return new AsyncShardFetch.FetchResult<>(null, Collections.emptyMap()); + } + Map> shardToIgnoreNodes = new HashMap<>(); + for (ShardId shardId : shardsBatch.asyncBatch.shardAttributesMap.keySet()) { + shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId)); + } + AsyncShardBatchFetch asyncFetcher = shardsBatch.getAsyncFetcher(); + AsyncShardFetch.FetchResult fetchResult = asyncFetcher.fetchData( + allocation.nodes(), + shardToIgnoreNodes + ); + if (fetchResult.hasData()) { + fetchResult.processAllocation(allocation); + } + + return fetchResult; + } + /** * Holds information about a batch of shards to be allocated. * Async fetcher is used to fetch the data for the batch.