diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 9e0e9946b..e6ca5d034 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -106,6 +106,10 @@ class TaskMetrics extends Serializable {
   def diskNanos: Long = _diskNanos
   private[spark] def incDiskNanos(value: Long) = _diskNanos += value
 
+  private var _diskReadNanos: Long = _
+  def diskReadNanos: Long = _diskReadNanos
+  private[spark] def incDiskReadNanos(value: Long) = _diskReadNanos += value
+
   /**
    * Total time tht disk monotasks for this macrotask spent queued.
    */
diff --git a/core/src/main/scala/org/apache/spark/monotasks/disk/DiskReadMonotask.scala b/core/src/main/scala/org/apache/spark/monotasks/disk/DiskReadMonotask.scala
index 1e5f229f2..af0942db9 100644
--- a/core/src/main/scala/org/apache/spark/monotasks/disk/DiskReadMonotask.scala
+++ b/core/src/main/scala/org/apache/spark/monotasks/disk/DiskReadMonotask.scala
@@ -81,10 +81,12 @@ private[spark] class DiskReadMonotask(
     val buffer = ByteBuffer.allocate(bytesToRead)
 
     try {
-      val startTimeMillis = System.currentTimeMillis()
+      val startTimeNanos = System.nanoTime()
       channel.read(buffer)
+      val totalTime = System.nanoTime() - startTimeNanos
+      context.taskMetrics.incDiskReadNanos(totalTime)
       logDebug(s"Block $blockId (size: ${Utils.bytesToString(bytesToRead)}) read from " +
-        s"disk $diskId in ${System.currentTimeMillis - startTimeMillis} ms into $buffer")
+        s"disk $diskId in ${totalTime / 1.0e6} ms into $buffer")
     } finally {
       channel.close()
       stream.close()
diff --git a/core/src/main/scala/org/apache/spark/monotasks/disk/HdfsReadMonotask.scala b/core/src/main/scala/org/apache/spark/monotasks/disk/HdfsReadMonotask.scala
index ad277f2f6..bdb4dabaa 100644
--- a/core/src/main/scala/org/apache/spark/monotasks/disk/HdfsReadMonotask.scala
+++ b/core/src/main/scala/org/apache/spark/monotasks/disk/HdfsReadMonotask.scala
@@ -47,6 +47,7 @@ private[spark] class HdfsReadMonotask(
     val stream = path.getFileSystem(hadoopConf).open(path)
     val sizeBytes = hadoopSplit.getLength().toInt
     val buffer = new Array[Byte](sizeBytes)
+    val startTime = System.nanoTime()
 
     try {
       stream.readFully(hadoopSplit.getStart(), buffer)
@@ -56,6 +57,7 @@ private[spark] class HdfsReadMonotask(
 
       SparkEnv.get.blockManager.cacheBytes(
         getResultBlockId(), ByteBuffer.wrap(buffer), StorageLevel.MEMORY_ONLY_SER, false)
+      context.taskMetrics.incDiskReadNanos(System.nanoTime() - startTime)
       context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop).incBytesRead(sizeBytes)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 15d44a49b..a9200ce78 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -309,6 +309,7 @@ private[spark] object JsonProtocol {
     ("Computation Nanos" -> taskMetrics.computationNanos) ~
     ("Compute Wait Nanos" -> taskMetrics.computeWaitNanos) ~
     ("Disk Nanos" -> taskMetrics.diskNanos) ~
+    ("Disk Read Nanos" -> taskMetrics.diskReadNanos) ~
     ("Disk Wait Nanos" -> taskMetrics.diskWaitNanos) ~
     ("HDFS Deserialization/Decompression Millis" ->
       taskMetrics.hdfsDeserializationDecompressionMillis) ~
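
Note: the hunks above cover only the write path. The disk monotasks accumulate the new counter via incDiskReadNanos, and taskMetricsToJson emits it as "Disk Read Nanos"; the matching read side of JsonProtocol is not part of this diff. A minimal sketch of what pulling the field back out of a logged "Task Metrics" object could look like, assuming json4s (the library behind JsonProtocol's `~`/`->` syntax) and treating the helper name diskReadNanosFromJson as hypothetical:

    import org.json4s._

    // Hypothetical helper: reads "Disk Read Nanos" from a parsed "Task Metrics" JValue.
    // Event logs written before this change will not contain the field, so default to 0.
    def diskReadNanosFromJson(metricsJson: JValue): Long = {
      implicit val formats: Formats = DefaultFormats
      (metricsJson \ "Disk Read Nanos").extractOpt[Long].getOrElse(0L)
    }

In the actual fork this would presumably sit alongside the existing field reads in JsonProtocol's TaskMetrics deserialization and feed incDiskReadNanos, but that code is not shown here.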