Skip to content

Commit 793fe55

Browse files
committed
add option
1 parent 9931e5a commit 793fe55

File tree

6 files changed

+72
-14
lines changed

6 files changed

+72
-14
lines changed

client-spark/common/pom.xml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,14 @@
9595
<artifactId>fory-core</artifactId>
9696
<version>0.12.0</version>
9797
</dependency>
98+
99+
<!-- Scala dependencies -->
100+
<dependency>
101+
<groupId>org.scala-lang</groupId>
102+
<artifactId>scala-library</artifactId>
103+
<version>${scala.version}</version>
104+
<scope>provided</scope>
105+
</dependency>
98106

99107
<!-- Test dependencies -->
100108
<dependency>
@@ -119,8 +127,17 @@
119127
<version>${scala.maven.plugin.version}</version>
120128
<executions>
121129
<execution>
130+
<id>scala-compile-first</id>
131+
<phase>process-resources</phase>
122132
<goals>
133+
<goal>add-source</goal>
123134
<goal>compile</goal>
135+
</goals>
136+
</execution>
137+
<execution>
138+
<id>scala-test-compile-first</id>
139+
<phase>process-test-resources</phase>
140+
<goals>
124141
<goal>testCompile</goal>
125142
</goals>
126143
</execution>
@@ -129,6 +146,19 @@
129146
<scalaVersion>${scala.version}</scalaVersion>
130147
</configuration>
131148
</plugin>
149+
150+
<plugin>
151+
<groupId>org.apache.maven.plugins</groupId>
152+
<artifactId>maven-compiler-plugin</artifactId>
153+
<executions>
154+
<execution>
155+
<phase>compile</phase>
156+
<goals>
157+
<goal>compile</goal>
158+
</goals>
159+
</execution>
160+
</executions>
161+
</plugin>
132162

133163
<!-- ScalaTest Maven plugin for running tests -->
134164
<plugin>

client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.uniffle.common.config.ConfigUtils;
3737
import org.apache.uniffle.common.config.RssClientConf;
3838
import org.apache.uniffle.common.config.RssConf;
39+
import org.apache.uniffle.shuffle.ShuffleSerializer;
3940

