Skip to content

Commit

Permalink
[Improvement] The upstream rewrite is triggered when the read stage fails.
Browse files Browse the repository at this point in the history
  • Loading branch information
yl09099 committed Dec 9, 2024
1 parent 0552d3b commit 69f340f
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public void reportShuffleFetchFailure(
RssProtos.ReportShuffleFetchFailureRequest request,
StreamObserver<RssProtos.ReportShuffleFetchFailureResponse> responseObserver) {
String appId = request.getAppId();
int shuffleId = request.getShuffleId();
int stageAttempt = request.getStageAttemptId();
int partitionId = request.getPartitionId();
RssProtos.StatusCode code;
Expand Down Expand Up @@ -189,18 +190,35 @@ public void reportShuffleFetchFailure(
code = RssProtos.StatusCode.INVALID_REQUEST;
reSubmitWholeStage = false;
} else { // update the stage partition fetch failure count
code = RssProtos.StatusCode.SUCCESS;
status.incPartitionFetchFailure(stageAttempt, partitionId);
int fetchFailureNum = status.getPartitionFetchFailureNum(stageAttempt, partitionId);
if (fetchFailureNum >= shuffleManager.getMaxFetchFailures()) {
reSubmitWholeStage = true;
msg =
String.format(
"report shuffle fetch failure as maximum number(%d) of shuffle fetch is occurred",
shuffleManager.getMaxFetchFailures());
} else {
reSubmitWholeStage = false;
msg = "don't report shuffle fetch failure";
synchronized (status) {
code = RssProtos.StatusCode.SUCCESS;
status.incPartitionFetchFailure(stageAttempt, partitionId);
if (status.getPartitionFetchFailureNum(stageAttempt, partitionId, shuffleManager)) {
reSubmitWholeStage = true;
if (!status.isClearedMapTrackerBlock()) {
try {
// Clear the metadata of the completed task, otherwise some of the stage's data will
// be lost.
shuffleManager.unregisterAllMapOutput(shuffleId);
status.setClearedMapTrackerBlock(true);
LOG.info(
"Clear shuffle result in shuffleId:{}, stageId:{}.", shuffleId, stageAttempt);
} catch (SparkException e) {
LOG.error(
"Clear MapoutTracker Meta failed in shuffleId:{}, stageAttemptId:{}.",
shuffleId,
stageAttempt);
throw new RssException("Clear MapoutTracker Meta failed!", e);
}
}
msg =
String.format(
"report shuffle fetch failure as maximum number(%d) of shuffle fetch is occurred",
shuffleManager.getMaxFetchFailures());
} else {
reSubmitWholeStage = false;
msg = "don't report shuffle fetch failure";
}
}
}
}
Expand Down Expand Up @@ -489,10 +507,13 @@ private static class RssShuffleStatus {
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
private final int[] partitions;
private int stageAttempt;
// Whether the shuffle's map output metadata has already been cleared for the current stage attempt.
private boolean isClearedMapTrackerBlock;

/**
 * Creates the status holder for one shuffle.
 *
 * @param partitionNum length of the per-partition {@code int} array allocated here
 * @param stageAttempt stage attempt number this status starts out tracking
 */
private RssShuffleStatus(int partitionNum, int stageAttempt) {
  // One int slot per partition; every slot starts at zero.
  this.partitions = new int[partitionNum];
  this.stageAttempt = stageAttempt;
  // Nothing has been cleared yet for a freshly created status.
  this.isClearedMapTrackerBlock = false;
}

private <T> T withReadLock(Supplier<T> fn) {
Expand Down Expand Up @@ -553,16 +574,36 @@ public void incPartitionFetchFailure(int stageAttempt, int partition) {
});
}

public int getPartitionFetchFailureNum(int stageAttempt, int partition) {
public boolean getPartitionFetchFailureNum(
int stageAttempt, int partition, RssShuffleManagerInterface shuffleManager) {
return withReadLock(
() -> {
if (this.stageAttempt != stageAttempt) {
return 0;
return false;
} else {
return this.partitions[partition];
if (this.partitions[partition] >= shuffleManager.getMaxFetchFailures()) {
return true;
} else {
return false;
}
}
});
}

/**
 * Stores the "map output metadata already cleared" flag for this shuffle status.
 *
 * <p>The assignment is routed through {@code withWriteLock}; presumably that helper runs the
 * lambda while holding this object's write lock (its body is not shown here, confirm).
 *
 * @param isCleared new value of the flag
 */
public void setClearedMapTrackerBlock(boolean isCleared) {
withWriteLock(
() -> {
this.isClearedMapTrackerBlock = isCleared;
// The helper is called with a value-returning lambda; there is nothing meaningful to return.
return null;
});
}

/**
 * Reads the "map output metadata already cleared" flag through {@code withReadLock}.
 *
 * @return the current value of the flag
 */
public boolean isClearedMapTrackerBlock() {
  return withReadLock(() -> isClearedMapTrackerBlock);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void updateSparkConfCustomer(SparkConf sparkConf) {
super.updateSparkConfCustomer(sparkConf);
sparkConf.set(
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
+ RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED,
+ RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED,
"true");
}

Expand Down

0 comments on commit 69f340f

Please sign in to comment.