From 09bbd4cad666a7cb73daff00ce51c004bade0908 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Fri, 20 Mar 2015 18:26:23 -0700 Subject: [PATCH] Fix broken Json tests. Previous commits to add utilization information and add instrumentation for the total computation nanoseconds broke the Json related tests (JsonProtocolSuite.scala and ReplayListenerSuite.scala) because (a) these tests weren't properly updated and (b) JsonProtocol.scala did not include functionality to decode the cpu, disk, and network utilization. This commit mostly fixes those issues; the only remaining issue is described in https://github.com/NetSys/spark-monotasks/issues/18. Conflicts: core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala --- .../apache/spark/executor/TaskMetrics.scala | 2 +- .../ContinuousMonitor.scala | 2 +- .../performance_logging/CpuCounters.scala | 5 +- .../performance_logging/DiskUtilization.scala | 60 ++++++++++++------- .../NetworkUtilization.scala | 44 ++++++++++---- .../org/apache/spark/util/JsonProtocol.scala | 50 +++++++++++++++- .../DiskUtilizationSuite.scala | 2 +- .../apache/spark/util/JsonProtocolSuite.scala | 3 + 8 files changed, 128 insertions(+), 40 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 2ffa34d9f..f5788fe79 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -211,7 +211,7 @@ class TaskMetrics extends Serializable { setJvmGCTime(currentGCTotalMillis - startingGCTime) cpuUtilization = Some(new CpuUtilization(startCpuCounters)) networkUtilization = Some(new NetworkUtilization(startNetworkCounters)) - diskUtilization = Some(new DiskUtilization(startDiskCounters)) + diskUtilization = Some(DiskUtilization(startDiskCounters)) } /** diff --git a/core/src/main/scala/org/apache/spark/performance_logging/ContinuousMonitor.scala b/core/src/main/scala/org/apache/spark/performance_logging/ContinuousMonitor.scala index 4f64def84..b986f28ff 100644 --- a/core/src/main/scala/org/apache/spark/performance_logging/ContinuousMonitor.scala +++ b/core/src/main/scala/org/apache/spark/performance_logging/ContinuousMonitor.scala @@ -46,7 +46,7 @@ private[spark] class ContinuousMonitor(sparkConf: SparkConf) { val currentNetworkCounters = new NetworkCounters() val cpuUtilization = new CpuUtilization(previousCpuCounters, currentCpuCounters) - val diskUtilization = new DiskUtilization(previousDiskCounters, currentDiskCounters) + val diskUtilization = DiskUtilization(previousDiskCounters, currentDiskCounters) val networkUtilization = new NetworkUtilization(previousNetworkCounters, currentNetworkCounters) previousCpuCounters = currentCpuCounters diff --git a/core/src/main/scala/org/apache/spark/performance_logging/CpuCounters.scala b/core/src/main/scala/org/apache/spark/performance_logging/CpuCounters.scala index 3f2c12389..7441a48c3 100644 --- a/core/src/main/scala/org/apache/spark/performance_logging/CpuCounters.scala +++ b/core/src/main/scala/org/apache/spark/performance_logging/CpuCounters.scala @@ -22,8 +22,9 @@ import scala.io.Source import org.apache.spark.Logging -class CpuCounters() extends Serializable with Logging { - val timeMillis = System.currentTimeMillis() +class CpuCounters(val timeMillis: Long) extends Serializable with Logging { + + def this() = this(System.currentTimeMillis) // Total CPU time used by the Spark process. var processUserJiffies = 0L diff --git a/core/src/main/scala/org/apache/spark/performance_logging/DiskUtilization.scala b/core/src/main/scala/org/apache/spark/performance_logging/DiskUtilization.scala index fcd7e6354..864e86840 100644 --- a/core/src/main/scala/org/apache/spark/performance_logging/DiskUtilization.scala +++ b/core/src/main/scala/org/apache/spark/performance_logging/DiskUtilization.scala @@ -20,32 +20,50 @@ import scala.collection.mutable.HashMap /** Utilization of a particular block device. */ class BlockDeviceUtilization( - startCounters: BlockDeviceCounters, - endCounters: BlockDeviceCounters, - elapsedMillis: Long) extends Serializable { - val diskUtilization = ((endCounters.millisTotal - startCounters.millisTotal).toFloat / - elapsedMillis) - val readThroughput = ((endCounters.sectorsRead - startCounters.sectorsRead).toFloat * - DiskUtilization.SECTOR_SIZE_BYTES * 1000 / elapsedMillis) - val writeThroughput = ((endCounters.sectorsWritten - startCounters.sectorsWritten).toFloat * - DiskUtilization.SECTOR_SIZE_BYTES * 1000 / elapsedMillis) -} + val diskUtilization: Float, + val readThroughput: Float, + val writeThroughput: Float) + extends Serializable { -class DiskUtilization(startCounters: DiskCounters, endCounters: DiskCounters) extends Serializable { - val deviceNameToUtilization = HashMap[String, BlockDeviceUtilization]() - val elapsedMillis = endCounters.timeMillis - startCounters.timeMillis - endCounters.deviceNameToCounters.foreach { - case (deviceName: String, endCounters: BlockDeviceCounters) => - startCounters.deviceNameToCounters.get(deviceName).foreach { - deviceNameToUtilization += - deviceName -> new BlockDeviceUtilization(_, endCounters, elapsedMillis) - } + def this( + startCounters: BlockDeviceCounters, endCounters: BlockDeviceCounters, elapsedMillis: Long) { + this( + (endCounters.millisTotal - startCounters.millisTotal).toFloat / elapsedMillis, + ((endCounters.sectorsRead - startCounters.sectorsRead).toFloat * + DiskUtilization.SECTOR_SIZE_BYTES * 1000 / elapsedMillis), + ((endCounters.sectorsWritten - startCounters.sectorsWritten).toFloat * + DiskUtilization.SECTOR_SIZE_BYTES * 1000 / elapsedMillis)) } - - def this(startCounters: DiskCounters) = this(startCounters, new DiskCounters()) } +class DiskUtilization( + val elapsedMillis: Long, + val deviceNameToUtilization: HashMap[String, BlockDeviceUtilization]) + extends Serializable + object DiskUtilization { // This is not at all portable -- can be obtained for a particular machine with "fdisk -l". val SECTOR_SIZE_BYTES = 512 + + /** + * Creates a DiskUtilization based on two sets of disk counters. + * + * This constructor lives in this companion object because it needs to do some computation + * (to construct the map of device name to device utilization) before constructing the + * DiskUtilization object. + */ + def apply(startCounters: DiskCounters, endCounters: DiskCounters): DiskUtilization = { + val deviceNameToUtilization = HashMap[String, BlockDeviceUtilization]() + val elapsedMillis = endCounters.timeMillis - startCounters.timeMillis + endCounters.deviceNameToCounters.foreach { + case (deviceName: String, endCounters: BlockDeviceCounters) => + startCounters.deviceNameToCounters.get(deviceName).foreach { + deviceNameToUtilization += + deviceName -> new BlockDeviceUtilization(_, endCounters, elapsedMillis) + } + } + new DiskUtilization(elapsedMillis, deviceNameToUtilization) + } + + def apply(startCounters: DiskCounters): DiskUtilization = apply(startCounters, new DiskCounters()) } diff --git a/core/src/main/scala/org/apache/spark/performance_logging/NetworkUtilization.scala b/core/src/main/scala/org/apache/spark/performance_logging/NetworkUtilization.scala index 71c1eddef..77f1d974f 100644 --- a/core/src/main/scala/org/apache/spark/performance_logging/NetworkUtilization.scala +++ b/core/src/main/scala/org/apache/spark/performance_logging/NetworkUtilization.scala @@ -16,19 +16,37 @@ package org.apache.spark.performance_logging -class NetworkUtilization(startCounters: NetworkCounters, endCounters: NetworkCounters) - extends Serializable { - val elapsedMillis = endCounters.timeMillis - startCounters.timeMillis +class NetworkUtilization( + val elapsedMillis: Long, + val bytesReceivedPerSecond: Float, + val bytesTransmittedPerSecond: Float, + val packetsReceivedPerSecond: Float, + val packetsTransmittedPerSecond: Float) + extends Serializable { - val bytesReceivedPerSecond = - (endCounters.receivedBytes - startCounters.receivedBytes).toFloat * 1000 / elapsedMillis - val bytesTransmittedPerSecond = - (endCounters.transmittedBytes - startCounters.transmittedBytes).toFloat * 1000 / elapsedMillis - val packetsReceivedPerSecond = - (endCounters.receivedPackets - startCounters.receivedPackets).toFloat * 1000 / elapsedMillis - val packetsTransmittedPerSecond = - ((endCounters.transmittedPackets - startCounters.transmittedPackets).toFloat * 1000 / - elapsedMillis) + /** + * This constructor is private because it is used only by other constructors, so they can + * re-use elapsedMillis for all of the throughput calculations. + */ + private def this( + startCounters: NetworkCounters, + endCounters: NetworkCounters, + elapsedMillis: Long) = { + this( + elapsedMillis, + (endCounters.receivedBytes - startCounters.receivedBytes).toFloat * 1000 / elapsedMillis, + ((endCounters.transmittedBytes - startCounters.transmittedBytes).toFloat * 1000 / + elapsedMillis), + (endCounters.receivedPackets - startCounters.receivedPackets).toFloat * 1000 / elapsedMillis, + ((endCounters.transmittedPackets - startCounters.transmittedPackets).toFloat * 1000 / + elapsedMillis)) + } - def this(startCounters: NetworkCounters) = this(startCounters, new NetworkCounters()) + def this(startCounters: NetworkCounters, endCounters: NetworkCounters) = { + this(startCounters, endCounters, endCounters.timeMillis - startCounters.timeMillis) + } + + def this(startCounters: NetworkCounters) = { + this(startCounters, new NetworkCounters()) + } } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 82eb189de..f6d112f71 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -39,6 +39,7 @@ import org.apache.spark.scheduler.cluster.ExecutorInfo import scala.collection.JavaConverters._ import scala.collection.Map +import scala.collection.mutable.HashMap import org.json4s.DefaultFormats import org.json4s.JsonDSL._ @@ -317,7 +318,6 @@ private[spark] object JsonProtocol { ("Input Metrics" -> inputMetrics) ~ ("Output Metrics" -> outputMetrics) ~ ("Updated Blocks" -> updatedBlocks) ~ - ("Updated Blocks" -> updatedBlocks) ~ ("Cpu Utilization" -> cpuUtilization) ~ ("Disk Utilization" -> diskUtilization) ~ ("Network Utilization" -> networkUtilization) @@ -353,6 +353,7 @@ private[spark] object JsonProtocol { } def networkUtilizationToJson(networkUtilization: NetworkUtilization): JValue = { + ("Elapsed Millis" -> networkUtilization.elapsedMillis) ~ ("Bytes Received Per Second" -> networkUtilization.bytesReceivedPerSecond) ~ ("Bytes Transmitted Per Second" -> networkUtilization.bytesTransmittedPerSecond) ~ ("Packets Received Per Second" -> networkUtilization.packetsReceivedPerSecond) ~ @@ -745,6 +746,12 @@ private[spark] object JsonProtocol { (id, status) } } + metrics.cpuUtilization = + Utils.jsonOption(json \ "Cpu Utilization").map(cpuUtilizationFromJson) + metrics.diskUtilization = + Utils.jsonOption(json \ "Disk Utilization").map(diskUtilizationFromJson) + metrics.networkUtilization = + Utils.jsonOption(json \ "Network Utilization").map(networkUtilizationFromJson) metrics } @@ -784,6 +791,47 @@ private[spark] object JsonProtocol { metrics } + def cpuCountersFromJson(json: JValue): CpuCounters = { + val cpuCounters = new CpuCounters((json \ "Time Milliseconds").extract[Long]) + cpuCounters.processUserJiffies = (json \ "Process User Jiffies").extract[Long] + cpuCounters.processSystemJiffies = (json \ "Process System Jiffies").extract[Long] + cpuCounters.totalUserJiffies = (json \ "Total User Jiffies").extract[Long] + cpuCounters.totalSystemJiffies = (json \ "Total System Jiffies").extract[Long] + cpuCounters + } + + def cpuUtilizationFromJson(json: JValue): CpuUtilization = { + val startCounters = cpuCountersFromJson(json \ "Start Counters") + val endCounters = cpuCountersFromJson(json \ "End Counters") + new CpuUtilization(startCounters, endCounters) + } + + def blockDeviceUtilizationFromJson(json: JValue): BlockDeviceUtilization = { + new BlockDeviceUtilization( + (json \ "Disk Utilization").extract[Float], + (json \ "Read Throughput").extract[Float], + (json \ "Write Throughput").extract[Float]) + } + + def diskUtilizationFromJson(json: JValue): DiskUtilization = { + // TODO: This does not currently decode the block device utilization, which should be done + // using blockDeviceUtilizationFromJson! Decoding the mapping of names to utilizations is + // tricky because of the way the Json object is structured. + // https://github.com/NetSys/spark-monotasks/issues/18 + new DiskUtilization( + (json \ "Elapsed Millis").extract[Long], + new HashMap[String, BlockDeviceUtilization]()) + } + + def networkUtilizationFromJson(json: JValue): NetworkUtilization = { + new NetworkUtilization( + (json \ "Elapsed Millis").extract[Long], + (json \ "Bytes Received Per Second").extract[Float], + (json \ "Bytes Transmitted Per Second").extract[Float], + (json \ "Packets Received Per Second").extract[Float], + (json \ "Packets Transmitted Per Second").extract[Float]) + } + def taskEndReasonFromJson(json: JValue): TaskEndReason = { val success = Utils.getFormattedClassName(Success) val resubmitted = Utils.getFormattedClassName(Resubmitted) diff --git a/core/src/test/scala/org/apache/spark/performange_logging/DiskUtilizationSuite.scala b/core/src/test/scala/org/apache/spark/performange_logging/DiskUtilizationSuite.scala index b315f2b8c..f11725a01 100644 --- a/core/src/test/scala/org/apache/spark/performange_logging/DiskUtilizationSuite.scala +++ b/core/src/test/scala/org/apache/spark/performange_logging/DiskUtilizationSuite.scala @@ -72,7 +72,7 @@ class DiskUtilizationSuite extends FunSuite { val endCounters = new DiskCounters(2048, endNameToCounters) - val utilization = new DiskUtilization(startCounters, endCounters) + val utilization = DiskUtilization(startCounters, endCounters) assert(utilization.deviceNameToUtilization.contains("disk1")) val disk1Utilization = utilization.deviceNameToUtilization("disk1") assert(0 === disk1Utilization.readThroughput) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 22c8b1d01..b5daa14b0 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -690,6 +690,7 @@ class JsonProtocolSuite extends FunSuite { t.setHostname("localhost") t.setExecutorDeserializeTime(a) t.setExecutorRunTime(b) + t.setComputationNanos(b + 50) t.setResultSize(c) t.setJvmGCTime(d) t.setResultSerializationTime(a + b) @@ -945,6 +946,7 @@ class JsonProtocolSuite extends FunSuite { | "Host Name": "localhost", | "Executor Deserialize Time": 300, | "Executor Run Time": 400, + | "Computation Nanos": 450, | "Result Size": 500, | "JVM GC Time": 600, | "Result Serialization Time": 700, @@ -1031,6 +1033,7 @@ class JsonProtocolSuite extends FunSuite { | "Host Name": "localhost", | "Executor Deserialize Time": 300, | "Executor Run Time": 400, + | "Computation Nanos": 450, | "Result Size": 500, | "JVM GC Time": 600, | "Result Serialization Time": 700,