From 75aaa4f29024a235ca5208719ebc6643670ae570 Mon Sep 17 00:00:00 2001 From: yl09099 Date: Thu, 28 Nov 2024 11:22:27 +0800 Subject: [PATCH] improvement: Quickly delete local or HDFS data at the shuffleId level.. --- .../server/ShuffleServerGrpcService.java | 2 +- .../uniffle/server/ShuffleServerMetrics.java | 15 +-- .../uniffle/server/ShuffleTaskManager.java | 6 +- .../uniffle/server/event/PurgeEvent.java | 16 +-- .../server/event/ShufflePurgeEvent.java | 4 +- .../server/storage/HadoopStorageManager.java | 8 +- .../server/storage/LocalStorageManager.java | 8 +- .../factory/ShuffleHandlerFactory.java | 6 +- .../impl/AsynDeletionEventManager.java | 27 ++-- .../impl/HadoopShuffleAsyncDeleteHandler.java | 124 ------------------ .../impl/LocalFileAsyncDeleteHandler.java | 4 +- 11 files changed, 43 insertions(+), 177 deletions(-) delete mode 100644 storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleAsyncDeleteHandler.java diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index 417e7c08d9..77dcc21376 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -256,7 +256,7 @@ public void registerShuffle( long start = System.currentTimeMillis(); shuffleServer .getShuffleTaskManager() - .removeShuffleDataSyncTwoPhases(appId, shuffleId); + .removeShuffleDataSyncRenameAndDelete(appId, shuffleId); LOG.info( "Deleted the previous stage attempt data due to stage recomputing for app: {}, " + "shuffleId: {}. It costs {} ms", diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java index fb5ec10aea..939dbf8da6 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java @@ -170,10 +170,8 @@ public class ShuffleServerMetrics { public static final String REPORTED_BLOCK_COUNT = "reported_block_count"; public static final String CACHED_BLOCK_COUNT = "cached_block_count"; - private static final String TOTAL_HADOOP_TWO_PHASES_DELETION_FAILED = - "total_hadoop_two_phases_deletion_failed"; - private static final String TOTAL_LOCAL_TWO_PHASES_DELETION_FAILED = - "total_local_two_phases_deletion_failed"; + private static final String TOTAL_LOCAL_RENAME_AND_DELETION_FAILED = + "total_local_rename_and_deletion_failed"; public static Counter.Child counterTotalAppNum; public static Counter.Child counterTotalAppWithHugePartitionNum; @@ -250,8 +248,7 @@ public class ShuffleServerMetrics { public static Gauge.Child gaugeReadLocalDataFileBufferSize; public static Gauge.Child gaugeReadLocalIndexFileBufferSize; public static Gauge.Child gaugeReadMemoryDataBufferSize; - public static Counter.Child counterLocalTwoPhasesDeletionFaileTd; - public static Counter.Child counterHadoopTwoPhasesDeletionFailed; + public static Counter.Child counterLocalRenameAndDeletionFaileTd; public static Gauge gaugeTotalDataSizeUsage; public static Gauge gaugeInMemoryDataSizeUsage; @@ -447,10 +444,8 @@ private static void setUpMetrics(ShuffleServerConf serverConf) { counterTotalHugePartitionNum = metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_NUM); counterTotalHugePartitionExceedHardLimitNum = metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_EXCEED_HARD_LIMIT_NUM); - counterLocalTwoPhasesDeletionFaileTd = - metricsManager.addLabeledCounter(TOTAL_LOCAL_TWO_PHASES_DELETION_FAILED); - counterHadoopTwoPhasesDeletionFailed = - metricsManager.addLabeledCounter(TOTAL_HADOOP_TWO_PHASES_DELETION_FAILED); + counterLocalRenameAndDeletionFaileTd = + metricsManager.addLabeledCounter(TOTAL_LOCAL_RENAME_AND_DELETION_FAILED); gaugeLocalStorageIsWritable = metricsManager.addGauge(LOCAL_STORAGE_IS_WRITABLE, LOCAL_DISK_PATH_LABEL); diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index a7762c12a4..5f54c9e448 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -838,7 +838,7 @@ public void removeResourcesByShuffleIds(String appId, List shuffleIds) } public void removeResourcesByShuffleIds( - String appId, List shuffleIds, boolean isTwoPhases) { + String appId, List shuffleIds, boolean isRenameAndDelete) { Lock writeLock = getAppWriteLock(appId); writeLock.lock(); try { @@ -871,7 +871,7 @@ public void removeResourcesByShuffleIds( withTimeoutExecution( () -> { storageManager.removeResources( - new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds, isTwoPhases)); + new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds, isRenameAndDelete)); return null; }, storageRemoveOperationTimeoutSec, @@ -1079,7 +1079,7 @@ public void removeShuffleDataSync(String appId, int shuffleId) { removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId)); } - public void removeShuffleDataSyncTwoPhases(String appId, int shuffleId) { + public void removeShuffleDataSyncRenameAndDelete(String appId, int shuffleId) { removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId), true); } diff --git a/server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java b/server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java index ec9fa283da..f890d44d4d 100644 --- a/server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java +++ b/server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java @@ -25,19 +25,19 @@ public abstract class PurgeEvent { private String appId; private String user; private List shuffleIds; - // Quick Delete or not. - private boolean isTwoPhasesDeletion; + // Whether to enable the deletion mode: Rename files and then delete them asynchronously. + private boolean isRenameAndDelete; public PurgeEvent(String appId, String user, List shuffleIds) { this(appId, user, shuffleIds, false); } public PurgeEvent( - String appId, String user, List shuffleIds, boolean isTwoPhasesDeletion) { + String appId, String user, List shuffleIds, boolean isRenameAndDelete) { this.appId = appId; this.user = user; this.shuffleIds = shuffleIds; - this.isTwoPhasesDeletion = isTwoPhasesDeletion; + this.isRenameAndDelete = isRenameAndDelete; } public String getAppId() { @@ -52,8 +52,8 @@ public List getShuffleIds() { return shuffleIds; } - public boolean isTwoPhasesDeletion() { - return isTwoPhasesDeletion; + public boolean isRenameAndDelete() { + return isRenameAndDelete; } @Override @@ -68,8 +68,8 @@ public String toString() { + '\'' + ", shuffleIds=" + shuffleIds - + ", isTwoPhasesDeletion=" - + isTwoPhasesDeletion + + ", isRenameAndDelete=" + + isRenameAndDelete + '}'; } } diff --git a/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java b/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java index 32750990b0..c7ea277c75 100644 --- a/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java +++ b/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java @@ -26,7 +26,7 @@ public ShufflePurgeEvent(String appId, String user, List shuffleIds) { } public ShufflePurgeEvent( - String appId, String user, List shuffleIds, boolean isTwoPhases) { - super(appId, user, shuffleIds, isTwoPhases); + String appId, String user, List shuffleIds, boolean isRenameAndDelete) { + super(appId, user, shuffleIds, isRenameAndDelete); } } diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java index 658cf04184..628004eedb 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java @@ -115,7 +115,7 @@ public void removeResources(PurgeEvent event) { StorageType.HDFS.name(), storage.getConf(), purgeForExpired ? shuffleServerId : null, - event.isTwoPhasesDeletion())); + event.isRenameAndDelete())); String basicPath = ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId); @@ -150,11 +150,7 @@ public void removeResources(PurgeEvent event) { storage.getStoragePath())); } } - boolean isSuccess = - deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser()); - if (!isSuccess && event.isTwoPhasesDeletion()) { - ShuffleServerMetrics.counterLocalTwoPhasesDeletionFaileTd.inc(); - } + deleteHandler.delete(deletePaths.toArray(new String[0]), appId, event.getUser()); removeAppStorageInfo(event); } else { LOG.warn("Storage gotten is null when removing resources for event: {}", event); diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java index 950934f651..5f7589513d 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java @@ -311,9 +311,7 @@ public void removeResources(PurgeEvent event) { ShuffleHandlerFactory.getInstance() .createShuffleDeleteHandler( new CreateShuffleDeleteHandlerRequest( - StorageType.LOCALFILE.name(), - new Configuration(), - event.isTwoPhasesDeletion())); + StorageType.LOCALFILE.name(), new Configuration(), event.isRenameAndDelete())); List deletePaths = localStorages.stream() @@ -356,8 +354,8 @@ public void removeResources(PurgeEvent event) { boolean isSuccess = deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user); - if (!isSuccess && event.isTwoPhasesDeletion()) { - ShuffleServerMetrics.counterHadoopTwoPhasesDeletionFailed.inc(); + if (!isSuccess && event.isRenameAndDelete()) { + ShuffleServerMetrics.counterLocalRenameAndDeletionFaileTd.inc(); } removeAppStorageInfo(event); } diff --git a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java index 6e10601b66..9e1a4b7aab 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java +++ b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java @@ -35,7 +35,6 @@ import org.apache.uniffle.storage.handler.impl.AsynDeletionEventManager; import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler; import org.apache.uniffle.storage.handler.impl.HadoopClientReadHandler; -import org.apache.uniffle.storage.handler.impl.HadoopShuffleAsyncDeleteHandler; import org.apache.uniffle.storage.handler.impl.HadoopShuffleDeleteHandler; import org.apache.uniffle.storage.handler.impl.LocalFileAsyncDeleteHandler; import org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler; @@ -187,10 +186,7 @@ private ClientReadHandler getHadoopClientReadHandler( public ShuffleDeleteHandler createShuffleDeleteHandler( CreateShuffleDeleteHandlerRequest request) { - if (StorageType.HDFS.name().equals(request.getStorageType()) && request.isAsync()) { - return new HadoopShuffleAsyncDeleteHandler( - request.getConf(), request.getShuffleServerId(), AsynDeletionEventManager.getInstance()); - } else if (StorageType.HDFS.name().equals(request.getStorageType()) && !request.isAsync()) { + if (StorageType.HDFS.name().equals(request.getStorageType())) { return new HadoopShuffleDeleteHandler(request.getConf(), request.getShuffleServerId()); } else if (StorageType.LOCALFILE.name().equals(request.getStorageType()) && request.isAsync()) { return new LocalFileAsyncDeleteHandler(AsynDeletionEventManager.getInstance()); diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/AsynDeletionEventManager.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/AsynDeletionEventManager.java index 1a2a9f9b66..a4398e8413 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/AsynDeletionEventManager.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/AsynDeletionEventManager.java @@ -30,6 +30,11 @@ import org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest; import org.apache.uniffle.storage.util.StorageType; +/** + * To quickly delete the Shuffle Data that has been dropped to the disk, you need to rename the data + * first and then encapsulate the data into an asynchronous deletion event. This function is used to + * manage the actual execution of the asynchronous deletion event. + */ public class AsynDeletionEventManager { private static final Logger LOG = LoggerFactory.getLogger(AsynDeletionEventManager.class); @@ -42,17 +47,17 @@ public static synchronized AsynDeletionEventManager getInstance() { return INSTANCE; } - protected final BlockingQueue twoPhasesDeletionEventQueue = + protected final BlockingQueue renameAndAsynDeleteEventQueue = Queues.newLinkedBlockingQueue(); - protected Thread twoPhasesDeletionThread; + protected Thread renameAndAsynDeleteThread; public AsynDeletionEventManager() { - Runnable twoPhasesDeletionTask = + Runnable renameAndDeletionTask = () -> { while (true) { AsynDeletionEvent asynDeletionEvent = null; try { - asynDeletionEvent = twoPhasesDeletionEventQueue.take(); + asynDeletionEvent = renameAndAsynDeleteEventQueue.take(); if (asynDeletionEvent .getStorageType() .equalsIgnoreCase(StorageType.LOCALFILE.name())) { @@ -85,18 +90,18 @@ public AsynDeletionEventManager() { LOG.error( "Delete Paths of {} failed.", asynDeletionEvent.getNeedDeleteRenamePaths(), e); } else { - LOG.error("Failed to delete a directory in twoPhasesDeletionThread.", e); + LOG.error("Failed to delete a directory in renameAndAsynDeleteThread.", e); } } } }; - twoPhasesDeletionThread = new Thread(twoPhasesDeletionTask); - twoPhasesDeletionThread.setName("twoPhasesDeletionThread"); - twoPhasesDeletionThread.setDaemon(true); - twoPhasesDeletionThread.start(); + renameAndAsynDeleteThread = new Thread(renameAndDeletionTask); + renameAndAsynDeleteThread.setName("renameAndAsynDeleteThread"); + renameAndAsynDeleteThread.setDaemon(true); + renameAndAsynDeleteThread.start(); } - public synchronized boolean handlerDeletionQueue(AsynDeletionEvent asynDeletionEvent) { - return twoPhasesDeletionEventQueue.offer(asynDeletionEvent); + public synchronized boolean handlerAsynDelete(AsynDeletionEvent asynDeletionEvent) { + return renameAndAsynDeleteEventQueue.offer(asynDeletionEvent); } } diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleAsyncDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleAsyncDeleteHandler.java deleted file mode 100644 index 3e49e6e022..0000000000 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleAsyncDeleteHandler.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.uniffle.storage.handler.impl; - -import java.io.FileNotFoundException; -import java.util.Arrays; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider; -import org.apache.uniffle.storage.handler.AsynDeletionEvent; -import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler; -import org.apache.uniffle.storage.util.StorageType; - -public class HadoopShuffleAsyncDeleteHandler implements ShuffleDeleteHandler { - private static final Logger LOG = LoggerFactory.getLogger(HadoopShuffleAsyncDeleteHandler.class); - private final String shuffleServerId; - private Configuration hadoopConf; - private AsynDeletionEventManager asynDeletionEventManager; - - public HadoopShuffleAsyncDeleteHandler( - Configuration hadoopConf, - String shuffleServerId, - AsynDeletionEventManager asynDeletionEventManager) { - this.hadoopConf = hadoopConf; - this.shuffleServerId = shuffleServerId; - this.asynDeletionEventManager = asynDeletionEventManager; - } - - /** Rename the file and then delete it asynchronously. */ - @Override - public boolean delete(String[] storageBasePaths, String appId, String user) { - AsynDeletionEvent asynDeletionEvent = - new AsynDeletionEvent( - appId, - user, - hadoopConf, - shuffleServerId, - Arrays.asList(storageBasePaths), - StorageType.HDFS.name()); - for (Map.Entry appIdNeedDeletePaths : - asynDeletionEvent.getNeedDeletePathAndRenamePath().entrySet()) { - final Path path = new Path(appIdNeedDeletePaths.getKey()); - final Path breakdownPathFolder = new Path(appIdNeedDeletePaths.getValue()); - boolean isExists = false; - boolean isSuccess = false; - int times = 0; - int retryMax = 5; - long start = System.currentTimeMillis(); - LOG.info( - "Try rename shuffle data in Hadoop FS for appId[{}] of user[{}] with {}", - appId, - user, - path); - while (!isSuccess && times < retryMax) { - try { - FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(user, path, hadoopConf); - isExists = fileSystem.exists(path); - if (isExists) { - isSuccess = fileSystem.rename(path, breakdownPathFolder); - } else { - break; - } - } catch (Exception e) { - if (e instanceof FileNotFoundException) { - LOG.info("[{}] doesn't exist, ignore it.", path); - return false; - } - times++; - LOG.warn("Can't rename shuffle data for appId[{}] with {} times", appId, times, e); - try { - Thread.sleep(1000); - } catch (Exception ex) { - LOG.warn("Exception happened when Thread.sleep", ex); - } - } - } - if (isExists) { - if (isSuccess) { - LOG.info( - "Rename shuffle data in Hadoop FS for appId[{}] with {} successfully in {} ms", - appId, - path, - (System.currentTimeMillis() - start)); - } else { - LOG.warn( - "Failed to rename shuffle data in Hadoop FS for appId [{}] with {} successfully in {} ms", - appId, - path, - (System.currentTimeMillis() - start)); - } - } else { - LOG.info( - "Rename shuffle data in Hadoop FS for appId[{}] with {} is not exists", appId, path); - } - } - if (!asynDeletionEventManager.handlerDeletionQueue(asynDeletionEvent)) { - LOG.warn( - "Remove the case where the twoPhasesDeletionEventQueue queue is full and cannot accept elements."); - return false; - } - return true; - } -} diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileAsyncDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileAsyncDeleteHandler.java index 31a68251de..cdec039f4e 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileAsyncDeleteHandler.java +++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileAsyncDeleteHandler.java @@ -83,9 +83,9 @@ public boolean delete(String[] storageBasePaths, String appId, String user) { e); } } - if (!asynDeletionEventManager.handlerDeletionQueue(asynDeletionEvent)) { + if (!asynDeletionEventManager.handlerAsynDelete(asynDeletionEvent)) { LOG.warn( - "Remove the case where the twoPhasesDeletionEventQueue queue is full and cannot accept elements."); + "Remove the case where the renameAndDeletionEventQueue queue is full and cannot accept elements."); return false; } return true;