4041
public class RssSparkConfig {
4142

@@ -110,6 +111,12 @@ public class RssSparkConfig {
110111
.defaultValue(true)
111112
.withDescription("indicates row based shuffle, set false when use in columnar shuffle");
112113

114+
public static final ConfigOption<ShuffleSerializer> RSS_SHUFFLE_SERIALIZER =
115+
ConfigOptions.key("rss.client.shuffle.serializer")
116+
.enumType(ShuffleSerializer.class)
117+
.noDefaultValue()
118+
.withDescription("Shuffle serializer type");
119+
113120
public static final ConfigOption<Boolean> RSS_MEMORY_SPILL_ENABLED =
114121
ConfigOptions.key("rss.client.memory.spill.enabled")
115122
.booleanType()

client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@
4040
import org.apache.spark.memory.MemoryConsumer;
4141
import org.apache.spark.memory.MemoryMode;
4242
import org.apache.spark.memory.TaskMemoryManager;
43+
import org.apache.spark.serializer.ForySerializerInstance;
4344
import org.apache.spark.serializer.SerializationStream;
4445
import org.apache.spark.serializer.Serializer;
45-
import org.apache.spark.serializer.SerializerInstance;
4646
import org.apache.spark.shuffle.RssSparkConfig;
4747
import org.slf4j.Logger;
4848
import org.slf4j.LoggerFactory;
@@ -58,6 +58,7 @@
5858
import org.apache.uniffle.common.util.BlockIdLayout;
5959
import org.apache.uniffle.common.util.ChecksumUtils;
6060

61+
import static org.apache.spark.shuffle.RssSparkConfig.RSS_SHUFFLE_SERIALIZER;
6162
import static org.apache.spark.shuffle.RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED;
6263

6364
public class WriteBufferManager extends MemoryConsumer {
@@ -81,7 +82,6 @@ public class WriteBufferManager extends MemoryConsumer {
8182
private int shuffleId;
8283
private String taskId;
8384
private long taskAttemptId;
84-
private SerializerInstance instance;
8585
private ShuffleWriteMetrics shuffleWriteMetrics;
8686
// cache partition -> records
8787
private Map<Integer, WriterBuffer> buffers;
@@ -192,8 +192,11 @@ public WriteBufferManager(
192192
// in columnar shuffle, the serializer here is never used
193193
this.isRowBased = rssConf.getBoolean(RssSparkConfig.RSS_ROW_BASED);
194194
if (isRowBased) {
195-
this.instance = serializer.newInstance();
196-
this.serializeStream = instance.serializeStream(arrayOutputStream);
195+
if (rssConf.contains(RSS_SHUFFLE_SERIALIZER)) {
196+
this.serializeStream = new ForySerializerInstance().serializeStream(arrayOutputStream);
197+
} else {
198+
this.serializeStream = serializer.newInstance().serializeStream(arrayOutputStream);
199+
}
197200
}
198201
boolean compress =
199202
rssConf.getBoolean(
client-spark/common/src/main/java/org/apache/uniffle/shuffle/ShuffleSerializer.java (new file; path inferred from the package declaration `org.apache.uniffle.shuffle` and the import added in RssSparkConfig.java — confirm module root)

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.shuffle;
19+
20+
public enum ShuffleSerializer {
21+
FORY
22+
}

client-spark/common/src/main/scala/org/apache/spark/serializer/ForySerializer.scala

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,6 @@ class ForySerializerInstance extends org.apache.spark.serializer.SerializerInsta
3131
f
3232
})
3333

34-
private def createFuryInstance(): Fory = {
35-
Fory.builder()
36-
.withLanguage(Language.JAVA)
37-
.withRefTracking(true)
38-
.withCompatibleMode(CompatibleMode.COMPATIBLE)
39-
.requireClassRegistration(false)
40-
.build()
41-
}
42-
4334
override def serialize[T: ClassTag](t: T): ByteBuffer = {
4435
val bytes = fury.get().serialize(t.asInstanceOf[AnyRef])
4536
ByteBuffer.wrap(bytes)

client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.spark.ShuffleDependency;
4343
import org.apache.spark.TaskContext;
4444
import org.apache.spark.executor.ShuffleReadMetrics;
45+
import org.apache.spark.serializer.ForySerializer;
4546
import org.apache.spark.serializer.Serializer;
4647
import org.apache.spark.shuffle.FunctionUtils;
4748
import org.apache.spark.shuffle.RssShuffleHandle;
@@ -68,6 +69,7 @@
6869

6970
import static org.apache.spark.shuffle.RssSparkConfig.RSS_READ_REORDER_MULTI_SERVERS_ENABLED;
7071
import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;
72+
import static org.apache.spark.shuffle.RssSparkConfig.RSS_SHUFFLE_SERIALIZER;
7173

7274
public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
7375
private static final Logger LOG = LoggerFactory.getLogger(RssShuffleReader.class);
@@ -125,7 +127,10 @@ public RssShuffleReader(
125127
this.numMaps = rssShuffleHandle.getNumMaps();
126128
this.shuffleDependency = rssShuffleHandle.getDependency();
127129
this.shuffleId = shuffleDependency.shuffleId();
128-
this.serializer = rssShuffleHandle.getDependency().serializer();
130+
this.serializer =
131+
rssConf.contains(RSS_SHUFFLE_SERIALIZER)
132+
? new ForySerializer()
133+
: rssShuffleHandle.getDependency().serializer();
129134
this.taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber();
130135
this.basePath = basePath;
131136
this.partitionNum = partitionNum;

0 commit comments

Comments (0)