9 changes: 5 additions & 4 deletions core/src/main/scala/spark/RDD.scala
@@ -635,13 +635,14 @@ abstract class RDD[T: ClassManifest](
    */
   def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
     // Clone the zero value since we will also be serializing it as part of tasks
-    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
+    var jobResult = Utils.clone(Option.empty[U], sc.env.closureSerializer.newInstance())
     val cleanSeqOp = sc.clean(seqOp)
     val cleanCombOp = sc.clean(combOp)
-    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
-    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
+    def optCombOp(a: Option[U], b: Option[U]): Option[U] = for (u <- b) yield a.fold(u)(cleanCombOp(_, _))
Contributor Author commented:

I guess

    for (u <- b) yield a.fold(u)(combOp(_, _))

should be just as good, maybe better.

+    val aggregatePartition = (it: Iterator[T]) => Option(it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp))
+    val mergeResult = (index: Int, taskResult: Option[U]) => jobResult = optCombOp(jobResult, taskResult)
     sc.runJob(this, aggregatePartition, mergeResult)
-    jobResult
+    jobResult.get
   }

   /**
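For context, below is a minimal standalone sketch of the driver-side merge strategy the new code uses. It is not Spark code: the object name, partition data, seqOp, and combOp are made up for illustration, the per-partition step uses foldLeft in place of the Iterator.aggregate call in the diff, and optCombOp is written with a unary Option.fold so it compiles on current Scala versions.

// Standalone sketch (not Spark code) of the Option-based driver-side merge
// introduced by this change. seqOp, combOp, and the partition data below are
// illustrative only.
object OptCombOpSketch {
  def main(args: Array[String]): Unit = {
    val zeroValue = 0
    val seqOp: (Int, Int) => Int = _ + _
    val combOp: (Int, Int) => Int = _ + _

    // Each "task" folds its partition starting from zeroValue and wraps the
    // result in Option, mirroring the new aggregatePartition closure.
    val partitions = Seq(Seq(1, 2, 3), Seq(4, 5), Seq.empty[Int])
    val taskResults: Seq[Option[Int]] =
      partitions.map(p => Option(p.foldLeft(zeroValue)(seqOp)))

    // Driver-side merge: None means "no result yet". The first task result
    // seeds the accumulator via fold; later results are combined with combOp.
    // (Written with a unary Option.fold so it compiles on current Scala.)
    def optCombOp(a: Option[Int], b: Option[Int]): Option[Int] =
      for (u <- b) yield a.fold(u)(prev => combOp(prev, u))

    var jobResult: Option[Int] = None
    taskResults.foreach(r => jobResult = optCombOp(jobResult, r))

    // One visible effect of the change: zeroValue is folded in once per
    // partition but no longer an extra time on the driver.
    println(jobResult.get) // 15
  }
}

In the sketch, and by the same logic in the patched aggregate, jobResult.get is safe as long as at least one partition result is merged; with zero partitions it would throw.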