Skip to content

Commit

Permalink
[Improvement] Quickly delete local or HDFS data at the shuffleId level.
Browse files Browse the repository at this point in the history
  • Loading branch information
yl09099 committed Sep 9, 2024
1 parent 456e119 commit a6e8d64
Show file tree
Hide file tree
Showing 12 changed files with 56 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void registerShuffle(
taskInfo.refreshLatestStageAttemptNumber(shuffleId, stageAttemptNumber);
try {
long start = System.currentTimeMillis();
shuffleServer.getShuffleTaskManager().quickRemoveShuffleDataSync(appId, shuffleId);
shuffleServer.getShuffleTaskManager().softRemoveShuffleDataSync(appId, shuffleId);
LOG.info(
"Deleted the previous stage attempt data due to stage recomputing for app: {}, "
+ "shuffleId: {}. It costs {} ms",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ public class ShuffleServerMetrics {
public static final String TOPN_OF_ON_HADOOP_DATA_SIZE_FOR_APP =
"topN_of_on_hadoop_data_size_for_app";

private static final String TOTAL_HADOOP_SOFT_DELETE_FAILED = "total_hadoop_soft_delete_failed";
private static final String TOTAL_LOCAL_SOFT_DELETE_FAILED = "total_local_soft_delete_failed";

public static Counter.Child counterTotalAppNum;
public static Counter.Child counterTotalAppWithHugePartitionNum;
public static Counter.Child counterTotalPartitionNum;
Expand Down Expand Up @@ -232,6 +235,8 @@ public class ShuffleServerMetrics {
public static Gauge.Child gaugeReadLocalDataFileBufferSize;
public static Gauge.Child gaugeReadLocalIndexFileBufferSize;
public static Gauge.Child gaugeReadMemoryDataBufferSize;
public static Counter.Child counterLocalSoftDeleteFailed;
public static Counter.Child counterHadoopSoftDeleteFailed;

public static Gauge gaugeTotalDataSizeUsage;
public static Gauge gaugeInMemoryDataSizeUsage;
Expand Down Expand Up @@ -425,6 +430,10 @@ private static void setUpMetrics(ShuffleServerConf serverConf) {
counterTotalHugePartitionNum = metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_NUM);
counterTotalHugePartitionExceedHardLimitNum =
metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_EXCEED_HARD_LIMIT_NUM);
counterLocalSoftDeleteFailed =
metricsManager.addLabeledCounter(TOTAL_HADOOP_SOFT_DELETE_FAILED);
counterHadoopSoftDeleteFailed =
metricsManager.addLabeledCounter(TOTAL_LOCAL_SOFT_DELETE_FAILED);

gaugeLocalStorageIsWritable =
metricsManager.addGauge(LOCAL_STORAGE_IS_WRITABLE, LOCAL_DISK_PATH_LABEL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@ public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds,
withTimeoutExecution(
() -> {
storageManager.removeResources(
new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds), isQuick);
new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds));
return null;
},
storageRemoveOperationTimeoutSec,
Expand Down Expand Up @@ -1014,7 +1014,7 @@ public void removeShuffleDataSync(String appId, int shuffleId) {
* @param appId
* @param shuffleId
*/
public void quickRemoveShuffleDataSync(String appId, int shuffleId) {
public void softRemoveShuffleDataSync(String appId, int shuffleId) {
removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId), true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,18 @@ public abstract class PurgeEvent {
private String appId;
private String user;
private List<Integer> shuffleIds;
// Quick Delete or not.
private boolean isSoftDelete;

public PurgeEvent(String appId, String user, List<Integer> shuffleIds) {
this(appId, user, shuffleIds, false);
}

public PurgeEvent(String appId, String user, List<Integer> shuffleIds, boolean isSoftDelete) {
this.appId = appId;
this.user = user;
this.shuffleIds = shuffleIds;
this.isSoftDelete = isSoftDelete;
}

public String getAppId() {
Expand All @@ -44,6 +51,10 @@ public List<Integer> getShuffleIds() {
return shuffleIds;
}

public boolean isSoftDelete() {
return isSoftDelete;
}

@Override
public String toString() {
return this.getClass().getSimpleName()
Expand All @@ -56,6 +67,8 @@ public String toString() {
+ '\''
+ ", shuffleIds="
+ shuffleIds
+ ", isSoftDelete="
+ isSoftDelete
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -68,9 +66,6 @@ public class HadoopStorageManager extends SingleStorageManager {
private Map<String, HadoopStorage> appIdToStorages = JavaUtils.newConcurrentMap();
private Map<String, HadoopStorage> pathToStorages = JavaUtils.newConcurrentMap();
private final boolean isStorageAuditLogEnabled;
private final BlockingQueue<AsynchronousDeleteEvent> quickNeedDeletePaths =
Queues.newLinkedBlockingQueue();
private Thread clearNeedDeleteLocalPathThread;

HadoopStorageManager(ShuffleServerConf conf) {
super(conf);
Expand All @@ -82,7 +77,7 @@ public class HadoopStorageManager extends SingleStorageManager {
while (true) {
AsynchronousDeleteEvent asynchronousDeleteEvent = null;
try {
asynchronousDeleteEvent = quickNeedDeletePaths.take();
asynchronousDeleteEvent = softDirPaths.take();
ShuffleDeleteHandler deleteHandler =
ShuffleHandlerFactory.getInstance()
.createShuffleDeleteHandler(
Expand All @@ -107,9 +102,9 @@ public class HadoopStorageManager extends SingleStorageManager {
}
}
};
clearNeedDeleteLocalPathThread = new Thread(clearNeedDeletePath);
clearNeedDeleteLocalPathThread.setName("clearNeedDeleteHadoopPathThread");
clearNeedDeleteLocalPathThread.setDaemon(true);
clearSoftDirPathThread = new Thread(clearNeedDeletePath);
clearSoftDirPathThread.setName("clearSoftHadoopDirPathThread");
clearSoftDirPathThread.setDaemon(true);
}

@Override
Expand Down Expand Up @@ -138,11 +133,6 @@ public Storage selectStorage(ShuffleDataReadEvent event) {

@Override
public void removeResources(PurgeEvent event) {
removeResources(event, false);
}

@Override
public void removeResources(PurgeEvent event, boolean isQuick) {
String appId = event.getAppId();
HadoopStorage storage = getStorageByAppId(appId);
if (storage != null) {
Expand Down Expand Up @@ -193,13 +183,14 @@ public void removeResources(PurgeEvent event, boolean isQuick) {
storage.getStoragePath()));
}
}
if (isQuick) {
if (event.isSoftDelete()) {
AsynchronousDeleteEvent asynchronousDeleteEvent =
new AsynchronousDeleteEvent(
appId, event.getUser(), storage.getConf(), event.getShuffleIds(), deletePaths);
deleteHandler.quickDelete(asynchronousDeleteEvent);
boolean isSucess = quickNeedDeletePaths.offer(asynchronousDeleteEvent);
deleteHandler.softDelete(asynchronousDeleteEvent);
boolean isSucess = softDirPaths.offer(asynchronousDeleteEvent);
if (!isSucess) {
ShuffleServerMetrics.counterHadoopSoftDeleteFailed.inc();
LOG.warn(
"Remove the case where the clearNeedDeleteHadoopPathThread queue is full and cannot accept elements.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,9 @@ public Map<String, StorageInfo> getStorageInfo() {
}

public void removeResources(PurgeEvent event) {
removeResources(event, false);
}

public void removeResources(PurgeEvent event, boolean isQuick) {
LOG.info("Start to remove resource of {}", event);
warmStorageManager.removeResources(event, isQuick);
coldStorageManager.removeResources(event, isQuick);
warmStorageManager.removeResources(event);
coldStorageManager.removeResources(event);
}

public StorageManager getColdStorageManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand All @@ -42,7 +41,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -94,9 +92,6 @@ public class LocalStorageManager extends SingleStorageManager {
private final List<StorageMediaProvider> typeProviders = Lists.newArrayList();

private final boolean isStorageAuditLogEnabled;
private final BlockingQueue<AsynchronousDeleteEvent> quickNeedDeletePaths =
Queues.newLinkedBlockingQueue();
private Thread clearNeedDeleteLocalPathThread;

@VisibleForTesting
LocalStorageManager(ShuffleServerConf conf) {
Expand Down Expand Up @@ -193,7 +188,7 @@ public class LocalStorageManager extends SingleStorageManager {
new CreateShuffleDeleteHandlerRequest(
StorageType.LOCALFILE.name(), new Configuration()));
try {
asynchronousDeleteEvent = quickNeedDeletePaths.take();
asynchronousDeleteEvent = softDirPaths.take();
deleteHandler.delete(
asynchronousDeleteEvent.getNeedDeleteRenamePaths(),
asynchronousDeleteEvent.getAppId(),
Expand All @@ -210,9 +205,9 @@ public class LocalStorageManager extends SingleStorageManager {
}
}
};
clearNeedDeleteLocalPathThread = new Thread(clearNeedDeletePath);
clearNeedDeleteLocalPathThread.setName("clearNeedDeleteLocalPathThread");
clearNeedDeleteLocalPathThread.setDaemon(true);
clearSoftDirPathThread = new Thread(clearNeedDeletePath);
clearSoftDirPathThread.setName("clearSoftLocalDirPathThread");
clearSoftDirPathThread.setDaemon(true);
}

private StorageMedia getStorageTypeForBasePath(String basePath) {
Expand Down Expand Up @@ -304,11 +299,6 @@ public Checker getStorageChecker() {

@Override
public void removeResources(PurgeEvent event) {
removeResources(event, false);
}

@Override
public void removeResources(PurgeEvent event, boolean isQuick) {
String appId = event.getAppId();
String user = event.getUser();
List<Integer> shuffleSet =
Expand Down Expand Up @@ -370,13 +360,14 @@ public void removeResources(PurgeEvent event, boolean isQuick) {
}
})
.collect(Collectors.toList());
if (isQuick) {
if (event.isSoftDelete()) {
AsynchronousDeleteEvent asynchronousDeleteEvent =
new AsynchronousDeleteEvent(
appId, event.getUser(), null, event.getShuffleIds(), deletePaths);
deleteHandler.quickDelete(asynchronousDeleteEvent);
boolean isSucess = quickNeedDeletePaths.offer(asynchronousDeleteEvent);
deleteHandler.softDelete(asynchronousDeleteEvent);
boolean isSucess = softDirPaths.offer(asynchronousDeleteEvent);
if (!isSucess) {
ShuffleServerMetrics.counterLocalSoftDeleteFailed.inc();
LOG.warn(
"Remove the case where the clearNeedDeleteHadoopPathThread queue is full and cannot accept elements.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,6 +38,7 @@
import org.apache.uniffle.server.event.PurgeEvent;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.common.StorageWriteMetrics;
import org.apache.uniffle.storage.handler.AsynchronousDeleteEvent;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;

public abstract class SingleStorageManager implements StorageManager {
Expand All @@ -48,6 +51,9 @@ public abstract class SingleStorageManager implements StorageManager {
private final long eventSizeThresholdL3;
protected final Map<String, ApplicationStorageInfo> appStorageInfoMap =
JavaUtils.newConcurrentMap();
protected final BlockingQueue<AsynchronousDeleteEvent> softDirPaths =
Queues.newLinkedBlockingQueue();
protected Thread clearSoftDirPathThread;

public SingleStorageManager(ShuffleServerConf conf) {
writeSlowThreshold = conf.getSizeAsBytes(ShuffleServerConf.SERVER_WRITE_SLOW_THRESHOLD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ public interface StorageManager {

void removeResources(PurgeEvent event);

void removeResources(PurgeEvent event, boolean isQuick);

void start();

void stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ public interface ShuffleDeleteHandler {
void delete(String[] storageBasePaths, String appId, String user);

/** Rename the file and then delete it asynchronously. */
void quickDelete(AsynchronousDeleteEvent shuffleQuickPurgeEvent);
void softDelete(AsynchronousDeleteEvent shuffleSoftDeletePurgeEvent);
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ private void delete(FileSystem fileSystem, Path path, String filePrefix) throws
}

@Override
public void quickDelete(AsynchronousDeleteEvent asynchronousDeleteEvent) {
String appId = asynchronousDeleteEvent.getAppId();
String user = asynchronousDeleteEvent.getUser();
public void softDelete(AsynchronousDeleteEvent shuffleSoftDeletePurgeEvent) {
String appId = shuffleSoftDeletePurgeEvent.getAppId();
String user = shuffleSoftDeletePurgeEvent.getUser();
for (Map.Entry<String, String> appIdNeedDeletePaths :
asynchronousDeleteEvent.getNeedDeletePathAndRenamePath().entrySet()) {
shuffleSoftDeletePurgeEvent.getNeedDeletePathAndRenamePath().entrySet()) {
final Path path = new Path(appIdNeedDeletePaths.getKey());
final Path breakdownPathFolder = new Path(appIdNeedDeletePaths.getValue());
boolean isSuccess = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ public void delete(String[] shuffleDataStoredPath, String appId, String user) {
}

@Override
public void quickDelete(AsynchronousDeleteEvent asynchronousDeleteEvent) {
public void softDelete(AsynchronousDeleteEvent shuffleSoftDeletePurgeEvent) {

for (Map.Entry<String, String> appIdNeedDeletePaths :
asynchronousDeleteEvent.getNeedDeletePathAndRenamePath().entrySet()) {
shuffleSoftDeletePurgeEvent.getNeedDeletePathAndRenamePath().entrySet()) {
String appId = appIdNeedDeletePaths.getKey();
String shufflePath = appIdNeedDeletePaths.getKey();
String breakdownShufflePath = appIdNeedDeletePaths.getValue();
Expand Down

0 comments on commit a6e8d64

Please sign in to comment.