Skip to content
This repository has been archived by the owner on Oct 31, 2018. It is now read-only.

Commit

Permalink
Fix broken Json tests.
Browse files Browse the repository at this point in the history
Previous commits to add utilization information and add instrumentation
for the total computation nanoseconds broke the Json related tests
(JsonProtocolSuite.scala and ReplayListenerSuite.scala) because (a)
these tests weren't properly updated and (b) JsonProtocol.scala did
not include functionality to decode the cpu, disk, and network
utilization. This commit mostly fixes those issues; the only
remaining issue is described in
#18.

Conflicts:
	core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
  • Loading branch information
kayousterhout authored and root committed Jun 2, 2015
1 parent 6cc869b commit 9393fd1
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class TaskMetrics extends Serializable {
def setUtilization() {
cpuUtilization = Some(new CpuUtilization(startCpuCounters))
networkUtilization = Some(new NetworkUtilization(startNetworkCounters))
diskUtilization = Some(new DiskUtilization(startDiskCounters))
diskUtilization = Some(DiskUtilization(startDiskCounters))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private[spark] class ContinuousMonitor(sparkConf: SparkConf) {
val currentNetworkCounters = new NetworkCounters()

val cpuUtilization = new CpuUtilization(previousCpuCounters, currentCpuCounters)
val diskUtilization = new DiskUtilization(previousDiskCounters, currentDiskCounters)
val diskUtilization = DiskUtilization(previousDiskCounters, currentDiskCounters)
val networkUtilization = new NetworkUtilization(previousNetworkCounters, currentNetworkCounters)

previousCpuCounters = currentCpuCounters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ import scala.io.Source

import org.apache.spark.Logging

class CpuCounters() extends Serializable with Logging {
val timeMillis = System.currentTimeMillis()
class CpuCounters(val timeMillis: Long) extends Serializable with Logging {

def this() = this(System.currentTimeMillis)

// Total CPU time used by the Spark process.
var processUserJiffies = 0L
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,50 @@ import scala.collection.mutable.HashMap

/** Utilization of a particular block device. */
class BlockDeviceUtilization(
startCounters: BlockDeviceCounters,
endCounters: BlockDeviceCounters,
elapsedMillis: Long) extends Serializable {
val diskUtilization = ((endCounters.millisTotal - startCounters.millisTotal).toFloat /
elapsedMillis)
val readThroughput = ((endCounters.sectorsRead - startCounters.sectorsRead).toFloat *
DiskUtilization.SECTOR_SIZE_BYTES * 1000 / elapsedMillis)
val writeThroughput = ((endCounters.sectorsWritten - startCounters.sectorsWritten).toFloat *
DiskUtilization.SECTOR_SIZE_BYTES * 1000 / elapsedMillis)
}
val diskUtilization: Float,
val readThroughput: Float,
val writeThroughput: Float)
extends Serializable {

class DiskUtilization(startCounters: DiskCounters, endCounters: DiskCounters) extends Serializable {
val deviceNameToUtilization = HashMap[String, BlockDeviceUtilization]()
val elapsedMillis = endCounters.timeMillis - startCounters.timeMillis
endCounters.deviceNameToCounters.foreach {
case (deviceName: String, endCounters: BlockDeviceCounters) =>
startCounters.deviceNameToCounters.get(deviceName).foreach {
deviceNameToUtilization +=
deviceName -> new BlockDeviceUtilization(_, endCounters, elapsedMillis)
}
def this(
startCounters: BlockDeviceCounters, endCounters: BlockDeviceCounters, elapsedMillis: Long) {
this(
(endCounters.millisTotal - startCounters.millisTotal).toFloat / elapsedMillis,
((endCounters.sectorsRead - startCounters.sectorsRead).toFloat *
DiskUtilization.SECTOR_SIZE_BYTES * 1000 / elapsedMillis),
((endCounters.sectorsWritten - startCounters.sectorsWritten).toFloat *
DiskUtilization.SECTOR_SIZE_BYTES * 1000 / elapsedMillis))
}

def this(startCounters: DiskCounters) = this(startCounters, new DiskCounters())
}

class DiskUtilization(
val elapsedMillis: Long,
val deviceNameToUtilization: HashMap[String, BlockDeviceUtilization])
extends Serializable

object DiskUtilization {
// This is not at all portable -- can be obtained for a particular machine with "fdisk -l".
val SECTOR_SIZE_BYTES = 512

/**
* Creates a DiskUtilization based on two sets of disk counters.
*
* This constructor lives in this companion object because it needs to do some computation
* (to construct the map of device name to device utilization) before constructing the
* DiskUtilization object.
*/
def apply(startCounters: DiskCounters, endCounters: DiskCounters): DiskUtilization = {
val deviceNameToUtilization = HashMap[String, BlockDeviceUtilization]()
val elapsedMillis = endCounters.timeMillis - startCounters.timeMillis
endCounters.deviceNameToCounters.foreach {
case (deviceName: String, endCounters: BlockDeviceCounters) =>
startCounters.deviceNameToCounters.get(deviceName).foreach {
deviceNameToUtilization +=
deviceName -> new BlockDeviceUtilization(_, endCounters, elapsedMillis)
}
}
new DiskUtilization(elapsedMillis, deviceNameToUtilization)
}

def apply(startCounters: DiskCounters): DiskUtilization = apply(startCounters, new DiskCounters())
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,37 @@

package org.apache.spark.performance_logging

class NetworkUtilization(startCounters: NetworkCounters, endCounters: NetworkCounters)
extends Serializable {
val elapsedMillis = endCounters.timeMillis - startCounters.timeMillis
class NetworkUtilization(
val elapsedMillis: Long,
val bytesReceivedPerSecond: Float,
val bytesTransmittedPerSecond: Float,
val packetsReceivedPerSecond: Float,
val packetsTransmittedPerSecond: Float)
extends Serializable {

val bytesReceivedPerSecond =
(endCounters.receivedBytes - startCounters.receivedBytes).toFloat * 1000 / elapsedMillis
val bytesTransmittedPerSecond =
(endCounters.transmittedBytes - startCounters.transmittedBytes).toFloat * 1000 / elapsedMillis
val packetsReceivedPerSecond =
(endCounters.receivedPackets - startCounters.receivedPackets).toFloat * 1000 / elapsedMillis
val packetsTransmittedPerSecond =
((endCounters.transmittedPackets - startCounters.transmittedPackets).toFloat * 1000 /
elapsedMillis)
/**
* This constructor is private because it is used only by other constructors, so they can
* re-use elapsedMillis for all of the throughput calculations.
*/
private def this(
startCounters: NetworkCounters,
endCounters: NetworkCounters,
elapsedMillis: Long) = {
this(
elapsedMillis,
(endCounters.receivedBytes - startCounters.receivedBytes).toFloat * 1000 / elapsedMillis,
((endCounters.transmittedBytes - startCounters.transmittedBytes).toFloat * 1000 /
elapsedMillis),
(endCounters.receivedPackets - startCounters.receivedPackets).toFloat * 1000 / elapsedMillis,
((endCounters.transmittedPackets - startCounters.transmittedPackets).toFloat * 1000 /
elapsedMillis))
}

def this(startCounters: NetworkCounters) = this(startCounters, new NetworkCounters())
def this(startCounters: NetworkCounters, endCounters: NetworkCounters) = {
this(startCounters, endCounters, endCounters.timeMillis - startCounters.timeMillis)
}

def this(startCounters: NetworkCounters) = {
this(startCounters, new NetworkCounters())
}
}
57 changes: 54 additions & 3 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.apache.spark.scheduler.cluster.ExecutorInfo

import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.mutable.HashMap

import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
Expand Down Expand Up @@ -316,7 +317,6 @@ private[spark] object JsonProtocol {
("Input Metrics" -> inputMetrics) ~
("Output Metrics" -> outputMetrics) ~
("Updated Blocks" -> updatedBlocks) ~
("Updated Blocks" -> updatedBlocks) ~
("Cpu Utilization" -> cpuUtilization) ~
("Disk Utilization" -> diskUtilization) ~
("Network Utilization" -> networkUtilization)
Expand Down Expand Up @@ -346,12 +346,16 @@ private[spark] object JsonProtocol {
}

def diskUtilizationToJson(diskUtilization: DiskUtilization): JValue = {
diskUtilization.deviceNameToUtilization.map { diskNameAndUtil =>
diskNameAndUtil._1 -> blockDeviceUtilizationToJson(diskNameAndUtil._2)
val deviceNameToUtilizationJson = diskUtilization.deviceNameToUtilization.map {
diskNameAndUtil =>
diskNameAndUtil._1 -> blockDeviceUtilizationToJson(diskNameAndUtil._2)
}
("Elapsed Millis" -> diskUtilization.elapsedMillis) ~
("Device Name To Utilization" -> deviceNameToUtilizationJson)
}

def networkUtilizationToJson(networkUtilization: NetworkUtilization): JValue = {
("Elapsed Millis" -> networkUtilization.elapsedMillis) ~
("Bytes Received Per Second" -> networkUtilization.bytesReceivedPerSecond) ~
("Bytes Transmitted Per Second" -> networkUtilization.bytesTransmittedPerSecond) ~
("Packets Received Per Second" -> networkUtilization.packetsReceivedPerSecond) ~
Expand Down Expand Up @@ -742,6 +746,12 @@ private[spark] object JsonProtocol {
(id, status)
}
}
metrics.cpuUtilization =
Utils.jsonOption(json \ "Cpu Utilization").map(cpuUtilizationFromJson)
metrics.diskUtilization =
Utils.jsonOption(json \ "Disk Utilization").map(diskUtilizationFromJson)
metrics.networkUtilization =
Utils.jsonOption(json \ "Network Utilization").map(networkUtilizationFromJson)
metrics
}

Expand Down Expand Up @@ -781,6 +791,47 @@ private[spark] object JsonProtocol {
metrics
}

def cpuCountersFromJson(json: JValue): CpuCounters = {
val cpuCounters = new CpuCounters((json \ "Time Milliseconds").extract[Long])
cpuCounters.processUserJiffies = (json \ "Process User Jiffies").extract[Long]
cpuCounters.processSystemJiffies = (json \ "Process System Jiffies").extract[Long]
cpuCounters.totalUserJiffies = (json \ "Total User Jiffies").extract[Long]
cpuCounters.totalSystemJiffies = (json \ "Total System Jiffies").extract[Long]
cpuCounters
}

def cpuUtilizationFromJson(json: JValue): CpuUtilization = {
val startCounters = cpuCountersFromJson(json \ "Start Counters")
val endCounters = cpuCountersFromJson(json \ "End Counters")
new CpuUtilization(startCounters, endCounters)
}

def blockDeviceUtilizationFromJson(json: JValue): BlockDeviceUtilization = {
new BlockDeviceUtilization(
(json \ "Disk Utilization").extract[Float],
(json \ "Read Throughput").extract[Float],
(json \ "Write Throughput").extract[Float])
}

def diskUtilizationFromJson(json: JValue): DiskUtilization = {
// TODO: This does not currently decode the block device utilization, which should be done
// using blockDeviceUtilizationFromJson! Decoding the mapping of names to utilizations is
// tricky because of the way the Json object is structured.
// https://github.com/NetSys/spark-monotasks/issues/18
new DiskUtilization(
(json \ "Elapsed Millis").extract[Long],
new HashMap[String, BlockDeviceUtilization]())
}

def networkUtilizationFromJson(json: JValue): NetworkUtilization = {
new NetworkUtilization(
(json \ "Elapsed Millis").extract[Long],
(json \ "Bytes Received Per Second").extract[Float],
(json \ "Bytes Transmitted Per Second").extract[Float],
(json \ "Packets Received Per Second").extract[Float],
(json \ "Packets Transmitted Per Second").extract[Float])
}

def taskEndReasonFromJson(json: JValue): TaskEndReason = {
val success = Utils.getFormattedClassName(Success)
val resubmitted = Utils.getFormattedClassName(Resubmitted)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class DiskUtilizationSuite extends FunSuite {

val endCounters = new DiskCounters(2048, endNameToCounters)

val utilization = new DiskUtilization(startCounters, endCounters)
val utilization = DiskUtilization(startCounters, endCounters)
assert(utilization.deviceNameToUtilization.contains("disk1"))
val disk1Utilization = utilization.deviceNameToUtilization("disk1")
assert(0 === disk1Utilization.readThroughput)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,7 @@ class JsonProtocolSuite extends FunSuite {
t.setHostname("localhost")
t.setExecutorDeserializeTime(a)
t.setExecutorRunTime(b)
t.setComputationNanos(b + 50)
t.setResultSize(c)
t.setJvmGCTime(d)
t.setResultSerializationTime(a + b)
Expand Down Expand Up @@ -945,6 +946,7 @@ class JsonProtocolSuite extends FunSuite {
| "Host Name": "localhost",
| "Executor Deserialize Time": 300,
| "Executor Run Time": 400,
| "Computation Nanos": 450,
| "Result Size": 500,
| "JVM GC Time": 600,
| "Result Serialization Time": 700,
Expand Down Expand Up @@ -1031,6 +1033,7 @@ class JsonProtocolSuite extends FunSuite {
| "Host Name": "localhost",
| "Executor Deserialize Time": 300,
| "Executor Run Time": 400,
| "Computation Nanos": 450,
| "Result Size": 500,
| "JVM GC Time": 600,
| "Result Serialization Time": 700,
Expand Down Expand Up @@ -1114,6 +1117,7 @@ class JsonProtocolSuite extends FunSuite {
| "Host Name": "localhost",
| "Executor Deserialize Time": 300,
| "Executor Run Time": 400,
| "Computation Nanos": 450,
| "Result Size": 500,
| "JVM GC Time": 600,
| "Result Serialization Time": 700,
Expand Down

0 comments on commit 9393fd1

Please sign in to comment.