
Added separate metrics for disk read time
kayousterhout committed May 25, 2017
1 parent eed528e commit ad17278
Showing 4 changed files with 11 additions and 2 deletions.

@@ -106,6 +106,10 @@ class TaskMetrics extends Serializable {
   def diskNanos: Long = _diskNanos
   private[spark] def incDiskNanos(value: Long) = _diskNanos += value
 
+  private var _diskReadNanos: Long = _
+  def diskReadNanos: Long = _diskReadNanos
+  private[spark] def incDiskReadNanos(value: Long) = _diskReadNanos += value
+
   /**
    * Total time that disk monotasks for this macrotask spent queued.
    */
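
The new counter follows the same pattern as the existing _diskNanos field above it: a private mutable Long, a public read-only getter, and an increment method restricted to Spark internals via private[spark]. A minimal standalone sketch of that pattern (MetricsSketch and the demo main are illustrative names, not part of the patch):

// Sketch of the accumulator pattern TaskMetrics uses: a private mutable
// counter, a read-only accessor, and a narrow increment method.
class MetricsSketch extends Serializable {
  private var _diskReadNanos: Long = 0L

  // Read-only view of the accumulated time, in nanoseconds.
  def diskReadNanos: Long = _diskReadNanos

  // In the real class this is private[spark], so only internal code can add to it.
  def incDiskReadNanos(value: Long): Unit = _diskReadNanos += value
}

object MetricsSketch {
  def main(args: Array[String]): Unit = {
    val m = new MetricsSketch
    m.incDiskReadNanos(1500000L) // simulate 1.5 ms of disk-read time
    println(s"diskReadNanos = ${m.diskReadNanos}")
  }
}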

@@ -81,10 +81,12 @@ private[spark] class DiskReadMonotask(
     val buffer = ByteBuffer.allocate(bytesToRead)
 
     try {
-      val startTimeMillis = System.currentTimeMillis()
+      val startTimeNanos = System.nanoTime()
       channel.read(buffer)
+      val totalTime = System.nanoTime() - startTimeNanos
+      context.taskMetrics.incDiskReadNanos(totalTime)
       logDebug(s"Block $blockId (size: ${Utils.bytesToString(bytesToRead)}) read from " +
-        s"disk $diskId in ${System.currentTimeMillis - startTimeMillis} ms into $buffer")
+        s"disk $diskId in ${totalTime / 1.0e6} ms into $buffer")
     } finally {
       channel.close()
       stream.close()
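
This hunk switches the timer from System.currentTimeMillis() to System.nanoTime(), which is monotonic and therefore safer for measuring elapsed intervals, and reuses one measurement for both the new metric and the debug log (nanoseconds / 1.0e6 gives milliseconds). A self-contained sketch of the same timing pattern, assuming a placeholder file path and buffer size:

import java.io.RandomAccessFile
import java.nio.ByteBuffer

object TimedReadSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder path: substitute any readable file.
    val file = new RandomAccessFile("/tmp/example.bin", "r")
    val channel = file.getChannel
    val buffer = ByteBuffer.allocate(4096)
    try {
      val startTimeNanos = System.nanoTime() // monotonic, unlike currentTimeMillis
      channel.read(buffer)
      val totalTime = System.nanoTime() - startTimeNanos
      // Same unit conversion as the patch: nanoseconds / 1.0e6 = milliseconds.
      println(f"read ${buffer.position()} bytes in ${totalTime / 1.0e6}%.3f ms")
    } finally {
      channel.close()
      file.close()
    }
  }
}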

@@ -47,6 +47,7 @@ private[spark] class HdfsReadMonotask(
     val stream = path.getFileSystem(hadoopConf).open(path)
     val sizeBytes = hadoopSplit.getLength().toInt
     val buffer = new Array[Byte](sizeBytes)
+    val startTime = System.nanoTime()
 
     try {
       stream.readFully(hadoopSplit.getStart(), buffer)
@@ -56,6 +57,7 @@
 
     SparkEnv.get.blockManager.cacheBytes(
       getResultBlockId(), ByteBuffer.wrap(buffer), StorageLevel.MEMORY_ONLY_SER, false)
+    context.taskMetrics.incDiskReadNanos(System.nanoTime() - startTime)
     context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop).incBytesRead(sizeBytes)
   }
 }
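
Here startTime is captured before the try block and incDiskReadNanos runs only after the bytes are cached, so the recorded interval covers readFully plus the in-memory caching rather than raw disk I/O alone. A small helper sketching this "time a block of work, then record the elapsed nanoseconds" pattern (timed and the local counter standing in for taskMetrics.incDiskReadNanos are hypothetical):

object TimeBlockSketch {
  // Runs body, reports the elapsed nanoseconds to record, and returns the result.
  def timed[T](record: Long => Unit)(body: => T): T = {
    val start = System.nanoTime()
    try body
    finally record(System.nanoTime() - start)
  }

  def main(args: Array[String]): Unit = {
    var diskReadNanos = 0L // stands in for taskMetrics.incDiskReadNanos
    val bytes = timed(n => diskReadNanos += n) {
      Thread.sleep(5) // stands in for stream.readFully(...) plus caching
      Array.fill[Byte](16)(0.toByte)
    }
    println(s"read ${bytes.length} bytes in ${diskReadNanos / 1.0e6} ms")
  }
}

Unlike this sketch's finally, the patch records the elapsed time only once the read and cache have both completed, since the increment sits after them in the method body.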

@@ -309,6 +309,7 @@ private[spark] object JsonProtocol {
       ("Computation Nanos" -> taskMetrics.computationNanos) ~
       ("Compute Wait Nanos" -> taskMetrics.computeWaitNanos) ~
       ("Disk Nanos" -> taskMetrics.diskNanos) ~
+      ("Disk Read Nanos" -> taskMetrics.diskReadNanos) ~
       ("Disk Wait Nanos" -> taskMetrics.diskWaitNanos) ~
       ("HDFS Deserialization/Decompression Millis" ->
         taskMetrics.hdfsDeserializationDecompressionMillis) ~
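
JsonProtocol serializes TaskMetrics with the json4s DSL, where ~ chains key/value pairs into one JSON object, so the new "Disk Read Nanos" field slots in alongside the existing disk timings in the event log. A minimal sketch of that DSL (requires the json4s library on the classpath; the values are made up):

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

object JsonSketch {
  def main(args: Array[String]): Unit = {
    // ~ concatenates fields into a single JObject, mirroring JsonProtocol.
    val json =
      ("Disk Nanos" -> 123456789L) ~
      ("Disk Read Nanos" -> 98765432L) ~ // the field this commit adds
      ("Disk Wait Nanos" -> 1111L)
    println(compact(render(json)))
    // prints {"Disk Nanos":123456789,"Disk Read Nanos":98765432,"Disk Wait Nanos":1111}
  }
}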
