
Commit 1299c8a

Bo Yang authored and committed
[SPARK-33037][SHUFFLE] Remove knownManagers to support user's custom shuffle manager plugin
### What changes were proposed in this pull request?

Spark has a hardcoded list of known shuffle managers, which currently contains two values. It does not include a user's custom shuffle manager set through the Spark config "spark.shuffle.manager". We hit an issue when setting "spark.shuffle.manager" to our own shuffle manager plugin (the Uber Remote Shuffle Service implementation, https://github.com/uber/RemoteShuffleService). Other users will hit the same issue when they implement their own shuffle manager. It is better to remove the hardcoded knownManagers list so that custom shuffle manager implementations are supported.

### Why are the changes needed?

Spark has a shuffle manager API to support custom shuffle manager implementations. The hardcoded known managers list does not account for the shuffle manager config value, which can be set by the user. It is therefore better to remove the list.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Current Spark unit tests already cover the code path.

Closes apache#29916 from boy-uber/knownManagers.

Lead-authored-by: Bo Yang <[email protected]>
Co-authored-by: Bo Yang <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
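
For readers unfamiliar with how a "spark.shuffle.manager" value becomes a class name, here is a minimal sketch. It assumes the alias handling in Spark's SparkEnv ("sort" and "tungsten-sort" map to SortShuffleManager, anything else is taken as a fully qualified class name); that table is not part of this commit, and `ShuffleManagerNames` and `com.example.shuffle.MyShuffleManager` are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Simplified stand-in for resolving a spark.shuffle.manager value; not Spark code. */
final class ShuffleManagerNames {
  // Assumed short aliases; any other value is treated as a class name.
  private static final Map<String, String> SHORT_NAMES = new HashMap<>();
  static {
    SHORT_NAMES.put("sort", "org.apache.spark.shuffle.sort.SortShuffleManager");
    SHORT_NAMES.put("tungsten-sort", "org.apache.spark.shuffle.sort.SortShuffleManager");
  }

  /** Returns the class name that the configured value would resolve to. */
  static String resolve(String configured) {
    return SHORT_NAMES.getOrDefault(configured.toLowerCase(Locale.ROOT), configured);
  }

  public static void main(String[] args) {
    System.out.println(resolve("sort"));
    // Hypothetical custom plugin class: passed through unchanged.
    System.out.println(resolve("com.example.shuffle.MyShuffleManager"));
  }
}
```

A custom class name passes through untouched, so before this commit the external shuffle service would have received a manager name outside its hardcoded list.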
1 parent 82721ce commit 1299c8a

4 files changed: +8 -21 lines changed

Diff for: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java

-8
@@ -92,10 +92,6 @@ public class ExternalShuffleBlockResolver {
   @VisibleForTesting
   final DB db;
 
-  private final List<String> knownManagers = Arrays.asList(
-    "org.apache.spark.shuffle.sort.SortShuffleManager",
-    "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");
-
   public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
       throws IOException {
     this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(
@@ -148,10 +144,6 @@ public void registerExecutor(
       ExecutorShuffleInfo executorInfo) {
     AppExecId fullId = new AppExecId(appId, execId);
     logger.info("Registered executor {} with {}", fullId, executorInfo);
-    if (!knownManagers.contains(executorInfo.shuffleManager)) {
-      throw new UnsupportedOperationException(
-        "Unsupported shuffle manager of executor: " + executorInfo);
-    }
     try {
       if (db != null) {
         byte[] key = dbAppExecKey(fullId);
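
The behavioural effect of removing the check can be sketched with a simplified stand-in. `RegistrationSketch` is hypothetical and is not Spark's `ExternalShuffleBlockResolver`; it only models the allow-list test that this diff deletes, and `com.example.shuffle.MyShuffleManager` is an invented class name.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified stand-in for the executor registration path; not Spark's actual class. */
final class RegistrationSketch {
  static final List<String> KNOWN_MANAGERS = Arrays.asList(
      "org.apache.spark.shuffle.sort.SortShuffleManager",
      "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");

  // Maps executor id to the shuffle manager name it reported.
  private final Map<String, String> executors = new HashMap<>();

  /** Behaviour before this commit: reject any manager outside the hardcoded list. */
  void registerWithAllowList(String execId, String shuffleManager) {
    if (!KNOWN_MANAGERS.contains(shuffleManager)) {
      throw new UnsupportedOperationException(
          "Unsupported shuffle manager of executor: " + shuffleManager);
    }
    executors.put(execId, shuffleManager);
  }

  /** Behaviour after this commit: record the executor whatever manager it reports. */
  void register(String execId, String shuffleManager) {
    executors.put(execId, shuffleManager);
  }

  /** Returns the recorded manager name, or null if the executor is unknown. */
  String managerOf(String execId) {
    return executors.get(execId);
  }
}
```

With the allow-list variant, registering an executor that reports a custom manager throws; with the post-commit variant the same call succeeds and the executor is recorded.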

Diff for: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java

-9
@@ -71,15 +71,6 @@ public void testBadRequests() throws IOException {
       assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
     }
 
-    // Invalid shuffle manager
-    try {
-      resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
-      resolver.getBlockData("app0", "exec2", 1, 1, 0);
-      fail("Should have failed");
-    } catch (UnsupportedOperationException e) {
-      // pass
-    }
-
     // Nonexistent shuffle block
     resolver.registerExecutor("app0", "exec3",
       dataContext.createExecutorInfo(SORT_MANAGER));

Diff for: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java

+3 -3
@@ -233,9 +233,9 @@ public void testFetchThreeSort() throws Exception {
     exec0Fetch.releaseBuffers();
   }
 
-  @Test (expected = RuntimeException.class)
-  public void testRegisterInvalidExecutor() throws Exception {
-    registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager"));
+  @Test
+  public void testRegisterWithCustomShuffleManager() throws Exception {
+    registerExecutor("exec-1", dataContext0.createExecutorInfo("custom shuffle manager"));
   }
 
   @Test

Diff for: core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala

+5 -1
@@ -24,8 +24,12 @@ import org.apache.spark.{ShuffleDependency, TaskContext}
  * and on each executor, based on the spark.shuffle.manager setting. The driver registers shuffles
  * with it, and executors (or tasks running locally in the driver) can ask to read and write data.
  *
- * NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
+ * NOTE:
+ * 1. This will be instantiated by SparkEnv so its constructor can take a SparkConf and
  * boolean isDriver as parameters.
+ * 2. This contains a method ShuffleBlockResolver which interacts with External Shuffle Service
+ * when it is enabled. Need to pay attention to that, if implementing a custom ShuffleManager, to
+ * make sure the custom ShuffleManager could co-exist with External Shuffle Service.
  */
 private[spark] trait ShuffleManager {
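
The first note in the doc comment says the manager is instantiated with a SparkConf and a boolean isDriver. A minimal sketch of that kind of reflective construction follows. `Conf`, `Manager`, `MyManager` and `instantiate` are hypothetical stand-ins for SparkConf, the ShuffleManager trait, a custom plugin and Spark's own loading code, whose exact implementation this diff does not show.

```java
import java.lang.reflect.Constructor;

/** Sketch of loading a plugin by class name with a (conf, isDriver) constructor. */
final class ShuffleManagerLoading {
  /** Hypothetical stand-in for SparkConf. */
  static final class Conf { }

  /** Hypothetical stand-in for the ShuffleManager trait. */
  interface Manager {
    boolean isDriver();
  }

  /** A custom manager exposing the two-argument constructor the note describes. */
  static final class MyManager implements Manager {
    private final boolean isDriver;

    public MyManager(Conf conf, boolean isDriver) {
      this.isDriver = isDriver;
    }

    @Override
    public boolean isDriver() {
      return isDriver;
    }
  }

  /** Looks up the (Conf, boolean) constructor by reflection and invokes it. */
  static Manager instantiate(String className, Conf conf, boolean isDriver) {
    try {
      Class<?> cls = Class.forName(className);
      Constructor<?> ctor = cls.getConstructor(Conf.class, boolean.class);
      return (Manager) ctor.newInstance(conf, isDriver);
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate " + className, e);
    }
  }
}
```

A plugin class that lacks a matching public constructor fails at lookup time in this sketch, which is why the constructor shape matters to implementers of a custom manager.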
