From d5cf3563a7fff681a2dcc3da76260b9e730a406b Mon Sep 17 00:00:00 2001 From: sahibamatta Date: Fri, 5 Jul 2024 18:11:14 -0400 Subject: [PATCH 1/7] [#1863] fix(server): Update Committed Block Ids Even if Storage Manager Write Fails --- .../apache/uniffle/server/ShuffleFlushManager.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java index 9237884fdc..4daee26c79 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java @@ -162,13 +162,20 @@ public void processFlushEvent(ShuffleDataFlushEvent event) throws Exception { user, maxConcurrencyPerPartitionToWrite); ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request); - boolean writeSuccess = storageManager.write(storage, handler, event); - if (!writeSuccess) { - throw new EventRetryException(); + boolean writeSuccess = false; + try { + writeSuccess = storageManager.write(storage, handler, event); + } catch(Exception e) { + // do nothing, log exception and proceed + LOG.error("storageManager write error.", e); } // update some metrics for shuffle task updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), event.getShuffleBlocks()); + + if (!writeSuccess) { + throw new EventRetryException(); + } ShuffleTaskInfo shuffleTaskInfo = shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(event.getAppId()); if (null != shuffleTaskInfo) { From ed4bafcb533c3f131b0a6c906e58c3d820a05ac0 Mon Sep 17 00:00:00 2001 From: sahibamatta Date: Tue, 16 Jul 2024 18:29:43 -0400 Subject: [PATCH 2/7] Fail commitShuffle when flush event write fails --- .../runConfigurations/ShuffleServer.run.xml | 34 ------------------- .../uniffle/server/ShuffleFlushManager.java | 20 +++++++---- 2 files changed, 13 insertions(+), 41 deletions(-) delete mode 
100644 dev/intellij/runConfigurations/ShuffleServer.run.xml diff --git a/dev/intellij/runConfigurations/ShuffleServer.run.xml b/dev/intellij/runConfigurations/ShuffleServer.run.xml deleted file mode 100644 index 4a05a752e5..0000000000 --- a/dev/intellij/runConfigurations/ShuffleServer.run.xml +++ /dev/null @@ -1,34 +0,0 @@ - - - - - diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java index 4daee26c79..c8f8a21260 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java @@ -65,6 +65,7 @@ public class ShuffleFlushManager { private final StorageManager storageManager; private final long pendingEventTimeoutSec; private FlushEventHandler eventHandler; + private boolean writeError = false; public ShuffleFlushManager( ShuffleServerConf shuffleServerConf, @@ -161,21 +162,22 @@ public void processFlushEvent(ShuffleDataFlushEvent event) throws Exception { storageDataReplica, user, maxConcurrencyPerPartitionToWrite); - ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request); + boolean writeSuccess = false; try { + ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request); writeSuccess = storageManager.write(storage, handler, event); - } catch(Exception e) { - // do nothing, log exception and proceed + } catch (Exception e) { LOG.error("storageManager write error.", e); + writeError = true; } - // update some metrics for shuffle task - updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), event.getShuffleBlocks()); - if (!writeSuccess) { throw new EventRetryException(); } + + // update some metrics for shuffle task + updateCommittedBlockIds(event.getAppId(), event.getShuffleId(), event.getShuffleBlocks()); ShuffleTaskInfo shuffleTaskInfo = shuffleServer.getShuffleTaskManager().getShuffleTaskInfo(event.getAppId()); if (null != 
shuffleTaskInfo) { @@ -226,7 +228,11 @@ private void updateCommittedBlockIds( } } - public Roaring64NavigableMap getCommittedBlockIds(String appId, Integer shuffleId) { + public Roaring64NavigableMap getCommittedBlockIds(String appId, Integer shuffleId) throws EventDiscardException { + if (writeError) { + throw new EventDiscardException(); + } + Map shuffleIdToBlockIds = committedBlockIds.get(appId); if (shuffleIdToBlockIds == null) { LOG.warn("Unexpected value when getCommittedBlockIds for appId[" + appId + "]"); From 5a8404dfbfab8851664dfb1aa56413fd7995d1fb Mon Sep 17 00:00:00 2001 From: sahibamatta Date: Tue, 16 Jul 2024 18:37:02 -0400 Subject: [PATCH 3/7] Add removed file fix --- .../runConfigurations/ShuffleServer.run.xml | 34 +++++++++++++++++++ .../uniffle/server/ShuffleFlushManager.java | 3 +- 2 files changed, 36 insertions(+), 1 deletion(-) create mode 100644 dev/intellij/runConfigurations/ShuffleServer.run.xml diff --git a/dev/intellij/runConfigurations/ShuffleServer.run.xml b/dev/intellij/runConfigurations/ShuffleServer.run.xml new file mode 100644 index 0000000000..4a05a752e5 --- /dev/null +++ b/dev/intellij/runConfigurations/ShuffleServer.run.xml @@ -0,0 +1,34 @@ + + + + + diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java index e306ca1676..9068c3d308 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java @@ -171,9 +171,10 @@ public void processFlushEvent(ShuffleDataFlushEvent event) throws Exception { maxConcurrencyPerPartitionToWrite); boolean writeSuccess = false; + long startTime = 0L; try { ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request); - long startTime = System.currentTimeMillis(); + startTime = System.currentTimeMillis(); writeSuccess = storageManager.write(storage, handler, event); } catch (Exception e) { 
LOG.error("storageManager write error.", e); From 525cac695a71f9a6bda0900d6457f60b446cc646 Mon Sep 17 00:00:00 2001 From: sahibamatta Date: Tue, 16 Jul 2024 18:51:19 -0400 Subject: [PATCH 4/7] sync --- .../java/org/apache/uniffle/server/ShuffleFlushManager.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java index 9068c3d308..8fe316d9be 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java @@ -180,7 +180,10 @@ public void processFlushEvent(ShuffleDataFlushEvent event) throws Exception { LOG.error("storageManager write error.", e); writeError = true; } - + + if (!writeSuccess) { + throw new EventRetryException(); + } long endTime = System.currentTimeMillis(); // update some metrics for shuffle task From 402bf31c9341722414b33d598517f457e31009df Mon Sep 17 00:00:00 2001 From: sahibamatta Date: Tue, 16 Jul 2024 19:19:06 -0400 Subject: [PATCH 5/7] Update writeError, not throw exception in constructor --- .../org/apache/uniffle/server/ShuffleFlushManager.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java index 8fe316d9be..bde4eaadee 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java @@ -170,20 +170,19 @@ public void processFlushEvent(ShuffleDataFlushEvent event) throws Exception { user, maxConcurrencyPerPartitionToWrite); - boolean writeSuccess = false; long startTime = 0L; try { ShuffleWriteHandler handler = storage.getOrCreateWriteHandler(request); startTime = System.currentTimeMillis(); - writeSuccess = 
storageManager.write(storage, handler, event); + boolean writeSuccess = storageManager.write(storage, handler, event); + if (!writeSuccess) { + writeError = true; + } } catch (Exception e) { LOG.error("storageManager write error.", e); writeError = true; } - if (!writeSuccess) { - throw new EventRetryException(); - } long endTime = System.currentTimeMillis(); // update some metrics for shuffle task From 98b00f7d72c7029054e0e0357705ba12e768e5b3 Mon Sep 17 00:00:00 2001 From: sahibamatta Date: Fri, 19 Jul 2024 15:38:25 -0400 Subject: [PATCH 6/7] Handle write errors separately for each shuffle id --- .../org/apache/uniffle/server/ShuffleFlushManager.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java index bde4eaadee..3bdcaafa47 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; @@ -64,13 +65,14 @@ public class ShuffleFlushManager { // appId -> shuffleId -> committed shuffle blockIds private Map> committedBlockIds = JavaUtils.newConcurrentMap(); + private List shuffleIdsWithWriteError = new CopyOnWriteArrayList<>(); private final int retryMax; private final StorageManager storageManager; private final long pendingEventTimeoutSec; private FlushEventHandler eventHandler; private final boolean isAuditLogEnabled; - private boolean writeError = false; + public ShuffleFlushManager( ShuffleServerConf shuffleServerConf, @@ -176,11 +178,11 @@ public void processFlushEvent(ShuffleDataFlushEvent event) throws Exception { startTime = 
System.currentTimeMillis(); boolean writeSuccess = storageManager.write(storage, handler, event); if (!writeSuccess) { - writeError = true; + shuffleIdsWithWriteError.add(event.getShuffleId()); } } catch (Exception e) { LOG.error("storageManager write error.", e); - writeError = true; + shuffleIdsWithWriteError.add(event.getShuffleId()); } long endTime = System.currentTimeMillis(); @@ -253,7 +255,7 @@ private void updateCommittedBlockIds( } public Roaring64NavigableMap getCommittedBlockIds(String appId, Integer shuffleId) throws EventDiscardException { - if (writeError) { + if (shuffleIdsWithWriteError.contains(shuffleId)) { throw new EventDiscardException(); } From 92649976d7f1db084c2aa08013fa0f0d815c842f Mon Sep 17 00:00:00 2001 From: sahibamatta Date: Fri, 19 Jul 2024 18:22:20 -0400 Subject: [PATCH 7/7] Add UT to test commitFail on Write Fail --- .../uniffle/server/ShuffleFlushManager.java | 5 ++ .../server/ShuffleTaskManagerTest.java | 49 +++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java index 3bdcaafa47..363d32a493 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java @@ -314,4 +314,9 @@ public ShuffleDataDistributionType getDataDistributionType(String appId) { public FlushEventHandler getEventHandler() { return eventHandler; } + + @VisibleForTesting + public void setShuffleIdsWithWriteError(int shuffleId) { + shuffleIdsWithWriteError.add(shuffleId); + } } diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java index 75c49cd4dc..80d1571a52 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java +++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java @@ -36,6 +36,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.uniffle.server.flush.EventDiscardException; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -75,6 +76,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -1048,6 +1050,53 @@ public void checkAndClearLeakShuffleDataTest(@TempDir File tempDir) throws Excep assertTrue(hiddenFile.exists()); } + @Test + public void testCommitShuffleFailOnWriteFail() throws Exception { + String confFile = ClassLoader.getSystemResource("server.conf").getFile(); + ShuffleServerConf conf = new ShuffleServerConf(confFile); + final String remoteStorage = HDFS_URI + "rss/test"; + final String appId = "testAppId"; + final int shuffleId = 1; + conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L); + conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0); + conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0); + conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.HDFS.name()); + conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true); + conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L); + conf.set(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED, 3000L); + conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false); + shuffleServer = new ShuffleServer(conf); + ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); + shuffleTaskManager.registerShuffle( + appId, + shuffleId, + 
Lists.newArrayList(new PartitionRange(1, 1)), + new RemoteStorageInfo(remoteStorage), + StringUtils.EMPTY); + shuffleTaskManager.registerShuffle( + appId, + shuffleId, + Lists.newArrayList(new PartitionRange(2, 2)), + new RemoteStorageInfo(remoteStorage), + StringUtils.EMPTY); + final List expectedBlocks1 = Lists.newArrayList(); + final List expectedBlocks2 = Lists.newArrayList(); + final Map bufferIds = shuffleTaskManager.getRequireBufferIds(); + + shuffleTaskManager.requireBuffer(10); + shuffleTaskManager.requireBuffer(10); + shuffleTaskManager.requireBuffer(10); + assertEquals(3, bufferIds.size()); + // required buffer should be clear if it doesn't receive data after timeout + Thread.sleep(6000); + assertEquals(0, bufferIds.size()); + + ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager(); + shuffleFlushManager.setShuffleIdsWithWriteError(shuffleId); + + assertThrows(EventDiscardException.class, () -> shuffleTaskManager.commitShuffle(appId, shuffleId)); + } + private Set getAppIdsOnDisk(LocalStorageManager localStorageManager) { Set appIdsOnDisk = new HashSet<>();