Skip to content

Commit

Permalink
Change resultBlockId for RddComputeMonotasks.
Browse files Browse the repository at this point in the history
This commit changes the resultBlockId used by RddComputeMonotasks from
being a RDDBlockId to being a MonotaskResultBlockId. There's no reason this
result to use a RDDBlockId (because it's temporary data and not where the RDD
will more permanently be stored), and storing it with RDDBlockId can sometimes
trigger a race condition in BlockManager between when the monotask's result
gets cleaned up and when a DiskWriteMonotask writes the result
(NetSys/spark-monotasks#26).
  • Loading branch information
kayousterhout committed Jul 20, 2015
1 parent 5df3963 commit d62c5cd
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.nio.ByteBuffer

import org.apache.spark.{Partition, SparkEnv, TaskContextImpl}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.storage.{MonotaskResultBlockId, StorageLevel}

/**
* Computes the specified partition of the specified RDD and stores the result in the BlockManager.
Expand All @@ -30,12 +30,12 @@ import org.apache.spark.storage.{RDDBlockId, StorageLevel}
private[spark] class RddComputeMonotask[T](context: TaskContextImpl, rdd: RDD[T], split: Partition)
extends ComputeMonotask(context) {

resultBlockId = Some(new RDDBlockId(rdd.id, split.index))
resultBlockId = Some(new MonotaskResultBlockId(taskId))

override def execute(): Option[ByteBuffer] = {
val iterator = rdd.compute(split, context)
SparkEnv.get.blockManager.cacheIterator(
getResultBlockId(), iterator, StorageLevel.MEMORY_ONLY, true)
getResultBlockId(), iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
None
}
}
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,8 @@ abstract class RDD[T: ClassTag](

// Create a SerializationMonotask to serialize the block and a DiskWriteMonotask to write it
// to disk.
val serializationMonotask = new SerializationMonotask(context, blockId)
val serializationMonotask =
new SerializationMonotask(context, rddComputeMonotask.getResultBlockId())
serializationMonotask.addDependency(rddComputeMonotask)
val diskWriteMonotask = new DiskWriteMonotask(
context, blockId, serializationMonotask.getResultBlockId())
Expand Down

0 comments on commit d62c5cd

Please sign in to comment.