diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 93fff4526..3f6cdaef2 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -75,6 +75,12 @@ class TaskMetrics extends Serializable {
    */
   var executorRunTime: Long = _
 
+  /**
+   * Total time consumed by compute monotasks for this macrotask. May be larger than executorRunTime
+   * if multiple compute monotasks ran simultaneously.
+   */
+  var computationNanos: Long = _
+
   /**
    * The number of bytes this task transmitted back to the driver as the TaskResult
    */
diff --git a/core/src/main/scala/org/apache/spark/monotasks/compute/ComputeMonotask.scala b/core/src/main/scala/org/apache/spark/monotasks/compute/ComputeMonotask.scala
index 8124a545a..7d477e765 100644
--- a/core/src/main/scala/org/apache/spark/monotasks/compute/ComputeMonotask.scala
+++ b/core/src/main/scala/org/apache/spark/monotasks/compute/ComputeMonotask.scala
@@ -35,6 +35,7 @@ private[spark] abstract class ComputeMonotask(context: TaskContext)
 
   /** Runs the execute method and handles common exceptions thrown by ComputeMonotasks. */
   def executeAndHandleExceptions() {
+    val startTimeNanos = System.nanoTime()
     try {
       Accumulators.registeredAccumulables.set(context.accumulators)
       val result = execute()
@@ -66,6 +67,8 @@ private[spark] abstract class ComputeMonotask(context: TaskContext)
         val closureSerializer = context.env.closureSerializer.newInstance()
         context.localDagScheduler.handleTaskFailure(this, closureSerializer.serialize(reason))
       }
+    } finally {
+      context.taskMetrics.computationNanos = System.nanoTime() - startTimeNanos
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a8edba374..426b4add6 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -261,6 +261,7 @@ private[spark] object JsonProtocol {
     ("Host Name" -> taskMetrics.hostname) ~
     ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
     ("Executor Run Time" -> taskMetrics.executorRunTime) ~
+    ("Computation Nanos" -> taskMetrics.computationNanos) ~
     ("Result Size" -> taskMetrics.resultSize) ~
     ("JVM GC Time" -> taskMetrics.jvmGCTime) ~
     ("Result Serialization Time" -> taskMetrics.resultSerializationTime) ~
@@ -619,6 +620,8 @@ private[spark] object JsonProtocol {
     metrics.hostname = (json \ "Host Name").extract[String]
     metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long]
     metrics.executorRunTime = (json \ "Executor Run Time").extract[Long]
+    metrics.computationNanos =
+      Utils.jsonOption(json \ "Computation Nanos").map(_.extract[Long]).getOrElse(0L)
     metrics.resultSize = (json \ "Result Size").extract[Long]
     metrics.jvmGCTime = (json \ "JVM GC Time").extract[Long]
     metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long]
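
The `finally` clause in `executeAndHandleExceptions` guarantees that `computationNanos` is recorded whether `execute()` returns normally or throws. A minimal, self-contained sketch of the same pattern (the `SketchMetrics` class and `timed` helper here are hypothetical illustrations, not part of this patch):

```scala
object TimingSketch {
  // Hypothetical stand-in for the computationNanos field on TaskMetrics.
  class SketchMetrics { var computationNanos: Long = 0L }

  // Runs `body` and records its wall-clock duration in nanoseconds.
  // The finally clause mirrors the patch: the duration is stored even
  // when `body` throws, so failed monotasks still report their time.
  def timed[T](metrics: SketchMetrics)(body: => T): T = {
    val startTimeNanos = System.nanoTime()
    try {
      body
    } finally {
      metrics.computationNanos = System.nanoTime() - startTimeNanos
    }
  }

  def main(args: Array[String]): Unit = {
    val metrics = new SketchMetrics
    timed(metrics) { Thread.sleep(10) }
    println(s"computationNanos = ${metrics.computationNanos}") // roughly 10,000,000 ns
  }
}
```

`System.nanoTime()` is used rather than `System.currentTimeMillis()` because it is monotonic and has nanosecond resolution; its absolute value is meaningless, so only the difference between two calls is recorded.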
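
Separately, on the read side in `JsonProtocol`: a bare `(json \ "Computation Nanos").extract[Long]` would throw a `MappingException` for event logs written before this patch, because the field lookup yields `JNothing`. Wrapping the lookup in `Utils.jsonOption(...).getOrElse(0L)` keeps old logs readable. A sketch of that backward-compatibility pattern, assuming json4s (the library `JsonProtocol` is built on) and a hypothetical `jsonOptionSketch` stand-in for Spark's `Utils.jsonOption`:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

object CompatSketch {
  implicit val formats: Formats = DefaultFormats

  // Hypothetical stand-in for Utils.jsonOption: maps a missing field
  // (JNothing) to None so the caller can supply a default.
  def jsonOptionSketch(json: JValue): Option[JValue] = json match {
    case JNothing => None
    case value => Some(value)
  }

  def main(args: Array[String]): Unit = {
    // A log entry written before this patch, and one written after it.
    val oldLog = parse("""{"Executor Run Time": 42}""")
    val newLog = parse("""{"Executor Run Time": 42, "Computation Nanos": 1234}""")

    for (json <- Seq(oldLog, newLog)) {
      val nanos = jsonOptionSketch(json \ "Computation Nanos")
        .map(_.extract[Long])
        .getOrElse(0L)
      println(s"computationNanos = $nanos") // 0 for oldLog, 1234 for newLog
    }
  }
}
```

Defaulting to `0L` matches the field's uninitialized value in `TaskMetrics` (`var computationNanos: Long = _` initializes to 0), so a replayed pre-patch log looks the same as a task that ran no compute monotasks.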