Skip to content

Commit

Permalink
Triggering Stage retry requires reassigning the shuffle server in the…
Browse files Browse the repository at this point in the history
… retry Stage
  • Loading branch information
yl09099 committed Nov 29, 2024
1 parent 24880a3 commit b53f421
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -571,23 +571,20 @@ protected static RemoteStorageInfo getDefaultRemoteStorageInfo(SparkConf sparkCo
}

public ShuffleHandleInfo getShuffleHandleInfo(
int stageAttemptId, int stageAttemptNumber, RssShuffleHandle<?, ?, ?> rssHandle) {
int stageAttemptId,
int stageAttemptNumber,
RssShuffleHandle<?, ?, ?> rssHandle,
boolean isWritePhase) {
int shuffleId = rssHandle.getShuffleId();
if (shuffleManagerRpcServiceEnabled && rssStageRetryForWriteFailureEnabled) {
// In Stage Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId.
return getRemoteShuffleHandleInfoWithStageRetry(
stageAttemptId,
stageAttemptNumber,
shuffleId,
rssHandle.getDependency().partitioner().numPartitions());
stageAttemptId, stageAttemptNumber, shuffleId, isWritePhase);
} else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
// In partition block Retry mode, Get the ShuffleServer list from the Driver based on the
// shuffleId.
return getRemoteShuffleHandleInfoWithBlockRetry(
stageAttemptId,
stageAttemptNumber,
shuffleId,
rssHandle.getDependency().partitioner().numPartitions());
stageAttemptId, stageAttemptNumber, shuffleId, isWritePhase);
} else {
return new SimpleShuffleHandleInfo(
shuffleId, rssHandle.getPartitionToServers(), rssHandle.getRemoteStorage());
Expand All @@ -601,10 +598,10 @@ public ShuffleHandleInfo getShuffleHandleInfo(
* @return ShuffleHandleInfo
*/
protected synchronized StageAttemptShuffleHandleInfo getRemoteShuffleHandleInfoWithStageRetry(
int stageAttemptId, int stageAttemptNumber, int shuffleId, int numPartitions) {
int stageAttemptId, int stageAttemptNumber, int shuffleId, boolean isWritePhase) {
RssPartitionToShuffleServerRequest rssPartitionToShuffleServerRequest =
new RssPartitionToShuffleServerRequest(
stageAttemptId, stageAttemptNumber, shuffleId, numPartitions);
stageAttemptId, stageAttemptNumber, shuffleId, isWritePhase);
RssReassignOnStageRetryResponse rpcPartitionToShufflerServer =
getOrCreateShuffleManagerClientSupplier()
.get()
Expand All @@ -622,10 +619,10 @@ protected synchronized StageAttemptShuffleHandleInfo getRemoteShuffleHandleInfoW
* @return ShuffleHandleInfo
*/
protected synchronized MutableShuffleHandleInfo getRemoteShuffleHandleInfoWithBlockRetry(
int stageAttemptId, int stageAttemptNumber, int shuffleId, int numPartitions) {
int stageAttemptId, int stageAttemptNumber, int shuffleId, boolean isWritePhase) {
RssPartitionToShuffleServerRequest rssPartitionToShuffleServerRequest =
new RssPartitionToShuffleServerRequest(
stageAttemptId, stageAttemptNumber, shuffleId, numPartitions);
stageAttemptId, stageAttemptNumber, shuffleId, isWritePhase);
RssReassignOnBlockSendFailureResponse rpcPartitionToShufflerServer =
getOrCreateShuffleManagerClientSupplier()
.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,14 +224,18 @@ public void getPartitionToShufflerServerWithStageRetry(
int stageAttemptId = request.getStageAttemptId();
int stageAttemptNumber = request.getStageAttemptNumber();
int shuffleId = request.getShuffleId();
boolean isWritePhase = request.getIsWritePhase();
StageAttemptShuffleHandleInfo shuffleHandle;
ShuffleServerWriterFailureRecord shuffleServerWriterFailureRecord =
shuffleWriteStatus.get(shuffleId);
if (shuffleServerWriterFailureRecord != null) {
synchronized (shuffleServerWriterFailureRecord) {
if (shuffleServerWriterFailureRecord.isNeedReassignForLastStageNumber(stageAttemptNumber)) {
shuffleManager.reassignOnStageResubmit(shuffleId, stageAttemptId, stageAttemptNumber);
shuffleServerWriterFailureRecord.setShuffleServerAssignmented(true);
if (isWritePhase) {
ShuffleServerWriterFailureRecord shuffleServerWriterFailureRecord =
shuffleWriteStatus.get(shuffleId);
if (shuffleServerWriterFailureRecord != null) {
synchronized (shuffleServerWriterFailureRecord) {
if (shuffleServerWriterFailureRecord.isNeedReassignForLastStageNumber(
stageAttemptNumber)) {
shuffleManager.reassignOnStageResubmit(shuffleId, stageAttemptId, stageAttemptNumber);
shuffleServerWriterFailureRecord.setShuffleServerAssignmented(true);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,12 +494,12 @@ public <K, C> ShuffleReader<K, C> getReader(
// In Stage Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId.
shuffleHandleInfo =
getRemoteShuffleHandleInfoWithStageRetry(
context.stageId(), context.stageAttemptNumber(), shuffleId, partitionNum);
context.stageId(), context.stageAttemptNumber(), shuffleId, false);
} else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
// In Block Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId
shuffleHandleInfo =
getRemoteShuffleHandleInfoWithBlockRetry(
context.stageId(), context.stageAttemptNumber(), shuffleId, partitionNum);
context.stageId(), context.stageAttemptNumber(), shuffleId, false);
} else {
shuffleHandleInfo =
new SimpleShuffleHandleInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public RssShuffleWriter(
rssHandle,
taskFailureCallback,
shuffleManager.getShuffleHandleInfo(
context.stageId(), context.stageAttemptNumber(), rssHandle),
context.stageId(), context.stageAttemptNumber(), rssHandle, true),
context);
BufferManagerOptions bufferOptions = new BufferManagerOptions(sparkConf);
final WriteBufferManager bufferManager =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ public class RssPartitionToShuffleServerRequest {
private int stageAttemptId;
private int stageAttemptNumber;
private int shuffleId;
private int numPartitions;
private boolean isWritePhase;

public RssPartitionToShuffleServerRequest(
int stageAttemptId, int stageAttemptNumber, int shuffleId, int numPartitions) {
int stageAttemptId, int stageAttemptNumber, int shuffleId, boolean isWritePhase) {
this.stageAttemptId = stageAttemptId;
this.stageAttemptNumber = stageAttemptNumber;
this.shuffleId = shuffleId;
this.numPartitions = numPartitions;
this.isWritePhase = isWritePhase;
}

public int getShuffleId() {
Expand All @@ -47,7 +47,7 @@ public RssProtos.PartitionToShuffleServerRequest toProto() {
builder.setStageAttemptId(stageAttemptId);
builder.setStageAttemptNumber(stageAttemptNumber);
builder.setShuffleId(shuffleId);
builder.setNumPartitions(numPartitions);
builder.setIsWritePhase(isWritePhase);
return builder.build();
}
}
2 changes: 1 addition & 1 deletion proto/src/main/proto/Rss.proto
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ message PartitionToShuffleServerRequest {
int32 stageAttemptId = 1;
int32 stageAttemptNumber = 2;
int32 shuffleId = 3;
int32 numPartitions = 4;
bool isWritePhase = 4;
}

message ReassignOnStageRetryResponse {
Expand Down

0 comments on commit b53f421

Please sign in to comment.