Skip to content

Commit

Permalink
Triggering Stage retry requires reassigning the shuffle server in the…
Browse files Browse the repository at this point in the history
… retry Stage
  • Loading branch information
yl09099 committed Nov 29, 2024
1 parent d461201 commit 01512c9
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.apache.uniffle.proto.RssProtos.ShuffleRegisterResponse;
import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerImplBase;
import org.apache.uniffle.server.audit.ServerRpcAuditContext;
import org.apache.uniffle.server.block.ShuffleBlockIdManager;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.server.merge.MergeStatus;
import org.apache.uniffle.storage.common.Storage;
Expand Down Expand Up @@ -267,6 +268,28 @@ public void registerShuffle(
shuffleId,
lastAttemptNumber,
System.currentTimeMillis() - start);
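// Sanity check: after clearing the previous stage attempt, no block ids should remain
// recorded for this shuffle; if any do, fail the registration.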
ShuffleBlockIdManager shuffleBlockIdManager =
shuffleServer
.getShuffleTaskManager()
.getShuffleTaskInfo(appId)
.getShuffleBlockIdManager();
long blockCountByShuffleId =
shuffleBlockIdManager.getBlockCountByShuffleId(
appId, Lists.newArrayList(shuffleId));
if (blockCountByShuffleId != 0) {
LOG.error(
"Metadata is not deleted on clearing previous stage attempt data for app: {}, shuffleId: {}, stageAttemptNumber: {}",
appId,
shuffleId,
lastAttemptNumber);
StatusCode code = StatusCode.INTERNAL_ERROR;
auditContext.withStatusCode(code);
reply = ShuffleRegisterResponse.newBuilder().setStatus(code.toProto()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
return;
}
} catch (Exception e) {
LOG.error(
"Errors on clearing previous stage attempt data for app: {}, shuffleId: {}, stageAttemptNumber: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,18 @@ public long getTotalBlockCount() {
.sum();
}

/**
 * Counts the block ids currently recorded for the given shuffles of one application.
 *
 * <p>Only the bitmaps stored under {@code appId} are inspected, and only those whose
 * shuffle-id key is listed in {@code shuffleIds}.
 *
 * @param appId application whose block-id bitmaps are inspected
 * @param shuffleIds shuffle ids to include in the count
 * @return the summed cardinality of every matching bitmap; 0 when the application is not
 *     tracked, when {@code shuffleIds} is null or empty, or when no listed shuffle has entries
 */
@Override
public long getBlockCountByShuffleId(String appId, List<Integer> shuffleIds) {
  if (shuffleIds == null || shuffleIds.isEmpty()) {
    return 0L;
  }
  // Match on the shuffle-id key of each entry. Comparing the list against the whole key set
  // (as a single element) can never succeed and would always yield a count of 0.
  return partitionsToBlockIds
      .getOrDefault(appId, java.util.Collections.emptyMap())
      .entrySet()
      .stream()
      .filter(shuffleEntry -> shuffleIds.contains(shuffleEntry.getKey()))
      .flatMap(shuffleEntry -> java.util.Arrays.stream(shuffleEntry.getValue()))
      .mapToLong(Roaring64NavigableMap::getLongCardinality)
      .sum();
}

@Override
public boolean contains(String appId) {
return partitionsToBlockIds.containsKey(appId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ public long getTotalBlockCount() {
.sum();
}

/**
 * Counts the block ids currently recorded for the given shuffles of one application.
 *
 * <p>Only the bitmaps stored under {@code appId} are inspected, and only those whose
 * shuffle-id key is listed in {@code shuffleIds}.
 *
 * @param appId application whose block-id bitmaps are inspected
 * @param shuffleIds shuffle ids to include in the count
 * @return the summed cardinality of every matching bitmap; 0 when the application is not
 *     tracked, when {@code shuffleIds} is null or empty, or when no listed shuffle has entries
 */
@Override
public long getBlockCountByShuffleId(String appId, List<Integer> shuffleIds) {
  if (shuffleIds == null || shuffleIds.isEmpty()) {
    return 0L;
  }
  // Match on the shuffle-id key of each entry. Comparing the list against the whole key set
  // (as a single element) can never succeed and would always yield a count of 0.
  return partitionsToBlockIds
      .getOrDefault(appId, java.util.Collections.emptyMap())
      .entrySet()
      .stream()
      .filter(shuffleEntry -> shuffleIds.contains(shuffleEntry.getKey()))
      .flatMap(shuffleEntry -> shuffleEntry.getValue().values().stream())
      .mapToLong(roaringMap -> roaringMap.getLongCardinality())
      .sum();
}

@Override
public boolean contains(String appId) {
return partitionsToBlockIds.containsKey(appId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ byte[] getFinishedBlockIds(

long getTotalBlockCount();

/**
 * Returns how many block ids are recorded for the given shuffles of an application.
 *
 * @param appId application to inspect
 * @param shuffleIds shuffle ids whose recorded block ids are counted
 * @return the total number of block ids held for those shuffles
 */
long getBlockCountByShuffleId(String appId, List<Integer> shuffleIds);

/**
 * Reports whether this manager holds block-id state for the given application.
 *
 * @param testAppId application id to look up
 * @return {@code true} if an entry exists for the application, {@code false} otherwise
 */
boolean contains(String testAppId);

long getBitmapNum(String appId, int shuffleId);
Expand Down

0 comments on commit 01512c9

Please sign in to comment.