
Commit 136b9b3

Basic shuffle file consolidation
The Spark shuffle phase can produce a large number of files, as one file is created per mapper per reducer. For large or repeated jobs, this often produces millions of shuffle files, which leads to severely degraded performance from the OS file system. This patch seeks to reduce that burden by combining multiple shuffle files into one.

This PR draws upon the work of Jason Dai in mesos/spark#669, but simplifies the design in order to get the majority of the gain with less overall intellectual and code burden. The vast majority of code in this pull request is a refactor that inserts a clean layer of indirection between logical block ids and physical files; this, I feel, provides some design clarity in addition to enabling shuffle file consolidation.

The main goal is to produce one shuffle file per reducer per active mapper thread. This isolates the mappers from one another (simplifying the failure modes) while still reducing the number of shuffle files tremendously for large jobs. To accomplish this, we simply create a new set of shuffle files for every parallel task and return the files to a pool, from which they are handed out to the next task that runs.
1 parent 861dc40 commit 136b9b3
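
To illustrate the pooling scheme described above, here is a minimal, self-contained Scala sketch of the file-id pool. The names mirror nextFileId/unusedFileIds in the ShuffleBlockManager diff below, but this simplified FileIdPool object is illustrative only, not Spark's actual class:

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Each concurrently running map task borrows a fileId, writes its output into
// the per-reducer files named by that id, and returns the id when done, so the
// next task appends to the same physical files instead of creating new ones.
object FileIdPool {
  private val nextFileId = new AtomicInteger(0)
  private val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]()

  /** Reuse a released id if one is available; otherwise mint a fresh one. */
  def acquire(): Int = {
    val id = unusedFileIds.poll()
    if (id == null) nextFileId.getAndIncrement() else id
  }

  /** Return an id to the pool once the owning task has finished writing. */
  def release(fileId: Int): Unit = unusedFileIds.add(fileId)
}

Because a given id is only ever held by one task at a time, no two tasks write to the same physical file concurrently.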

File tree

8 files changed: +57 additions, -14 deletions


core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
       ctx.flush();
       return;
     }
-    long length = file.length();
+    long length = fileSegment.length();
     if (length > Integer.MAX_VALUE || length <= 0) {
       ctx.write(new FileHeader(0, blockId).buffer());
       ctx.flush();

core/src/main/scala/org/apache/spark/TaskContext.scala

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ class TaskContext(
   val stageId: Int,
   val partitionId: Int,
   val attemptId: Long,
+  val executorId: String,
   val runningLocally: Boolean = false,
   @volatile var interrupted: Boolean = false,
   private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty()

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 1 deletion
@@ -206,7 +206,7 @@ private[spark] class Executor(
 
       // Run the actual task and measure its runtime.
       taskStart = System.currentTimeMillis()
-      val value = task.run(taskId.toInt)
+      val value = task.run(taskId.toInt, executorId)
       val taskFinish = System.currentTimeMillis()
 
       // If the task has been killed, let's fail it.

core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala

Lines changed: 2 additions & 1 deletion
@@ -152,7 +152,8 @@ private[spark] class ShuffleMapTask(
     try {
       // Obtain all the block writers for shuffle blocks.
       val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
-      shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)
+      shuffle = blockManager.shuffleBlockManager.forShuffle(
+        dep.shuffleId, context.executorId, numOutputSplits, ser)
       buckets = shuffle.acquireWriters(partitionId)
 
       // Write the map output to its associated buckets.

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 2 additions & 2 deletions
@@ -45,8 +45,8 @@ import org.apache.spark.util.ByteBufferInputStream
  */
 private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
 
-  def run(attemptId: Long): T = {
-    context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
+  final def run(attemptId: Long, executorId: String): T = {
+    context = new TaskContext(stageId, partitionId, attemptId, executorId, runningLocally = false)
     if (_killed) {
       kill()
     }

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 0 deletions
@@ -578,6 +578,7 @@ private[spark] class BlockManager(
     val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true)
     val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
     writer.registerCloseEventHandler(() => {
+      diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment())
       val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
       blockInfo.put(blockId, myInfo)
       myInfo.markReady(writer.fileSegment().length)

core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala

Lines changed: 48 additions & 8 deletions
@@ -17,37 +17,77 @@
 
 package org.apache.spark.storage
 
-import org.apache.spark.serializer.Serializer
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicInteger
 
+import org.apache.spark.serializer.Serializer
 
 private[spark]
-class ShuffleWriterGroup(val id: Int, val writers: Array[BlockObjectWriter])
-
+class ShuffleWriterGroup(val id: Int, val fileId: Int, val writers: Array[BlockObjectWriter])
 
 private[spark]
 trait ShuffleBlocks {
   def acquireWriters(mapId: Int): ShuffleWriterGroup
   def releaseWriters(group: ShuffleWriterGroup)
 }
 
+/**
+ * Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one writer
+ * per reducer.
+ *
+ * As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
+ * blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
+ * per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle
+ * files, it releases them for another task.
+ * Regarding the implementation of this feature, shuffle files are identified by a 4-tuple:
+ *   - shuffleId: The unique id given to the entire shuffle stage.
+ *   - executorId: The id of the executor running the task. Required in order to ensure that
+ *     multiple executors running on the same node do not collide.
+ *   - bucketId: The id of the output partition (i.e., reducer id).
+ *   - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
+ *     time owns a particular fileId, and this id is returned to a pool when the task finishes.
+ */
 private[spark]
 class ShuffleBlockManager(blockManager: BlockManager) {
+  /** Turning off shuffle file consolidation causes all shuffle blocks to get their own file. */
+  val consolidateShuffleFiles =
+    System.getProperty("spark.storage.consolidateShuffleFiles", "true").toBoolean
+
+  var nextFileId = new AtomicInteger(0)
+  val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]()
 
-  def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = {
+  def forShuffle(shuffleId: Int, executorId: String, numBuckets: Int, serializer: Serializer) = {
     new ShuffleBlocks {
       // Get a group of writers for a map task.
       override def acquireWriters(mapId: Int): ShuffleWriterGroup = {
         val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
+        val fileId = getUnusedFileId()
         val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
           val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-          blockManager.getDiskWriter(blockId, blockId.name, serializer, bufferSize)
+          val filename = physicalFileName(shuffleId, executorId, bucketId, fileId)
+          blockManager.getDiskWriter(blockId, filename, serializer, bufferSize)
         }
-        new ShuffleWriterGroup(mapId, writers)
+        new ShuffleWriterGroup(mapId, fileId, writers)
       }
 
-      override def releaseWriters(group: ShuffleWriterGroup) = {
-        // Nothing really to release here.
+      override def releaseWriters(group: ShuffleWriterGroup) {
+        recycleFileId(group.fileId)
       }
     }
   }
+
+  private def getUnusedFileId(): Int = {
+    val fileId = unusedFileIds.poll()
+    if (fileId == null) nextFileId.getAndIncrement()
+    else fileId
+  }
+
+  private def recycleFileId(fileId: Int) {
+    if (!consolidateShuffleFiles) { return }  // ensures we always generate a new file id
+    unusedFileIds.add(fileId)
+  }
+
+  private def physicalFileName(shuffleId: Int, executorId: String, bucketId: Int, fileId: Int) = {
+    "merged_shuffle_%d_%s_%d_%d".format(shuffleId, executorId, bucketId, fileId)
+  }
 }
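
To make the lifecycle concrete, here is a hedged sketch of how a map task drives this API, loosely following the ShuffleMapTask change above. It assumes rdd, split, context, dep, ser, numOutputSplits, and partitionId are in scope as they are there, and elides error handling; it is illustrative, not the literal ShuffleMapTask code:

// Ask for this shuffle's writer factory, keyed by shuffle and executor.
val shuffle = blockManager.shuffleBlockManager.forShuffle(
  dep.shuffleId, context.executorId, numOutputSplits, ser)

// Borrow one writer per reducer. All writers in this group share one fileId,
// so each appends to a per-reducer "merged_shuffle_*" file owned by this task.
val group = shuffle.acquireWriters(partitionId)
try {
  for (elem <- rdd.iterator(split, context)) {
    val pair = elem.asInstanceOf[Product2[Any, Any]]
    group.writers(dep.partitioner.getPartition(pair._1)).write(pair)
  }
} finally {
  // Recycle this task's fileId so the next task reuses the same physical files.
  shuffle.releaseWriters(group)
}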

core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
     val func = (c: TaskContext, i: Iterator[String]) => i.next
     val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0)
     intercept[RuntimeException] {
-      task.run(0)
+      task.run(0, "test")
     }
     assert(completed === true)
   }
