diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 2ffa34d9f..f5788fe79 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -211,7 +211,7 @@ class TaskMetrics extends Serializable {
     setJvmGCTime(currentGCTotalMillis - startingGCTime)
     cpuUtilization = Some(new CpuUtilization(startCpuCounters))
     networkUtilization = Some(new NetworkUtilization(startNetworkCounters))
-    diskUtilization = Some(new DiskUtilization(startDiskCounters))
+    diskUtilization = Some(DiskUtilization(startDiskCounters))
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/performance_logging/ContinuousMonitor.scala b/core/src/main/scala/org/apache/spark/performance_logging/ContinuousMonitor.scala
index 4f64def84..b986f28ff 100644
--- a/core/src/main/scala/org/apache/spark/performance_logging/ContinuousMonitor.scala
+++ b/core/src/main/scala/org/apache/spark/performance_logging/ContinuousMonitor.scala
@@ -46,7 +46,7 @@ private[spark] class ContinuousMonitor(sparkConf: SparkConf) {
     val currentNetworkCounters = new NetworkCounters()
 
     val cpuUtilization = new CpuUtilization(previousCpuCounters, currentCpuCounters)
-    val diskUtilization = new DiskUtilization(previousDiskCounters, currentDiskCounters)
+    val diskUtilization = DiskUtilization(previousDiskCounters, currentDiskCounters)
     val networkUtilization = new NetworkUtilization(previousNetworkCounters, currentNetworkCounters)
 
     previousCpuCounters = currentCpuCounters
diff --git a/core/src/main/scala/org/apache/spark/performance_logging/CpuCounters.scala b/core/src/main/scala/org/apache/spark/performance_logging/CpuCounters.scala
index 3f2c12389..7441a48c3 100644
--- a/core/src/main/scala/org/apache/spark/performance_logging/CpuCounters.scala
+++ b/core/src/main/scala/org/apache/spark/performance_logging/CpuCounters.scala
@@ -22,8 +22,9 @@ import scala.io.Source
 
 import org.apache.spark.Logging
 
-class CpuCounters() extends Serializable with Logging {
-  val timeMillis = System.currentTimeMillis()
+class CpuCounters(val timeMillis: Long) extends Serializable with Logging {
+
+  def this() = this(System.currentTimeMillis)
 
   // Total CPU time used by the Spark process.
   var processUserJiffies = 0L
diff --git a/core/src/main/scala/org/apache/spark/performance_logging/DiskUtilization.scala b/core/src/main/scala/org/apache/spark/performance_logging/DiskUtilization.scala
index fcd7e6354..864e86840 100644
--- a/core/src/main/scala/org/apache/spark/performance_logging/DiskUtilization.scala
+++ b/core/src/main/scala/org/apache/spark/performance_logging/DiskUtilization.scala
@@ -20,32 +20,50 @@ import scala.collection.mutable.HashMap
 
 /** Utilization of a particular block device. */
 class BlockDeviceUtilization(
-    startCounters: BlockDeviceCounters,
-    endCounters: BlockDeviceCounters,
-    elapsedMillis: Long) extends Serializable {
-  val diskUtilization = ((endCounters.millisTotal - startCounters.millisTotal).toFloat /
-    elapsedMillis)
-  val readThroughput = ((endCounters.sectorsRead - startCounters.sectorsRead).toFloat *
-    DiskUtilization.SECTOR_SIZE_BYTES * 1000 / elapsedMillis)
-  val writeThroughput = ((endCounters.sectorsWritten - startCounters.sectorsWritten).toFloat *
-    DiskUtilization.SECTOR_SIZE_BYTES * 1000 / elapsedMillis)
-}
+    val diskUtilization: Float,
+    val readThroughput: Float,
+    val writeThroughput: Float)
+  extends Serializable {
 
-class DiskUtilization(startCounters: DiskCounters, endCounters: DiskCounters) extends Serializable {
-  val deviceNameToUtilization = HashMap[String, BlockDeviceUtilization]()
-  val elapsedMillis = endCounters.timeMillis - startCounters.timeMillis
-  endCounters.deviceNameToCounters.foreach {
-    case (deviceName: String, endCounters: BlockDeviceCounters) =>
-      startCounters.deviceNameToCounters.get(deviceName).foreach {
-        deviceNameToUtilization +=
-          deviceName -> new BlockDeviceUtilization(_, endCounters, elapsedMillis)
-      }
+  def this(
+      startCounters: BlockDeviceCounters, endCounters: BlockDeviceCounters, elapsedMillis: Long) {
+    this(
+      (endCounters.millisTotal - startCounters.millisTotal).toFloat / elapsedMillis,
+      ((endCounters.sectorsRead - startCounters.sectorsRead).toFloat *
+        DiskUtilization.SECTOR_SIZE_BYTES * 1000 / elapsedMillis),
+      ((endCounters.sectorsWritten - startCounters.sectorsWritten).toFloat *
+        DiskUtilization.SECTOR_SIZE_BYTES * 1000 / elapsedMillis))
   }
-
-  def this(startCounters: DiskCounters) = this(startCounters, new DiskCounters())
 }
 
+class DiskUtilization(
+    val elapsedMillis: Long,
+    val deviceNameToUtilization: HashMap[String, BlockDeviceUtilization])
+  extends Serializable
+
 object DiskUtilization {
   // This is not at all portable -- can be obtained for a particular machine with "fdisk -l".
   val SECTOR_SIZE_BYTES = 512
+
+  /**
+   * Creates a DiskUtilization based on two sets of disk counters.
+   *
+   * This constructor lives in this companion object because it needs to do some computation
+   * (to construct the map of device name to device utilization) before constructing the
+   * DiskUtilization object.
+   */
+  def apply(startCounters: DiskCounters, endCounters: DiskCounters): DiskUtilization = {
+    val deviceNameToUtilization = HashMap[String, BlockDeviceUtilization]()
+    val elapsedMillis = endCounters.timeMillis - startCounters.timeMillis
+    endCounters.deviceNameToCounters.foreach {
+      case (deviceName: String, endCounters: BlockDeviceCounters) =>
+        startCounters.deviceNameToCounters.get(deviceName).foreach {
+          deviceNameToUtilization +=
+            deviceName -> new BlockDeviceUtilization(_, endCounters, elapsedMillis)
+        }
+    }
+    new DiskUtilization(elapsedMillis, deviceNameToUtilization)
+  }
+
+  def apply(startCounters: DiskCounters): DiskUtilization = apply(startCounters, new DiskCounters())
 }
diff --git a/core/src/main/scala/org/apache/spark/performance_logging/NetworkUtilization.scala b/core/src/main/scala/org/apache/spark/performance_logging/NetworkUtilization.scala
index 71c1eddef..77f1d974f 100644
--- a/core/src/main/scala/org/apache/spark/performance_logging/NetworkUtilization.scala
+++ b/core/src/main/scala/org/apache/spark/performance_logging/NetworkUtilization.scala
@@ -16,19 +16,37 @@
 
 package org.apache.spark.performance_logging
 
-class NetworkUtilization(startCounters: NetworkCounters, endCounters: NetworkCounters)
-  extends Serializable {
-  val elapsedMillis = endCounters.timeMillis - startCounters.timeMillis
+class NetworkUtilization(
+    val elapsedMillis: Long,
+    val bytesReceivedPerSecond: Float,
+    val bytesTransmittedPerSecond: Float,
+    val packetsReceivedPerSecond: Float,
+    val packetsTransmittedPerSecond: Float)
+  extends Serializable {
 
-  val bytesReceivedPerSecond =
-    (endCounters.receivedBytes - startCounters.receivedBytes).toFloat * 1000 / elapsedMillis
-  val bytesTransmittedPerSecond =
-    (endCounters.transmittedBytes - startCounters.transmittedBytes).toFloat * 1000 / elapsedMillis
-  val packetsReceivedPerSecond =
-    (endCounters.receivedPackets - startCounters.receivedPackets).toFloat * 1000 / elapsedMillis
-  val packetsTransmittedPerSecond =
-    ((endCounters.transmittedPackets - startCounters.transmittedPackets).toFloat * 1000 /
-      elapsedMillis)
+  /**
+   * This constructor is private because it is used only by other constructors, so they can
+   * re-use elapsedMillis for all of the throughput calculations.
+   */
+  private def this(
+      startCounters: NetworkCounters,
+      endCounters: NetworkCounters,
+      elapsedMillis: Long) = {
+    this(
+      elapsedMillis,
+      (endCounters.receivedBytes - startCounters.receivedBytes).toFloat * 1000 / elapsedMillis,
+      ((endCounters.transmittedBytes - startCounters.transmittedBytes).toFloat * 1000 /
+        elapsedMillis),
+      (endCounters.receivedPackets - startCounters.receivedPackets).toFloat * 1000 / elapsedMillis,
+      ((endCounters.transmittedPackets - startCounters.transmittedPackets).toFloat * 1000 /
+        elapsedMillis))
+  }
 
-  def this(startCounters: NetworkCounters) = this(startCounters, new NetworkCounters())
+  def this(startCounters: NetworkCounters, endCounters: NetworkCounters) = {
+    this(startCounters, endCounters, endCounters.timeMillis - startCounters.timeMillis)
+  }
+
+  def this(startCounters: NetworkCounters) = {
+    this(startCounters, new NetworkCounters())
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 82eb189de..f6d112f71 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -39,6 +39,7 @@ import org.apache.spark.scheduler.cluster.ExecutorInfo
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
+import scala.collection.mutable.HashMap
 
 import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
@@ -317,7 +318,6 @@
     ("Input Metrics" -> inputMetrics) ~
     ("Output Metrics" -> outputMetrics) ~
     ("Updated Blocks" -> updatedBlocks) ~
-    ("Updated Blocks" -> updatedBlocks) ~
     ("Cpu Utilization" -> cpuUtilization) ~
     ("Disk Utilization" -> diskUtilization) ~
     ("Network Utilization" -> networkUtilization)
@@ -353,6 +353,7 @@
 
   def networkUtilizationToJson(networkUtilization: NetworkUtilization): JValue = {
+    ("Elapsed Millis" -> networkUtilization.elapsedMillis) ~
     ("Bytes Received Per Second" -> networkUtilization.bytesReceivedPerSecond) ~
     ("Bytes Transmitted Per Second" -> networkUtilization.bytesTransmittedPerSecond) ~
     ("Packets Received Per Second" -> networkUtilization.packetsReceivedPerSecond) ~
     ("Packets Transmitted Per Second" -> networkUtilization.packetsTransmittedPerSecond)
@@ -745,6 +746,12 @@
           (id, status)
         }
       }
+    metrics.cpuUtilization =
+      Utils.jsonOption(json \ "Cpu Utilization").map(cpuUtilizationFromJson)
+    metrics.diskUtilization =
+      Utils.jsonOption(json \ "Disk Utilization").map(diskUtilizationFromJson)
+    metrics.networkUtilization =
+      Utils.jsonOption(json \ "Network Utilization").map(networkUtilizationFromJson)
     metrics
   }
 
@@ -784,6 +791,47 @@
     metrics
   }
 
+  def cpuCountersFromJson(json: JValue): CpuCounters = {
+    val cpuCounters = new CpuCounters((json \ "Time Milliseconds").extract[Long])
+    cpuCounters.processUserJiffies = (json \ "Process User Jiffies").extract[Long]
+    cpuCounters.processSystemJiffies = (json \ "Process System Jiffies").extract[Long]
+    cpuCounters.totalUserJiffies = (json \ "Total User Jiffies").extract[Long]
+    cpuCounters.totalSystemJiffies = (json \ "Total System Jiffies").extract[Long]
+    cpuCounters
+  }
+
+  def cpuUtilizationFromJson(json: JValue): CpuUtilization = {
+    val startCounters = cpuCountersFromJson(json \ "Start Counters")
+    val endCounters = cpuCountersFromJson(json \ "End Counters")
+    new CpuUtilization(startCounters, endCounters)
+  }
+
+  def blockDeviceUtilizationFromJson(json: JValue): BlockDeviceUtilization = {
+    new BlockDeviceUtilization(
+      (json \ "Disk Utilization").extract[Float],
+      (json \ "Read Throughput").extract[Float],
+      (json \ "Write Throughput").extract[Float])
+  }
+
+  def diskUtilizationFromJson(json: JValue): DiskUtilization = {
+    // TODO: This does not currently decode the block device utilization, which should be done
+    // using blockDeviceUtilizationFromJson! Decoding the mapping of names to utilizations is
+    // tricky because of the way the Json object is structured.
+    // https://github.com/NetSys/spark-monotasks/issues/18
+    new DiskUtilization(
+      (json \ "Elapsed Millis").extract[Long],
+      new HashMap[String, BlockDeviceUtilization]())
+  }
+
+  def networkUtilizationFromJson(json: JValue): NetworkUtilization = {
+    new NetworkUtilization(
+      (json \ "Elapsed Millis").extract[Long],
+      (json \ "Bytes Received Per Second").extract[Float],
+      (json \ "Bytes Transmitted Per Second").extract[Float],
+      (json \ "Packets Received Per Second").extract[Float],
+      (json \ "Packets Transmitted Per Second").extract[Float])
+  }
+
   def taskEndReasonFromJson(json: JValue): TaskEndReason = {
     val success = Utils.getFormattedClassName(Success)
     val resubmitted = Utils.getFormattedClassName(Resubmitted)
diff --git a/core/src/test/scala/org/apache/spark/performange_logging/DiskUtilizationSuite.scala b/core/src/test/scala/org/apache/spark/performange_logging/DiskUtilizationSuite.scala
index b315f2b8c..f11725a01 100644
--- a/core/src/test/scala/org/apache/spark/performange_logging/DiskUtilizationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/performange_logging/DiskUtilizationSuite.scala
@@ -72,7 +72,7 @@
 
     val endCounters = new DiskCounters(2048, endNameToCounters)
 
-    val utilization = new DiskUtilization(startCounters, endCounters)
+    val utilization = DiskUtilization(startCounters, endCounters)
     assert(utilization.deviceNameToUtilization.contains("disk1"))
     val disk1Utilization = utilization.deviceNameToUtilization("disk1")
     assert(0 === disk1Utilization.readThroughput)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 22c8b1d01..b5daa14b0 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -690,6 +690,7 @@
     t.setHostname("localhost")
     t.setExecutorDeserializeTime(a)
     t.setExecutorRunTime(b)
+    t.setComputationNanos(b + 50)
    t.setResultSize(c)
     t.setJvmGCTime(d)
     t.setResultSerializationTime(a + b)
@@ -945,6 +946,7 @@
       |    "Host Name": "localhost",
       |    "Executor Deserialize Time": 300,
       |    "Executor Run Time": 400,
+      |    "Computation Nanos": 450,
       |    "Result Size": 500,
       |    "JVM GC Time": 600,
       |    "Result Serialization Time": 700,
@@ -1031,6 +1033,7 @@
       |    "Host Name": "localhost",
       |    "Executor Deserialize Time": 300,
       |    "Executor Run Time": 400,
+      |    "Computation Nanos": 450,
       |    "Result Size": 500,
       |    "JVM GC Time": 600,
       |    "Result Serialization Time": 700,