diff --git a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixClusterManager.java b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixClusterManager.java index 01f1407548..c56bc18a70 100644 --- a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixClusterManager.java +++ b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixClusterManager.java @@ -113,6 +113,7 @@ public class HelixClusterManager implements ClusterMap { private final PartitionSelectionHelper partitionSelectionHelper; private final Map> partitionOverrideInfoMap = new HashMap<>(); private final Map bootstrapReplicas = new ConcurrentHashMap<>(); + private final Map> disksAttemptedForBootstrap = new ConcurrentHashMap<>(); private ZkHelixPropertyStore helixPropertyStoreInLocalDc = null; private HelixAdmin localHelixAdmin = null; // The current xid currently does not change after instantiation. This can change in the future, allowing the cluster @@ -446,6 +447,9 @@ public ReplicaId getBootstrapReplica(String partitionIdStr, DataNodeId dataNodeI if (bootstrapReplica != null && instanceName.equals(selfInstanceName)) { // Note that this method might be called by several state transition threads concurrently. bootstrapReplicas.put(partitionIdStr, bootstrapReplica); + // Record the selected disk so that it is not picked again if bootstrap is retried for this partition + disksAttemptedForBootstrap.computeIfAbsent(partitionIdStr, k -> new HashSet<>()) + .add(bootstrapReplica.getDiskId()); } return bootstrapReplica; } catch (Exception e) { @@ -643,6 +647,13 @@ Map getBootstrapReplicaMap() { return Collections.unmodifiableMap(bootstrapReplicas); } + /** + * Exposed for testing + */ + void clearBootstrapDiskSelectionMap() { + disksAttemptedForBootstrap.clear(); + } + /** * Exposed for testing * @param dcName data center name @@ -743,6 +754,8 @@ private void addClusterWideRawCapacity(long diskRawCapacityBytes) { * @return bootstrap replica or {@code null} if not found. 
 */ private AmbryReplica fetchBootstrapReplica(String partitionName) { + // Clear disks tried for bootstrapping this replica + disksAttemptedForBootstrap.remove(partitionName); return (AmbryReplica) bootstrapReplicas.remove(partitionName); } @@ -860,7 +873,7 @@ private synchronized ReplicaId getBootstrapReplicaInFullAuto(String partitionIdS } try { - AmbryDisk disk = getDiskForBootstrapReplica((AmbryDataNode) dataNodeId, replicaCapacity); + AmbryDisk disk = getDiskForBootstrapReplica((AmbryDataNode) dataNodeId, replicaCapacity, partitionIdStr); if (disk == null) { logger.error("Failed to create bootstrap replica for partition {} due to insufficient disk space", partitionIdStr); @@ -947,10 +960,11 @@ private int getPartitionDefaultDiskWeight() { * since it can be queried concurrently when multiple replicas are bootstrapped. * @param dataNode the {@link DataNodeId} on which disk is needed * @param replicaCapacity the capacity of the replica in bytes + * @param partitionName name of the partition being bootstrapped; disks already attempted for it are skipped * @return {@link AmbryDisk} which has maximum available or free capacity. If none of the disks have free space, * returns null. */ - private AmbryDisk getDiskForBootstrapReplica(AmbryDataNode dataNode, long replicaCapacity) { + private AmbryDisk getDiskForBootstrapReplica(AmbryDataNode dataNode, long replicaCapacity, String partitionName) { Set disks = ambryDataNodeToAmbryDisks.get(dataNode); List potentialDisks = new ArrayList<>(); long maxAvailableDiskSpace = 0; @@ -964,6 +978,12 @@ private AmbryDisk getDiskForBootstrapReplica(AmbryDataNode dataNode, long replic disk, disk.getAvailableSpaceInBytes(), replicaCapacity); continue; } + if (disksAttemptedForBootstrap.containsKey(partitionName) && disksAttemptedForBootstrap.get(partitionName) + .contains(disk)) { + logger.info("Disk {} has already been tried before for this partition {}. 
Checking another disk", disk, + partitionName); + continue; + } if (disk.getAvailableSpaceInBytes() == maxAvailableDiskSpace) { potentialDisks.add(disk); } else if (disk.getAvailableSpaceInBytes() > maxAvailableDiskSpace) { diff --git a/ambry-clustermap/src/test/java/com/github/ambry/clustermap/HelixClusterManagerTest.java b/ambry-clustermap/src/test/java/com/github/ambry/clustermap/HelixClusterManagerTest.java index dafda62fa9..fe5dc7656b 100644 --- a/ambry-clustermap/src/test/java/com/github/ambry/clustermap/HelixClusterManagerTest.java +++ b/ambry-clustermap/src/test/java/com/github/ambry/clustermap/HelixClusterManagerTest.java @@ -744,6 +744,7 @@ public void getNewReplicaInFullAutoDiskUsageTest() throws Exception { disk.setDiskAvailableSpace(replicaCapacity); } } + helixClusterManager.clearBootstrapDiskSelectionMap(); ReplicaId bootstrapReplica = helixClusterManager.getBootstrapReplica(partition.toPathString(), ambryDataNode); assertEquals("Mismatch in disk used", expectedDisk, bootstrapReplica.getDiskId()); diff --git a/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java b/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java index 32250b56e7..840c8f7401 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java +++ b/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java @@ -16,6 +16,7 @@ import com.github.ambry.account.AccountService; import com.github.ambry.clustermap.DiskId; +import com.github.ambry.clustermap.HardwareState; import com.github.ambry.clustermap.PartitionId; import com.github.ambry.clustermap.ReplicaId; import com.github.ambry.clustermap.ReplicaStatusDelegate; diff --git a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java index cc53b449b1..a4edbd94cf 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java +++ 
b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java @@ -575,29 +575,43 @@ private class PartitionStateChangeListenerImpl implements PartitionStateChangeLi public void onPartitionBecomeBootstrapFromOffline(String partitionName) { // check if partition exists on current node ReplicaId replica = partitionNameToReplicaId.get(partitionName); - Store store = null; + Store store; if (replica == null) { - // there can be two scenarios: - // 1. this is the first time to add new replica onto current node; - // 2. last replica addition failed at some point before updating InstanceConfig in Helix - // In either case, we should add replica to current node by calling "addBlobStore(ReplicaId replica)" - ReplicaId replicaToAdd = clusterMap.getBootstrapReplica(partitionName, currentNode); - if (replicaToAdd == null) { - logger.error("No new replica found for partition {} in cluster map", partitionName); - throw new StateTransitionException( - "New replica " + partitionName + " is not found in clustermap for " + currentNode, ReplicaNotFound); - } - // Attempt to add store into storage manager. If store already exists on disk (but not in clustermap), make - // sure old store of this replica is deleted (this store may be created in previous replica addition but failed - // at some point). Then a brand new store associated with this replica should be created and started. - if (!addBlobStore(replicaToAdd)) { - // We have decreased the available disk space in HelixClusterManager#getDiskForBootstrapReplica. Increase it - // back since addition of store failed. - replicaToAdd.getDiskId().increaseAvailableSpaceInBytes(replicaToAdd.getCapacityInBytes()); - logger.error("Failed to add store {} into storage manager", partitionName); - throw new StateTransitionException("Failed to add store " + partitionName + " into storage manager", - ReplicaOperationFailure); - } + + ReplicaId replicaToAdd; + boolean replicaAdded = false; + do { + // there can be two scenarios: + // 1. 
this is the first time to add new replica onto current node; + // 2. last replica addition failed at some point before updating InstanceConfig in Helix + // In either case, we should add replica to current node by calling "addBlobStore(ReplicaId replica)" + replicaToAdd = clusterMap.getBootstrapReplica(partitionName, currentNode); + if (replicaToAdd == null) { + logger.error("No new replica found for partition {} in cluster map", partitionName); + throw new StateTransitionException( + "New replica " + partitionName + " is not found in clustermap for " + currentNode, ReplicaNotFound); + } + // Attempt to add store into storage manager. If store already exists on disk (but not in clustermap), make + // sure old store of this replica is deleted (this store may be created in previous replica addition but failed + // at some point). Then a brand new store associated with this replica should be created and started. + if (!addBlobStore(replicaToAdd)) { + // We have decreased the available disk space in HelixClusterManager#getDiskForBootstrapReplica. Increase it + // back since addition of store failed. + replicaToAdd.getDiskId().increaseAvailableSpaceInBytes(replicaToAdd.getCapacityInBytes()); + if (!clusterMap.isDataNodeInFullAutoMode(currentNode)) { + logger.error("Failed to add store {} into storage manager", partitionName); + throw new StateTransitionException("Failed to add store " + partitionName + " into storage manager", + ReplicaOperationFailure); + } else { + // TODO: Delete any files added in store and reserve directory + logger.info("Failed to add store {} at location {}. 
Retrying bootstrapping replica at different location", + partitionName, replicaToAdd.getReplicaPath()); + } + } else { + replicaAdded = true; + } + } while (!replicaAdded); + if (primaryClusterParticipant != null) { // update InstanceConfig in Helix try { diff --git a/ambry-store/src/test/java/com/github/ambry/store/StorageManagerTest.java b/ambry-store/src/test/java/com/github/ambry/store/StorageManagerTest.java index b59d4f0e9d..2eca6a7752 100644 --- a/ambry-store/src/test/java/com/github/ambry/store/StorageManagerTest.java +++ b/ambry-store/src/test/java/com/github/ambry/store/StorageManagerTest.java @@ -1343,6 +1343,50 @@ public void residualDirDeletionTest() throws Exception { shutdownAndAssertStoresInaccessible(storageManager, replicas); } + /** + * Test bootstrap retries in Full auto mode + * @throws Exception + */ + @Test + public void replicaFromOfflineToBootstrapFailureRetryTest() throws Exception { + generateConfigs(true, false); + MockClusterMap spyClusterMap = spy(clusterMap); + MockDataNodeId localNode = spyClusterMap.getDataNodes().get(0); + List localReplicas = spyClusterMap.getReplicaIds(localNode); + MockClusterParticipant mockHelixParticipant = new MockClusterParticipant(); + StorageManager storageManager = + new StorageManager(storeConfig, diskManagerConfig, Utils.newScheduler(1, false), metricRegistry, + new MockIdFactory(), spyClusterMap, localNode, new DummyMessageStoreHardDelete(), + Collections.singletonList(mockHelixParticipant), SystemTime.getInstance(), new DummyMessageStoreRecovery(), + new InMemAccountService(false, false)); + storageManager.start(); + + // 0. Mock the cluster to be in Full auto + doReturn(true).when(spyClusterMap).isDataNodeInFullAutoMode(any()); + + // 1. 
Create "newReplica1" and shut down its disk + PartitionId newPartition1 = spyClusterMap.createNewPartition(Collections.singletonList(localNode), 0); + ReplicaId newReplica1 = newPartition1.getReplicaIds().get(0); + ReplicaId replicaOnSameDisk = + localReplicas.stream().filter(r -> r.getDiskId().equals(newReplica1.getDiskId())).findFirst().get(); + storageManager.getDiskManager(replicaOnSameDisk.getPartitionId()).shutdown(); + + // 2. Create "newReplica2" whose disk is running + PartitionId newPartition2 = spyClusterMap.createNewPartition(Collections.singletonList(localNode), 1); + ReplicaId newReplica2 = newPartition2.getReplicaIds().get(0); + + // 3. Return "newReplica1" on 1st attempt and "newReplica2" on 2nd attempt + doReturn(newReplica1, newReplica2).when(spyClusterMap).getBootstrapReplica(any(), any()); + + // 4. Invoke the bootstrap state transition (ST). It should pass on 2nd attempt. + mockHelixParticipant.onPartitionBecomeBootstrapFromOffline(newPartition1.toPathString()); + + // 5. Verify getBootstrapReplica is called 2 times + verify(spyClusterMap, times(2)).getBootstrapReplica(anyString(), any()); + + shutdownAndAssertStoresInaccessible(storageManager, localReplicas); + } + /** * Test disk failure handler with real helix clustermap and helix participant. 
*/ diff --git a/ambry-test-utils/src/main/java/com/github/ambry/clustermap/MockClusterMap.java b/ambry-test-utils/src/main/java/com/github/ambry/clustermap/MockClusterMap.java index d2541e66d9..75a86c8021 100644 --- a/ambry-test-utils/src/main/java/com/github/ambry/clustermap/MockClusterMap.java +++ b/ambry-test-utils/src/main/java/com/github/ambry/clustermap/MockClusterMap.java @@ -372,7 +372,7 @@ public PartitionId createNewPartition(List dataNodes) { * @param mountPathIndexToUse the mount path index to use when creating new partition * @return new {@link PartitionId} */ - PartitionId createNewPartition(List dataNodes, int mountPathIndexToUse) { + public PartitionId createNewPartition(List dataNodes, int mountPathIndexToUse) { MockPartitionId partitionId = new MockPartitionId(partitions.size(), DEFAULT_PARTITION_CLASS, dataNodes, mountPathIndexToUse); partitions.put((long) partitions.size(), partitionId);