Skip to content

Commit

Permalink
improvement: Quickly delete local or HDFS data at the shuffleId level..
Browse files Browse the repository at this point in the history
  • Loading branch information
yl09099 committed Nov 28, 2024
1 parent 9947af7 commit 75aaa4f
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public void registerShuffle(
long start = System.currentTimeMillis();
shuffleServer
.getShuffleTaskManager()
.removeShuffleDataSyncTwoPhases(appId, shuffleId);
.removeShuffleDataSyncRenameAndDelete(appId, shuffleId);
LOG.info(
"Deleted the previous stage attempt data due to stage recomputing for app: {}, "
+ "shuffleId: {}. It costs {} ms",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,8 @@ public class ShuffleServerMetrics {
public static final String REPORTED_BLOCK_COUNT = "reported_block_count";
public static final String CACHED_BLOCK_COUNT = "cached_block_count";

private static final String TOTAL_HADOOP_TWO_PHASES_DELETION_FAILED =
"total_hadoop_two_phases_deletion_failed";
private static final String TOTAL_LOCAL_TWO_PHASES_DELETION_FAILED =
"total_local_two_phases_deletion_failed";
private static final String TOTAL_LOCAL_RENAME_AND_DELETION_FAILED =
"total_local_rename_and_deletion_failed";

public static Counter.Child counterTotalAppNum;
public static Counter.Child counterTotalAppWithHugePartitionNum;
Expand Down Expand Up @@ -250,8 +248,7 @@ public class ShuffleServerMetrics {
public static Gauge.Child gaugeReadLocalDataFileBufferSize;
public static Gauge.Child gaugeReadLocalIndexFileBufferSize;
public static Gauge.Child gaugeReadMemoryDataBufferSize;
public static Counter.Child counterLocalTwoPhasesDeletionFaileTd;
public static Counter.Child counterHadoopTwoPhasesDeletionFailed;
public static Counter.Child counterLocalRenameAndDeletionFaileTd;

public static Gauge gaugeTotalDataSizeUsage;
public static Gauge gaugeInMemoryDataSizeUsage;
Expand Down Expand Up @@ -447,10 +444,8 @@ private static void setUpMetrics(ShuffleServerConf serverConf) {
counterTotalHugePartitionNum = metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_NUM);
counterTotalHugePartitionExceedHardLimitNum =
metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_EXCEED_HARD_LIMIT_NUM);
counterLocalTwoPhasesDeletionFaileTd =
metricsManager.addLabeledCounter(TOTAL_LOCAL_TWO_PHASES_DELETION_FAILED);
counterHadoopTwoPhasesDeletionFailed =
metricsManager.addLabeledCounter(TOTAL_HADOOP_TWO_PHASES_DELETION_FAILED);
counterLocalRenameAndDeletionFaileTd =
metricsManager.addLabeledCounter(TOTAL_LOCAL_RENAME_AND_DELETION_FAILED);

gaugeLocalStorageIsWritable =
metricsManager.addGauge(LOCAL_STORAGE_IS_WRITABLE, LOCAL_DISK_PATH_LABEL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds)
}

public void removeResourcesByShuffleIds(
String appId, List<Integer> shuffleIds, boolean isTwoPhases) {
String appId, List<Integer> shuffleIds, boolean isRenameAndDelete) {
Lock writeLock = getAppWriteLock(appId);
writeLock.lock();
try {
Expand Down Expand Up @@ -871,7 +871,7 @@ public void removeResourcesByShuffleIds(
withTimeoutExecution(
() -> {
storageManager.removeResources(
new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds, isTwoPhases));
new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds, isRenameAndDelete));
return null;
},
storageRemoveOperationTimeoutSec,
Expand Down Expand Up @@ -1079,7 +1079,7 @@ public void removeShuffleDataSync(String appId, int shuffleId) {
removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId));
}

public void removeShuffleDataSyncTwoPhases(String appId, int shuffleId) {
public void removeShuffleDataSyncRenameAndDelete(String appId, int shuffleId) {
removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId), true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ public abstract class PurgeEvent {
private String appId;
private String user;
private List<Integer> shuffleIds;
// Quick Delete or not.
private boolean isTwoPhasesDeletion;
// Whether to enable the deletion mode: Rename files and then delete them asynchronously.
private boolean isRenameAndDelete;

public PurgeEvent(String appId, String user, List<Integer> shuffleIds) {
this(appId, user, shuffleIds, false);
}

public PurgeEvent(
String appId, String user, List<Integer> shuffleIds, boolean isTwoPhasesDeletion) {
String appId, String user, List<Integer> shuffleIds, boolean isRenameAndDelete) {
this.appId = appId;
this.user = user;
this.shuffleIds = shuffleIds;
this.isTwoPhasesDeletion = isTwoPhasesDeletion;
this.isRenameAndDelete = isRenameAndDelete;
}

public String getAppId() {
Expand All @@ -52,8 +52,8 @@ public List<Integer> getShuffleIds() {
return shuffleIds;
}

public boolean isTwoPhasesDeletion() {
return isTwoPhasesDeletion;
public boolean isRenameAndDelete() {
return isRenameAndDelete;
}

@Override
Expand All @@ -68,8 +68,8 @@ public String toString() {
+ '\''
+ ", shuffleIds="
+ shuffleIds
+ ", isTwoPhasesDeletion="
+ isTwoPhasesDeletion
+ ", isRenameAndDelete="
+ isRenameAndDelete
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public ShufflePurgeEvent(String appId, String user, List<Integer> shuffleIds) {
}

public ShufflePurgeEvent(
String appId, String user, List<Integer> shuffleIds, boolean isTwoPhases) {
super(appId, user, shuffleIds, isTwoPhases);
String appId, String user, List<Integer> shuffleIds, boolean isRenameAndDelete) {
super(appId, user, shuffleIds, isRenameAndDelete);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void removeResources(PurgeEvent event) {
StorageType.HDFS.name(),
storage.getConf(),
purgeForExpired ? shuffleServerId : null,
event.isTwoPhasesDeletion()));
event.isRenameAndDelete()));

String basicPath =
ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
Expand Down Expand Up @@ -150,11 +150,7 @@ public void removeResources(PurgeEvent event) {
storage.getStoragePath()));
}
}
boolean isSuccess =
deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser());
if (!isSuccess && event.isTwoPhasesDeletion()) {
ShuffleServerMetrics.counterLocalTwoPhasesDeletionFaileTd.inc();
}
deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser());
removeAppStorageInfo(event);
} else {
LOG.warn("Storage gotten is null when removing resources for event: {}", event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,7 @@ public void removeResources(PurgeEvent event) {
ShuffleHandlerFactory.getInstance()
.createShuffleDeleteHandler(
new CreateShuffleDeleteHandlerRequest(
StorageType.LOCALFILE.name(),
new Configuration(),
event.isTwoPhasesDeletion()));
StorageType.LOCALFILE.name(), new Configuration(), event.isRenameAndDelete()));

List<String> deletePaths =
localStorages.stream()
Expand Down Expand Up @@ -356,8 +354,8 @@ public void removeResources(PurgeEvent event) {

boolean isSuccess =
deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user);
if (!isSuccess && event.isTwoPhasesDeletion()) {
ShuffleServerMetrics.counterHadoopTwoPhasesDeletionFailed.inc();
if (!isSuccess && event.isRenameAndDelete()) {
ShuffleServerMetrics.counterLocalRenameAndDeletionFaileTd.inc();
}
removeAppStorageInfo(event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.uniffle.storage.handler.impl.AsynDeletionEventManager;
import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
import org.apache.uniffle.storage.handler.impl.HadoopClientReadHandler;
import org.apache.uniffle.storage.handler.impl.HadoopShuffleAsyncDeleteHandler;
import org.apache.uniffle.storage.handler.impl.HadoopShuffleDeleteHandler;
import org.apache.uniffle.storage.handler.impl.LocalFileAsyncDeleteHandler;
import org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler;
Expand Down Expand Up @@ -187,10 +186,7 @@ private ClientReadHandler getHadoopClientReadHandler(

public ShuffleDeleteHandler createShuffleDeleteHandler(
CreateShuffleDeleteHandlerRequest request) {
if (StorageType.HDFS.name().equals(request.getStorageType()) && request.isAsync()) {
return new HadoopShuffleAsyncDeleteHandler(
request.getConf(), request.getShuffleServerId(), AsynDeletionEventManager.getInstance());
} else if (StorageType.HDFS.name().equals(request.getStorageType()) && !request.isAsync()) {
if (StorageType.HDFS.name().equals(request.getStorageType())) {
return new HadoopShuffleDeleteHandler(request.getConf(), request.getShuffleServerId());
} else if (StorageType.LOCALFILE.name().equals(request.getStorageType()) && request.isAsync()) {
return new LocalFileAsyncDeleteHandler(AsynDeletionEventManager.getInstance());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
import org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest;
import org.apache.uniffle.storage.util.StorageType;

/**
* To quickly delete the Shuffle Data that has been dropped to the disk, you need to rename the data
* first and then encapsulate the data into an asynchronous deletion event. This function is used to
* manage the actual execution of the asynchronous deletion event.
*/
public class AsynDeletionEventManager {
private static final Logger LOG = LoggerFactory.getLogger(AsynDeletionEventManager.class);

Expand All @@ -42,17 +47,17 @@ public static synchronized AsynDeletionEventManager getInstance() {
return INSTANCE;
}

protected final BlockingQueue<AsynDeletionEvent> twoPhasesDeletionEventQueue =
protected final BlockingQueue<AsynDeletionEvent> renameAndAsynDeleteEventQueue =
Queues.newLinkedBlockingQueue();
protected Thread twoPhasesDeletionThread;
protected Thread renameAndAsynDeleteThread;

public AsynDeletionEventManager() {
Runnable twoPhasesDeletionTask =
Runnable renameAndDeletionTask =
() -> {
while (true) {
AsynDeletionEvent asynDeletionEvent = null;
try {
asynDeletionEvent = twoPhasesDeletionEventQueue.take();
asynDeletionEvent = renameAndAsynDeleteEventQueue.take();
if (asynDeletionEvent
.getStorageType()
.equalsIgnoreCase(StorageType.LOCALFILE.name())) {
Expand Down Expand Up @@ -85,18 +90,18 @@ public AsynDeletionEventManager() {
LOG.error(
"Delete Paths of {} failed.", asynDeletionEvent.getNeedDeleteRenamePaths(), e);
} else {
LOG.error("Failed to delete a directory in twoPhasesDeletionThread.", e);
LOG.error("Failed to delete a directory in renameAndAsynDeleteThread.", e);
}
}
}
};
twoPhasesDeletionThread = new Thread(twoPhasesDeletionTask);
twoPhasesDeletionThread.setName("twoPhasesDeletionThread");
twoPhasesDeletionThread.setDaemon(true);
twoPhasesDeletionThread.start();
renameAndAsynDeleteThread = new Thread(renameAndDeletionTask);
renameAndAsynDeleteThread.setName("renameAndAsynDeleteThread");
renameAndAsynDeleteThread.setDaemon(true);
renameAndAsynDeleteThread.start();
}

public synchronized boolean handlerDeletionQueue(AsynDeletionEvent asynDeletionEvent) {
return twoPhasesDeletionEventQueue.offer(asynDeletionEvent);
public synchronized boolean handlerAsynDelete(AsynDeletionEvent asynDeletionEvent) {
return renameAndAsynDeleteEventQueue.offer(asynDeletionEvent);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ public boolean delete(String[] storageBasePaths, String appId, String user) {
e);
}
}
if (!asynDeletionEventManager.handlerDeletionQueue(asynDeletionEvent)) {
if (!asynDeletionEventManager.handlerAsynDelete(asynDeletionEvent)) {
LOG.warn(
"Remove the case where the twoPhasesDeletionEventQueue queue is full and cannot accept elements.");
"Remove the case where the renameAndDeletionEventQueue queue is full and cannot accept elements.");
return false;
}
return true;
Expand Down

0 comments on commit 75aaa4f

Please sign in to comment.