Skip to content

Commit

Permalink
Retry different disk when bootstrap fails in Full auto mode (#2659)
Browse files Browse the repository at this point in the history
Retry different disk when bootstrap fails
  • Loading branch information
Arun-LinkedIn authored Nov 30, 2023
1 parent e00e244 commit c9537d5
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public class HelixClusterManager implements ClusterMap {
private final PartitionSelectionHelper partitionSelectionHelper;
private final Map<String, Map<String, String>> partitionOverrideInfoMap = new HashMap<>();
private final Map<String, ReplicaId> bootstrapReplicas = new ConcurrentHashMap<>();
private final Map<String, Set<DiskId>> disksAttemptedForBootstrap = new ConcurrentHashMap<>();
private ZkHelixPropertyStore<ZNRecord> helixPropertyStoreInLocalDc = null;
private HelixAdmin localHelixAdmin = null;
// The current xid currently does not change after instantiation. This can change in the future, allowing the cluster
Expand Down Expand Up @@ -446,6 +447,9 @@ public ReplicaId getBootstrapReplica(String partitionIdStr, DataNodeId dataNodeI
if (bootstrapReplica != null && instanceName.equals(selfInstanceName)) {
// Note that this method might be called by several state transition threads concurrently.
bootstrapReplicas.put(partitionIdStr, bootstrapReplica);
// Add the selected disk so that it is not picked
disksAttemptedForBootstrap.computeIfAbsent(partitionIdStr, k -> new HashSet<>())
.add(bootstrapReplica.getDiskId());
}
return bootstrapReplica;
} catch (Exception e) {
Expand Down Expand Up @@ -643,6 +647,13 @@ Map<String, ReplicaId> getBootstrapReplicaMap() {
return Collections.unmodifiableMap(bootstrapReplicas);
}

/**
* Exposed for testing
*/
void clearBootstrapDiskSelectionMap() {
disksAttemptedForBootstrap.clear();
}

/**
* Exposed for testing
* @param dcName data center name
Expand Down Expand Up @@ -743,6 +754,8 @@ private void addClusterWideRawCapacity(long diskRawCapacityBytes) {
* @return bootstrap replica or {@code null} if not found.
*/
private AmbryReplica fetchBootstrapReplica(String partitionName) {
// Clear disks tried for bootstrapping this replica
disksAttemptedForBootstrap.remove(partitionName);
return (AmbryReplica) bootstrapReplicas.remove(partitionName);
}

Expand Down Expand Up @@ -860,7 +873,7 @@ private synchronized ReplicaId getBootstrapReplicaInFullAuto(String partitionIdS
}

try {
AmbryDisk disk = getDiskForBootstrapReplica((AmbryDataNode) dataNodeId, replicaCapacity);
AmbryDisk disk = getDiskForBootstrapReplica((AmbryDataNode) dataNodeId, replicaCapacity, partitionIdStr);
if (disk == null) {
logger.error("Failed to create bootstrap replica for partition {} due to insufficient disk space",
partitionIdStr);
Expand Down Expand Up @@ -947,10 +960,11 @@ private int getPartitionDefaultDiskWeight() {
* since it can be queried concurrently when multiple replicas are bootstrapped.
* @param dataNode the {@link DataNodeId} on which disk is needed
* @param replicaCapacity the capacity of the replica in bytes
* @param partitionName partition name
* @return {@link AmbryDisk} which has maximum available or free capacity. If none of the disks have free space,
* returns null.
*/
private AmbryDisk getDiskForBootstrapReplica(AmbryDataNode dataNode, long replicaCapacity) {
private AmbryDisk getDiskForBootstrapReplica(AmbryDataNode dataNode, long replicaCapacity, String partitionName) {
Set<AmbryDisk> disks = ambryDataNodeToAmbryDisks.get(dataNode);
List<AmbryDisk> potentialDisks = new ArrayList<>();
long maxAvailableDiskSpace = 0;
Expand All @@ -964,6 +978,12 @@ private AmbryDisk getDiskForBootstrapReplica(AmbryDataNode dataNode, long replic
disk, disk.getAvailableSpaceInBytes(), replicaCapacity);
continue;
}
if (disksAttemptedForBootstrap.containsKey(partitionName) && disksAttemptedForBootstrap.get(partitionName)
.contains(disk)) {
logger.info("Disk {} has already been tried before for this partition {}. Checking another disk", disk,
partitionName);
continue;
}
if (disk.getAvailableSpaceInBytes() == maxAvailableDiskSpace) {
potentialDisks.add(disk);
} else if (disk.getAvailableSpaceInBytes() > maxAvailableDiskSpace) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,7 @@ public void getNewReplicaInFullAutoDiskUsageTest() throws Exception {
disk.setDiskAvailableSpace(replicaCapacity);
}
}
helixClusterManager.clearBootstrapDiskSelectionMap();
ReplicaId bootstrapReplica = helixClusterManager.getBootstrapReplica(partition.toPathString(), ambryDataNode);
assertEquals("Mismatch in disk used", expectedDisk, bootstrapReplica.getDiskId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.github.ambry.account.AccountService;
import com.github.ambry.clustermap.DiskId;
import com.github.ambry.clustermap.HardwareState;
import com.github.ambry.clustermap.PartitionId;
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.clustermap.ReplicaStatusDelegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,29 +575,43 @@ private class PartitionStateChangeListenerImpl implements PartitionStateChangeLi
public void onPartitionBecomeBootstrapFromOffline(String partitionName) {
// check if partition exists on current node
ReplicaId replica = partitionNameToReplicaId.get(partitionName);
Store store = null;
Store store;
if (replica == null) {
// there can be two scenarios:
// 1. this is the first time to add new replica onto current node;
// 2. last replica addition failed at some point before updating InstanceConfig in Helix
// In either case, we should add replica to current node by calling "addBlobStore(ReplicaId replica)"
ReplicaId replicaToAdd = clusterMap.getBootstrapReplica(partitionName, currentNode);
if (replicaToAdd == null) {
logger.error("No new replica found for partition {} in cluster map", partitionName);
throw new StateTransitionException(
"New replica " + partitionName + " is not found in clustermap for " + currentNode, ReplicaNotFound);
}
// Attempt to add store into storage manager. If store already exists on disk (but not in clustermap), make
// sure old store of this replica is deleted (this store may be created in previous replica addition but failed
// at some point). Then a brand new store associated with this replica should be created and started.
if (!addBlobStore(replicaToAdd)) {
// We have decreased the available disk space in HelixClusterManager#getDiskForBootstrapReplica. Increase it
// back since addition of store failed.
replicaToAdd.getDiskId().increaseAvailableSpaceInBytes(replicaToAdd.getCapacityInBytes());
logger.error("Failed to add store {} into storage manager", partitionName);
throw new StateTransitionException("Failed to add store " + partitionName + " into storage manager",
ReplicaOperationFailure);
}

ReplicaId replicaToAdd;
boolean replicaAdded = false;
do {
// there can be two scenarios:
// 1. this is the first time to add new replica onto current node;
// 2. last replica addition failed at some point before updating InstanceConfig in Helix
// In either case, we should add replica to current node by calling "addBlobStore(ReplicaId replica)"
replicaToAdd = clusterMap.getBootstrapReplica(partitionName, currentNode);
if (replicaToAdd == null) {
logger.error("No new replica found for partition {} in cluster map", partitionName);
throw new StateTransitionException(
"New replica " + partitionName + " is not found in clustermap for " + currentNode, ReplicaNotFound);
}
// Attempt to add store into storage manager. If store already exists on disk (but not in clustermap), make
// sure old store of this replica is deleted (this store may be created in previous replica addition but failed
// at some point). Then a brand new store associated with this replica should be created and started.
if (!addBlobStore(replicaToAdd)) {
// We have decreased the available disk space in HelixClusterManager#getDiskForBootstrapReplica. Increase it
// back since addition of store failed.
replicaToAdd.getDiskId().increaseAvailableSpaceInBytes(replicaToAdd.getCapacityInBytes());
if (!clusterMap.isDataNodeInFullAutoMode(currentNode)) {
logger.error("Failed to add store {} into storage manager", partitionName);
throw new StateTransitionException("Failed to add store " + partitionName + " into storage manager",
ReplicaOperationFailure);
} else {
// TODO: Delete any files added in store and reserve directory
logger.info("Failed to add store {} at location {}. Retrying bootstrapping replica at different location",
partitionName, replicaToAdd.getReplicaPath());
}
} else {
replicaAdded = true;
}
} while (!replicaAdded);

if (primaryClusterParticipant != null) {
// update InstanceConfig in Helix
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,50 @@ public void residualDirDeletionTest() throws Exception {
shutdownAndAssertStoresInaccessible(storageManager, replicas);
}

/**
* Test bootstrap retries in Full auto mode
* @throws Exception
*/
@Test
public void replicaFromOfflineToBootstrapFailureRetryTest() throws Exception {
generateConfigs(true, false);
MockClusterMap spyClusterMap = spy(clusterMap);
MockDataNodeId localNode = spyClusterMap.getDataNodes().get(0);
List<ReplicaId> localReplicas = spyClusterMap.getReplicaIds(localNode);
MockClusterParticipant mockHelixParticipant = new MockClusterParticipant();
StorageManager storageManager =
new StorageManager(storeConfig, diskManagerConfig, Utils.newScheduler(1, false), metricRegistry,
new MockIdFactory(), spyClusterMap, localNode, new DummyMessageStoreHardDelete(),
Collections.singletonList(mockHelixParticipant), SystemTime.getInstance(), new DummyMessageStoreRecovery(),
new InMemAccountService(false, false));
storageManager.start();

// 0. Mock the cluster to be in Full auto
doReturn(true).when(spyClusterMap).isDataNodeInFullAutoMode(any());

// 1. Create "newReplica1" and shutdown its disk
PartitionId newPartition1 = spyClusterMap.createNewPartition(Collections.singletonList(localNode), 0);
ReplicaId newReplica1 = newPartition1.getReplicaIds().get(0);
ReplicaId replicaOnSameDisk =
localReplicas.stream().filter(r -> r.getDiskId().equals(newReplica1.getDiskId())).findFirst().get();
storageManager.getDiskManager(replicaOnSameDisk.getPartitionId()).shutdown();

// 2. Create "newReplica2" which has disk running
PartitionId newPartition2 = spyClusterMap.createNewPartition(Collections.singletonList(localNode), 1);
ReplicaId newReplica2 = newPartition2.getReplicaIds().get(0);

// 3. Return "newReplica1" on 1st attempt and "newReplica2" on 2nd attempt
doReturn(newReplica1, newReplica2).when(spyClusterMap).getBootstrapReplica(any(), any());

// 4. Invoke bootstrap ST. It should pass on 2nd attempt.
mockHelixParticipant.onPartitionBecomeBootstrapFromOffline(newPartition1.toPathString());

// 5. Verify getBootstrap replica is called 2 times
verify(spyClusterMap, times(2)).getBootstrapReplica(anyString(), any());

shutdownAndAssertStoresInaccessible(storageManager, localReplicas);
}

/**
* Test disk failure handler with real helix clustermap and helix participant.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ public PartitionId createNewPartition(List<MockDataNodeId> dataNodes) {
* @param mountPathIndexToUse the mount path index to use when creating new partition
* @return new {@link PartitionId}
*/
PartitionId createNewPartition(List<MockDataNodeId> dataNodes, int mountPathIndexToUse) {
public PartitionId createNewPartition(List<MockDataNodeId> dataNodes, int mountPathIndexToUse) {
MockPartitionId partitionId =
new MockPartitionId(partitions.size(), DEFAULT_PARTITION_CLASS, dataNodes, mountPathIndexToUse);
partitions.put((long) partitions.size(), partitionId);
Expand Down

0 comments on commit c9537d5

Please sign in to comment.