[#2212] feat(client,server,coordinator): Introduce ServiceVersion framework to support backward compatible and reduce BlockIdLayout message #2217

Open: 1 commit to be merged into master

maobaolong (Member) commented Oct 22, 2024

What changes were proposed in this pull request?

  • Introduce a ServiceVersion framework to support backward compatibility.
  • Reduce the BlockIdLayout message overhead by storing the layout on the server at registerShuffle and reading it back for each getShuffleResult and getShuffleResultForMultiPart call.
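
To make the second point concrete, here is a minimal sketch of how a server could remember the layout at registration and fall back to it when a later request carries none. The class `BlockIdLayoutStoreSketch`, its nested `Layout` type, and the method names are hypothetical and are not taken from this patch. The precedence (request layout first, stored layout second) is also an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch; class and method names are illustrative, not Uniffle's actual API. */
final class BlockIdLayoutStoreSketch {

  /** Simplified stand-in for a block ID layout: the bit widths of its three fields. */
  static final class Layout {
    final int sequenceBits;
    final int partitionBits;
    final int taskBits;

    Layout(int sequenceBits, int partitionBits, int taskBits) {
      this.sequenceBits = sequenceBits;
      this.partitionBits = partitionBits;
      this.taskBits = taskBits;
    }
  }

  // Layouts remembered per (appId, shuffleId) when a shuffle is registered.
  private final Map<String, Layout> registered = new ConcurrentHashMap<>();

  private static String key(String appId, int shuffleId) {
    return appId + "/" + shuffleId;
  }

  /** registerShuffle path: a client that does not send a layout passes null. */
  void registerShuffle(String appId, int shuffleId, Layout layoutOrNull) {
    if (layoutOrNull != null) {
      registered.put(key(appId, shuffleId), layoutOrNull);
    }
  }

  /**
   * getShuffleResult path: use the layout carried by the request if there is one
   * (assumed precedence), otherwise fall back to the layout stored at registration.
   */
  Layout resolve(String appId, int shuffleId, Layout requestLayoutOrNull) {
    if (requestLayoutOrNull != null) {
      return requestLayoutOrNull;
    }
    Layout stored = registered.get(key(appId, shuffleId));
    if (stored == null) {
      throw new IllegalStateException("No block ID layout known for " + key(appId, shuffleId));
    }
    return stored;
  }

  public static void main(String[] args) {
    BlockIdLayoutStoreSketch server = new BlockIdLayoutStoreSketch();
    server.registerShuffle("app-1", 0, new Layout(21, 20, 22));
    Layout layout = server.resolve("app-1", 0, null);
    System.out.println(layout.sequenceBits + "/" + layout.partitionBits + "/" + layout.taskBits);
  }
}
```

With this shape, a request that carries no layout costs only a map lookup on the server, and a request that still carries one keeps working unchanged.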

Why are the changes needed?

Fix: #2212

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Tested locally.

  • New client -> New Coordinator/Server: uses the blockIdLayout from ShuffleTaskInfo, as stored by registerShuffleRequest.
[2024-10-22 16:42:18.685] cmd=registerShuffle	statusCode=SUCCESS	from=/localhost:55338	executionTimeUs=47161	appId=app-20241022164214-0077_1729586532865	shuffleId=0	args{remoteStoragePath=, user=root, stageAttemptNumber=0, blockIdLayout=null}
[2024-10-22 16:42:20.555] cmd=requireBuffer	statusCode=SUCCESS	from=/localhost:55352	executionTimeUs=9604	appId=app-20241022164214-0077_1729586532865	shuffleId=0	args{requireSize=251, partitionIdsSize=1}	return{requireBufferId=1}
[2024-10-22 16:42:20.555] cmd=requireBuffer	statusCode=SUCCESS	from=/localhost:55352	executionTimeUs=10581	appId=app-20241022164214-0077_1729586532865	shuffleId=0	args{requireSize=457, partitionIdsSize=2}	return{requireBufferId=2}
[2024-10-22 16:42:20.680] cmd=sendShuffleData	statusCode=SUCCESS	from=/localhost:55136	executionTimeUs=56345	appId=app-20241022164214-0077_1729586532865	shuffleId=0	args{requireBufferId=1, requireSize=251, isPreAllocated=true, requireBlocksSize=48, stageAttemptNumber=0, partitionCount=1}
[2024-10-22 16:42:20.683] cmd=sendShuffleData	statusCode=SUCCESS	from=/localhost:55136	executionTimeUs=607	appId=app-20241022164214-0077_1729586532865	shuffleId=0	args{requireBufferId=2, requireSize=457, isPreAllocated=true, requireBlocksSize=110, stageAttemptNumber=0, partitionCount=2}
[2024-10-22 16:42:20.719] cmd=commitShuffleTask	statusCode=SUCCESS	from=/localhost:55352	executionTimeUs=6617	appId=app-20241022164214-0077_1729586532865	shuffleId=0	return{commitCount=2}
[2024-10-22 16:42:20.719] cmd=commitShuffleTask	statusCode=SUCCESS	from=/localhost:55352	executionTimeUs=6456	appId=app-20241022164214-0077_1729586532865	shuffleId=0	return{commitCount=1}
[2024-10-22 16:42:20.948] cmd=reportShuffleResult	statusCode=SUCCESS	from=/localhost:55352	executionTimeUs=16630	appId=app-20241022164214-0077_1729586532865	shuffleId=0	args{taskAttemptId=0, bitmapNum=1, partitionToBlockIdsSize=2}
[2024-10-22 16:42:21.771] cmd=finishShuffle	statusCode=SUCCESS	from=/localhost:55352	executionTimeUs=1028454	appId=app-20241022164214-0077_1729586532865	shuffleId=0
[2024-10-22 16:42:22.114] cmd=reportShuffleResult	statusCode=SUCCESS	from=/localhost:55352	executionTimeUs=2799	appId=app-20241022164214-0077_1729586532865	shuffleId=0	args{taskAttemptId=4, bitmapNum=1, partitionToBlockIdsSize=1}
[2024-10-22 16:42:22.513] cmd=getShuffleResultForMultiPart	statusCode=SUCCESS	from=/localhost:55352	executionTimeUs=27809	appId=app-20241022164214-0077_1729586532865	shuffleId=0	args{partitionsListSize=1, blockIdLayout=blockIdLayout[seq: 21 bits, part: 20 bits, task: 22 bits]}
[2024-10-22 16:42:22.513] cmd=getShuffleResultForMultiPart	statusCode=SUCCESS	from=/localhost:55352	executionTimeUs=27843	appId=app-20241022164214-0077_1729586532865	shuffleId=0	args{partitionsListSize=1, blockIdLayout=blockIdLayout[seq: 21 bits, part: 20 bits, task: 22 bits]}
[2024-10-22 16:42:22.627] cmd=getLocalShuffleIndex	statusCode=SUCCESS	from=/localhost:55138	executionTimeUs=23477	appId=app-20241022164214-0077_1729586532865	shuffleId=0	args{requestId=2, partitionId=0, partitionNumPerRange=1, partitionNum=2}	return{len=40}
[2024-10-22 16:42:22.628] cmd=getLocalShuffleIndex	statusCode=SUCCESS	from=/localhost:55138	executionTimeUs=706	appId=app-20241022164214-0077_1729586532865	shuffleId=0	args{requestId=3, partitionId=1, partitionNumPerRange=1, partitionNum=2}	return{len=80}
[2024-10-22 16:42:22.652] cmd=getLocalShuffleData	statusCode=SUCCESS	from=/localhost:55136	executionTimeUs=3129	appId=app-20241022164214-0077_1729586532865	shuffleId=0	args{requestId=4, partitionId=0, partitionNumPerRange=1, partitionNum=2, offset=0, length=13}	return{len=13}
[2024-10-22 16:42:22.655] cmd=getLocalShuffleData	statusCode=SUCCESS	from=/localhost:55136	executionTimeUs=975	appId=app-20241022164214-0077_1729586532865	shuffleId=0	args{requestId=5, partitionId=1, partitionNumPerRange=1, partitionNum=2, offset=0, length=49}	return{len=49}
  • Old client -> New Coordinator/Server: uses the blockIdLayout carried by getShuffleReportRequest; nothing changes.
[2024-10-22 16:55:47.711] cmd=registerShuffle	statusCode=SUCCESS	from=/localhost:38382	executionTimeUs=5681	appId=app-20241022165543-0078_1729587342008	shuffleId=0	args{remoteStoragePath=, user=root, stageAttemptNumber=0, blockIdLayout=blockIdLayout[seq: 21 bits, part: 20 bits, task: 22 bits]}
[2024-10-22 16:55:49.571] cmd=requireBuffer	statusCode=SUCCESS	from=/localhost:38386	executionTimeUs=4007	appId=app-20241022165543-0078_1729587342008	shuffleId=0	args{requireSize=251, partitionIdsSize=1}	return{requireBufferId=4}
[2024-10-22 16:55:49.572] cmd=requireBuffer	statusCode=SUCCESS	from=/localhost:38386	executionTimeUs=4550	appId=app-20241022165543-0078_1729587342008	shuffleId=0	args{requireSize=457, partitionIdsSize=2}	return{requireBufferId=3}
[2024-10-22 16:55:49.651] cmd=sendShuffleData	statusCode=SUCCESS	from=/localhost:34940	executionTimeUs=11168	appId=app-20241022165543-0078_1729587342008	shuffleId=0	args{requireBufferId=3, requireSize=457, isPreAllocated=true, requireBlocksSize=110, stageAttemptNumber=0, partitionCount=2}
[2024-10-22 16:55:49.651] cmd=sendShuffleData	statusCode=SUCCESS	from=/localhost:34926	executionTimeUs=10189	appId=app-20241022165543-0078_1729587342008	shuffleId=0	args{requireBufferId=4, requireSize=251, isPreAllocated=true, requireBlocksSize=48, stageAttemptNumber=0, partitionCount=1}
[2024-10-22 16:55:49.707] cmd=commitShuffleTask	statusCode=SUCCESS	from=/localhost:38386	executionTimeUs=5040	appId=app-20241022165543-0078_1729587342008	shuffleId=0	return{commitCount=2}
[2024-10-22 16:55:49.707] cmd=commitShuffleTask	statusCode=SUCCESS	from=/localhost:38386	executionTimeUs=5149	appId=app-20241022165543-0078_1729587342008	shuffleId=0	return{commitCount=1}
[2024-10-22 16:55:49.890] cmd=reportShuffleResult	statusCode=SUCCESS	from=/localhost:38386	executionTimeUs=2541	appId=app-20241022165543-0078_1729587342008	shuffleId=0	args{taskAttemptId=0, bitmapNum=1, partitionToBlockIdsSize=2}
[2024-10-22 16:55:50.742] cmd=finishShuffle	statusCode=SUCCESS	from=/localhost:38386	executionTimeUs=1006461	appId=app-20241022165543-0078_1729587342008	shuffleId=0
[2024-10-22 16:55:51.080] cmd=reportShuffleResult	statusCode=SUCCESS	from=/localhost:38386	executionTimeUs=1851	appId=app-20241022165543-0078_1729587342008	shuffleId=0	args{taskAttemptId=4, bitmapNum=1, partitionToBlockIdsSize=1}
[2024-10-22 16:55:51.497] cmd=getShuffleResultForMultiPart	statusCode=SUCCESS	from=/localhost:38386	executionTimeUs=3230	appId=app-20241022165543-0078_1729587342008	shuffleId=0	args{partitionsListSize=1, blockIdLayout=null}
[2024-10-22 16:55:51.497] cmd=getShuffleResultForMultiPart	statusCode=SUCCESS	from=/localhost:38386	executionTimeUs=7935	appId=app-20241022165543-0078_1729587342008	shuffleId=0	args{partitionsListSize=1, blockIdLayout=null}
[2024-10-22 16:55:51.578] cmd=getLocalShuffleIndex	statusCode=SUCCESS	from=/localhost:34926	executionTimeUs=4631	appId=app-20241022165543-0078_1729587342008	shuffleId=0	args{requestId=2, partitionId=0, partitionNumPerRange=1, partitionNum=2}	return{len=40}
[2024-10-22 16:55:51.578] cmd=getLocalShuffleIndex	statusCode=SUCCESS	from=/localhost:34940	executionTimeUs=5751	appId=app-20241022165543-0078_1729587342008	shuffleId=0	args{requestId=3, partitionId=1, partitionNumPerRange=1, partitionNum=2}	return{len=80}
[2024-10-22 16:55:51.604] cmd=getLocalShuffleData	statusCode=SUCCESS	from=/localhost:34926	executionTimeUs=1735	appId=app-20241022165543-0078_1729587342008	shuffleId=0	args{requestId=5, partitionId=1, partitionNumPerRange=1, partitionNum=2, offset=0, length=49}	return{len=49}
[2024-10-22 16:55:51.604] cmd=getLocalShuffleData	statusCode=SUCCESS	from=/localhost:34940	executionTimeUs=1735	appId=app-20241022165543-0078_1729587342008	shuffleId=0	args{requestId=4, partitionId=0, partitionNumPerRange=1, partitionNum=2, offset=0, length=13}	return{len=13}
  • New client -> Old Coordinator/New Server: uses the blockIdLayout carried by getShuffleReportRequest; nothing changes.
[2024-10-22 18:53:38.191] cmd=registerShuffle	statusCode=SUCCESS	from=/localhost:54010	executionTimeUs=5038832	appId=app-20241022185329-0080_1729594407610	shuffleId=0	args{remoteStoragePath=, user=root, stageAttemptNumber=0, blockIdLayout=blockIdLayout[seq: 21 bits, part: 20 bits, task: 22 bits]}
[2024-10-22 18:53:40.106] cmd=requireBuffer	statusCode=SUCCESS	from=/localhost:54026	executionTimeUs=10600	appId=app-20241022185329-0080_1729594407610	shuffleId=0	args{requireSize=457, partitionIdsSize=2}	return{requireBufferId=1}
[2024-10-22 18:53:40.106] cmd=requireBuffer	statusCode=SUCCESS	from=/localhost:54026	executionTimeUs=11154	appId=app-20241022185329-0080_1729594407610	shuffleId=0	args{requireSize=251, partitionIdsSize=1}	return{requireBufferId=2}
[2024-10-22 18:53:40.264] cmd=sendShuffleData	statusCode=SUCCESS	from=/localhost:60484	executionTimeUs=82391	appId=app-20241022185329-0080_1729594407610	shuffleId=0	args{requireBufferId=2, requireSize=251, isPreAllocated=true, requireBlocksSize=48, stageAttemptNumber=0, partitionCount=1}
[2024-10-22 18:53:40.264] cmd=sendShuffleData	statusCode=SUCCESS	from=/localhost:60486	executionTimeUs=83382	appId=app-20241022185329-0080_1729594407610	shuffleId=0	args{requireBufferId=1, requireSize=457, isPreAllocated=true, requireBlocksSize=110, stageAttemptNumber=0, partitionCount=2}
[2024-10-22 18:53:40.327] cmd=commitShuffleTask	statusCode=SUCCESS	from=/localhost:54026	executionTimeUs=16662	appId=app-20241022185329-0080_1729594407610	shuffleId=0	return{commitCount=1}
[2024-10-22 18:53:40.327] cmd=commitShuffleTask	statusCode=SUCCESS	from=/localhost:54026	executionTimeUs=16689	appId=app-20241022185329-0080_1729594407610	shuffleId=0	return{commitCount=2}
[2024-10-22 18:53:40.514] cmd=reportShuffleResult	statusCode=SUCCESS	from=/localhost:54026	executionTimeUs=5713	appId=app-20241022185329-0080_1729594407610	shuffleId=0	args{taskAttemptId=4, bitmapNum=1, partitionToBlockIdsSize=1}
[2024-10-22 18:53:41.376] cmd=finishShuffle	statusCode=SUCCESS	from=/localhost:54026	executionTimeUs=1025196	appId=app-20241022185329-0080_1729594407610	shuffleId=0
[2024-10-22 18:53:41.701] cmd=reportShuffleResult	statusCode=SUCCESS	from=/localhost:54026	executionTimeUs=3741	appId=app-20241022185329-0080_1729594407610	shuffleId=0	args{taskAttemptId=0, bitmapNum=1, partitionToBlockIdsSize=2}
[2024-10-22 18:53:42.151] cmd=getShuffleResultForMultiPart	statusCode=SUCCESS	from=/localhost:54026	executionTimeUs=33402	appId=app-20241022185329-0080_1729594407610	shuffleId=0	args{partitionsListSize=1, blockIdLayout=blockIdLayout[seq: 21 bits, part: 20 bits, task: 22 bits]}
[2024-10-22 18:53:42.151] cmd=getShuffleResultForMultiPart	statusCode=SUCCESS	from=/localhost:54026	executionTimeUs=32826	appId=app-20241022185329-0080_1729594407610	shuffleId=0	args{partitionsListSize=1, blockIdLayout=blockIdLayout[seq: 21 bits, part: 20 bits, task: 22 bits]}
[2024-10-22 18:53:42.248] cmd=getLocalShuffleIndex	statusCode=SUCCESS	from=/localhost:60486	executionTimeUs=20195	appId=app-20241022185329-0080_1729594407610	shuffleId=0	args{requestId=2, partitionId=0, partitionNumPerRange=1, partitionNum=2}	return{len=40}
[2024-10-22 18:53:42.249] cmd=getLocalShuffleIndex	statusCode=SUCCESS	from=/localhost:60486	executionTimeUs=629	appId=app-20241022185329-0080_1729594407610	shuffleId=0	args{requestId=3, partitionId=1, partitionNumPerRange=1, partitionNum=2}	return{len=80}
[2024-10-22 18:53:42.268] cmd=getLocalShuffleData	statusCode=SUCCESS	from=/localhost:60484	executionTimeUs=1727	appId=app-20241022185329-0080_1729594407610	shuffleId=0	args{requestId=4, partitionId=1, partitionNumPerRange=1, partitionNum=2, offset=0, length=49}	return{len=49}
[2024-10-22 18:53:42.269] cmd=getLocalShuffleData	statusCode=SUCCESS	from=/localhost:60484	executionTimeUs=370	appId=app-20241022185329-0080_1729594407610	shuffleId=0	args{requestId=5, partitionId=0, partitionNumPerRange=1, partitionNum=2, offset=0, length=13}	return{len=13}
  • New client -> New Coordinator/Old Server: uses the blockIdLayout carried by getShuffleReportRequest; nothing changes.
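
The four combinations above reduce to one rule for a new client: the layout can be sent once at registration only when both the coordinator and the server are new. In every other case it travels with each result request, as before. The sketch below encodes that rule. The class, the enum, and the two boolean parameters are hypothetical names chosen for illustration; how the patch actually detects a "new" peer is not shown here.

```java
/** Hypothetical sketch; the enum values and method name are illustrative assumptions. */
final class LayoutTransportSketch {

  /** Where a new client places the block ID layout. */
  enum Transport {
    REGISTER_SHUFFLE_ONLY,
    EVERY_GET_SHUFFLE_RESULT
  }

  /**
   * Decision for a new client. An old client never reaches this code and keeps
   * sending the layout with every result request.
   */
  static Transport choose(boolean coordinatorIsNew, boolean serverIsNew) {
    if (coordinatorIsNew && serverIsNew) {
      return Transport.REGISTER_SHUFFLE_ONLY;
    }
    return Transport.EVERY_GET_SHUFFLE_RESULT;
  }

  public static void main(String[] args) {
    // Print the decision for each coordinator/server combination.
    boolean[] flags = {true, false};
    for (boolean coordinatorIsNew : flags) {
      for (boolean serverIsNew : flags) {
        System.out.println(
            "coordinatorIsNew=" + coordinatorIsNew
                + " serverIsNew=" + serverIsNew
                + " -> " + choose(coordinatorIsNew, serverIsNew));
      }
    }
  }
}
```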

github-actions bot commented Oct 22, 2024

Test Results

 2 926 files  +29   2 926 suites  +29   6h 16m 45s ⏱️ + 35m 38s
 1 049 tests ± 0   1 047 ✅ + 2   2 💤 ±0  0 ❌  - 1 
13 045 runs  +62  13 015 ✅ +64  30 💤 ±0  0 ❌  - 1 

Results for commit 2cac65a. ± Comparison against base commit 2b70eb4.

♻️ This comment has been updated with latest results.

maobaolong (Member Author) commented:

@jerqi Would you like to take a look at this PR? Thanks!

jerqi (Contributor) commented Oct 22, 2024

We already have a server version; see Constants::SHUFFLE_SERVER_VERSION.

@jerqi jerqi changed the title [#2212] feat(client/server/coordinator): Introduce ServiceVersion framework to support backward compatible and reduce BlcokIdLayout message [#2212] feat(client/server/coordinator): Introduce ServiceVersion framework to support backward compatible and reduce BlockIdLayout message Oct 22, 2024
@jerqi jerqi changed the title [#2212] feat(client/server/coordinator): Introduce ServiceVersion framework to support backward compatible and reduce BlockIdLayout message [#2212] feat(client,server,coordinator): Introduce ServiceVersion framework to support backward compatible and reduce BlockIdLayout message Oct 22, 2024
maobaolong (Member Author) commented:

@jerqi Thanks for the reminder. After going through the code related to SHUFFLE_SERVER_VERSION, I have a basic understanding of it:

  • SHUFFLE_SERVER_VERSION is used as a tag of the server.
  • The coordinator selects servers for a client according to the tag filter sent by the client.
  • It is also used in metrics.

In conclusion, I guess SHUFFLE_SERVER_VERSION is intended to select servers with the same version/tag for a client. A string tag/version is easy to understand but hard to extend: if we add a new feature on top of ss_v5, an old client cannot use ss_v5 as a tag to get an assignment, even though the new feature set includes the ss_v5 features.

ServiceVersion, by contrast, is intended to let a client adapt to servers with different serviceVersion values, so that they all work together.
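
The difference between the two approaches can be sketched as follows. Both methods are hypothetical illustrations, not code from Uniffle or from this patch. The tag `ss_v6` and the integer version numbers are invented for the example. Treating ServiceVersion as an ordered integer is an assumption about the design.

```java
import java.util.Set;

/** Hypothetical sketch contrasting exact tag matching with an ordered version check. */
final class VersionCheckSketch {

  /** Tag-based selection: a server qualifies only if it carries the exact tag requested. */
  static boolean matchesTag(Set<String> serverTags, String clientTag) {
    return serverTags.contains(clientTag);
  }

  /**
   * Ordered version check (assumed design): a feature is usable when the peer's
   * version is at least the version that introduced the feature.
   */
  static boolean supports(int peerServiceVersion, int featureSinceVersion) {
    return peerServiceVersion >= featureSinceVersion;
  }

  public static void main(String[] args) {
    // A server carrying only the invented tag "ss_v6" is invisible to a client asking for "ss_v5".
    System.out.println(matchesTag(Set.of("ss_v6"), "ss_v5"));
    // With ordered versions, a peer at version 2 still supports a feature introduced in version 1.
    System.out.println(supports(2, 1));
  }
}
```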

jerqi (Contributor) commented Oct 23, 2024

> @jerqi Thanks for the reminder. After going through the code related to SHUFFLE_SERVER_VERSION, I have a basic understanding of it:
>
>   • SHUFFLE_SERVER_VERSION is used as a tag of the server.
>   • The coordinator selects servers for a client according to the tag filter sent by the client.
>   • It is also used in metrics.
>
> In conclusion, I guess SHUFFLE_SERVER_VERSION is intended to select servers with the same version/tag for a client. A string tag/version is easy to understand but hard to extend: if we add a new feature on top of ss_v5, an old client cannot use ss_v5 as a tag to get an assignment, even though the new feature set includes the ss_v5 features.
>
> ServiceVersion, by contrast, is intended to let a client adapt to servers with different serviceVersion values, so that they all work together.

No. If we introduce an incompatible update, we should change the version.
