From d026a1943cafd1a9fd442e3f521706bceedcfdce Mon Sep 17 00:00:00 2001
From: Kay Ousterhout
Date: Sun, 29 Jun 2014 14:56:57 -0700
Subject: [PATCH] Add utilization information, per-task and per-executor.

This commit adds instrumentation for CPU, disk, and network utilization.
The instrumentation is included in the metrics sent back to the driver
about each task, and is also logged every 10ms (by default) on each
executor, so that we can measure how utilization changes over time.
---
 .../org/apache/spark/executor/Executor.scala       |  5 ++
 .../apache/spark/executor/TaskMetrics.scala        | 20 +++++
 .../ContinuousMonitor.scala                        | 81 +++++++++++++++++
 .../performance_logging/CpuCounters.scala          | 74 +++++++++++++++
 .../performance_logging/CpuUtilization.scala       | 46 ++++++++++
 .../performance_logging/DiskCounters.scala         | 72 +++++++++++++++
 .../performance_logging/DiskUtilization.scala      | 51 +++++++++++
 .../performance_logging/NetworkCounters.scala      | 62 +++++++++++++
 .../NetworkUtilization.scala                       | 34 +++++++
 .../spark/performance_logging/Utils.scala          | 25 ++++++
 .../org/apache/spark/util/JsonProtocol.scala       | 65 +++++++++++++-
 .../DiskUtilizationSuite.scala                     | 90 +++++++++++++++++++
 12 files changed, 623 insertions(+), 2 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/performance_logging/ContinuousMonitor.scala
 create mode 100644 core/src/main/scala/org/apache/spark/performance_logging/CpuCounters.scala
 create mode 100644 core/src/main/scala/org/apache/spark/performance_logging/CpuUtilization.scala
 create mode 100644 core/src/main/scala/org/apache/spark/performance_logging/DiskCounters.scala
 create mode 100644 core/src/main/scala/org/apache/spark/performance_logging/DiskUtilization.scala
 create mode 100644 core/src/main/scala/org/apache/spark/performance_logging/NetworkCounters.scala
 create mode 100644 core/src/main/scala/org/apache/spark/performance_logging/NetworkUtilization.scala
 create mode 100644 core/src/main/scala/org/apache/spark/performance_logging/Utils.scala
 create mode 100644 core/src/test/scala/org/apache/spark/performance_logging/DiskUtilizationSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 30c083eb4..b879b05ef 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -39,6 +39,7 @@ import org.apache.spark.{Logging, SparkConf, SparkEnv, TaskContext, TaskState}
 import org.apache.spark.util.{AkkaUtils, Utils}
 import org.apache.spark.monotasks.LocalDagScheduler
 import org.apache.spark.monotasks.compute.PrepareMonotask
+import org.apache.spark.performance_logging.ContinuousMonitor
 
 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler.
@@ -89,6 +90,9 @@ private[spark] class Executor(
     }
   }
 
+  private val continuousMonitor = new ContinuousMonitor(conf)
+  continuousMonitor.start(env)
+
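+  // The monitor begins logging as soon as the Executor is constructed and runs until stop()
+  // is called below, so the utilization log covers the executor's whole lifetime rather than
+  // just the periods when tasks are running.
+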
   // Create our DependencyManager, which manages the class loader.
   private val dependencyManager = new DependencyManager(env, conf)
@@ -114,6 +118,7 @@ private[spark] class Executor(
 
   def stop() {
     env.metricsSystem.report()
+    continuousMonitor.stop()
     isStopped = true
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index b7d331f3e..93fff4526 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -39,6 +39,8 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.performance_logging.{CpuCounters, CpuUtilization, DiskCounters,
+  DiskUtilization, NetworkCounters, NetworkUtilization}
 import org.apache.spark.storage.{BlockId, BlockStatus}
 
 /**
@@ -133,10 +135,28 @@ class TaskMetrics extends Serializable {
    */
   var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
 
+  /**
+   * CPU / network / disk counters captured when the task starts running. These are used when
+   * the task completes to compute the utilization during the period that the task was running.
+   */
+  @transient private val startCpuCounters = new CpuCounters()
+  @transient private val startNetworkCounters = new NetworkCounters()
+  @transient private val startDiskCounters = new DiskCounters()
+
+  /**
+   * Metrics about machine utilization while the task was running.
+   */
+  var cpuUtilization: Option[CpuUtilization] = None
+  var networkUtilization: Option[NetworkUtilization] = None
+  var diskUtilization: Option[DiskUtilization] = None
+
   /** Should be called when a macrotask completes to set metrics about the task's runtime. */
   def setMetricsOnTaskCompletion() {
     executorRunTime = System.currentTimeMillis() - startingTime
     jvmGCTime = currentGCTotalMillis - startingGCTime
+    cpuUtilization = Some(new CpuUtilization(startCpuCounters))
+    networkUtilization = Some(new NetworkUtilization(startNetworkCounters))
+    diskUtilization = Some(new DiskUtilization(startDiskCounters))
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/performance_logging/ContinuousMonitor.scala b/core/src/main/scala/org/apache/spark/performance_logging/ContinuousMonitor.scala
new file mode 100644
index 000000000..a6c7573e5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/performance_logging/ContinuousMonitor.scala
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2014 The Regents of the University of California
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.performance_logging
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration.Duration
+
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods
+
+import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.util.{FileLogger, JsonProtocol}
+
+/**
+ * Periodically logs information about CPU, network, and disk utilization on the machine to a
+ * file.
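+ *
+ * Each line of the log is one compact JSON object. Schematically (timestamps illustrative;
+ * the exact fields are produced by the JsonProtocol methods used below):
+ *
+ *   {"Current Time": 1404079017000, "Previous Time": 1404079016990,
+ *    "Cpu Utilization": {...}, "Disk Utilization": {...}, "Network Utilization": {...}}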
+ */
+private[spark] class ContinuousMonitor(sparkConf: SparkConf) {
+  private val logIntervalMillis = sparkConf.getInt("spark.continuousMonitor.logIntervalMillis", 10)
+  private val logDir = org.apache.spark.util.Utils.resolveURI(
+    s"/tmp/spark_continuous_monitor_${System.currentTimeMillis}").toString
+  private val logger = new FileLogger(logDir, sparkConf)
+  logger.start()
+  logger.newFile()
+
+  private var previousCpuCounters = new CpuCounters()
+  private var previousDiskCounters = new DiskCounters()
+  private var previousNetworkCounters = new NetworkCounters()
+
+  private def getUtilizationJson(): JValue = {
+    val currentCpuCounters = new CpuCounters()
+    val currentDiskCounters = new DiskCounters()
+    val currentNetworkCounters = new NetworkCounters()
+
+    val cpuUtilization = new CpuUtilization(previousCpuCounters, currentCpuCounters)
+    val diskUtilization = new DiskUtilization(previousDiskCounters, currentDiskCounters)
+    val networkUtilization = new NetworkUtilization(previousNetworkCounters, currentNetworkCounters)
+
+    // Remember the previous measurement time before overwriting the previous counters;
+    // otherwise, "Previous Time" would always be equal to "Current Time".
+    val previousTimeMillis = previousCpuCounters.timeMillis
+    previousCpuCounters = currentCpuCounters
+    previousDiskCounters = currentDiskCounters
+    previousNetworkCounters = currentNetworkCounters
+
+    ("Current Time" -> currentCpuCounters.timeMillis) ~
+    ("Previous Time" -> previousTimeMillis) ~
+    ("Cpu Utilization" -> JsonProtocol.cpuUtilizationToJson(cpuUtilization)) ~
+    ("Disk Utilization" -> JsonProtocol.diskUtilizationToJson(diskUtilization)) ~
+    ("Network Utilization" -> JsonProtocol.networkUtilizationToJson(networkUtilization))
+  }
+
+  def start(env: SparkEnv) {
+    import env.actorSystem.dispatcher
+    // TODO: Don't bother starting this when the /proc filesystem isn't available on the machine.
+    env.actorSystem.scheduler.schedule(
+      Duration(0, TimeUnit.MILLISECONDS),
+      Duration(logIntervalMillis, TimeUnit.MILLISECONDS)) {
+      // TODO: Will this interfere with other uses of the disk? Should we write to a different
+      // disk? To the EBS volume? Experiments so far suggest no (because the volume of data is
+      // small), but worth keeping an eye on.
+      logger.logLine(JsonMethods.compact(JsonMethods.render(getUtilizationJson())))
+    }
+  }
+
+  def stop() {
+    logger.stop()
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/performance_logging/CpuCounters.scala b/core/src/main/scala/org/apache/spark/performance_logging/CpuCounters.scala
new file mode 100644
index 000000000..3f2c12389
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/performance_logging/CpuCounters.scala
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2014 The Regents of the University of California
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.performance_logging
+
+import java.io.FileNotFoundException
+
+import scala.io.Source
+
+import org.apache.spark.Logging
+
+class CpuCounters() extends Serializable with Logging {
+  val timeMillis = System.currentTimeMillis()
+
+  // Total CPU time used by the Spark process.
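+  // These counters come from /proc/<pid>/stat, a single space-separated line in which utime
+  // and stime are the (0-based) fields 13 and 14; see UTIME_INDEX and STIME_INDEX below. An
+  // abbreviated, illustrative line:
+  //   2974 (java) S 1 2974 2974 0 -1 4202496 ... 48377 1287 ...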
+  var processUserJiffies = 0L
+  var processSystemJiffies = 0L
+
+  // Total CPU time used by all processes on the machine.
+  var totalUserJiffies = 0L
+  var totalSystemJiffies = 0L
+
+  try {
+    Source.fromFile(s"/proc/${Utils.getPid()}/stat").getLines().foreach { line =>
+      val values = line.split(" ")
+      processUserJiffies = values(CpuCounters.UTIME_INDEX).toLong
+      processSystemJiffies = values(CpuCounters.STIME_INDEX).toLong
+    }
+
+    Source.fromFile(CpuCounters.CPU_TOTALS_FILENAME).getLines().foreach { line =>
+      // Look for only the line that starts with "cpu ", which has the totals across all CPUs
+      // (the remaining lines are each for a particular core).
+      if (line.startsWith("cpu ")) {
+        val cpuTimes =
+          line.substring(CpuCounters.CPU_COUNTS_START_INDEX, line.length).split(" ").map(_.toLong)
+        totalUserJiffies = cpuTimes(CpuCounters.USER_JIFFIES_INDEX)
+        totalSystemJiffies = cpuTimes(CpuCounters.SYSTEM_JIFFIES_INDEX)
+      }
+    }
+  } catch {
+    case e: FileNotFoundException =>
+      logWarning(
+        "Unable to record CPU counters because files in /proc filesystem could not be found")
+  }
+}
+
+object CpuCounters {
+  val CPU_TOTALS_FILENAME = "/proc/stat"
+
+  // The first 5 characters in /proc/stat are "cpu " or "cpuX " where X is the identifier of the
+  // particular core.
+  val CPU_COUNTS_START_INDEX = 5
+
+  // Indices of the user / system CPU time counters, starting after the CPU identifier.
+  val USER_JIFFIES_INDEX = 0
+  val SYSTEM_JIFFIES_INDEX = 2
+
+  // 0-based indices in the /proc/pid/stat file of the user / system CPU time. Not necessarily
+  // portable.
+  val UTIME_INDEX = 13
+  val STIME_INDEX = 14
+}
diff --git a/core/src/main/scala/org/apache/spark/performance_logging/CpuUtilization.scala b/core/src/main/scala/org/apache/spark/performance_logging/CpuUtilization.scala
new file mode 100644
index 000000000..a14be89b4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/performance_logging/CpuUtilization.scala
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2014 The Regents of the University of California
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.performance_logging
+
+/**
+ * CPU utilization measures the average number of cores in use, so when the CPUs are fully
+ * utilized, the CPU utilization will be equal to the number of cores (i.e., it will typically
+ * be greater than 1).
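+ *
+ * For example (illustrative numbers): a process that accumulates 400 user jiffies over a
+ * 1000 millisecond window, at 100 jiffies per second, has elapsedJiffies = 100 and
+ * processUserUtilization = 400 / 100 = 4.0, i.e. it kept four cores busy on average.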
+ */
+class CpuUtilization(val startCounters: CpuCounters, val endCounters: CpuCounters)
+  extends Serializable {
+
+  def elapsedMillis = endCounters.timeMillis - startCounters.timeMillis
+  def elapsedJiffies = CpuUtilization.JIFFIES_PER_SECOND * (elapsedMillis * 1.0 / 1000)
+
+  def processUserUtilization =
+    ((endCounters.processUserJiffies - startCounters.processUserJiffies).toFloat / elapsedJiffies)
+  def processSystemUtilization =
+    ((endCounters.processSystemJiffies - startCounters.processSystemJiffies).toFloat /
+      elapsedJiffies)
+  def totalUserUtilization =
+    ((endCounters.totalUserJiffies - startCounters.totalUserJiffies).toFloat / elapsedJiffies)
+  def totalSystemUtilization =
+    ((endCounters.totalSystemJiffies - startCounters.totalSystemJiffies).toFloat / elapsedJiffies)
+
+  def this(startCounters: CpuCounters) = this(startCounters, new CpuCounters())
+}
+
+object CpuUtilization {
+  // This is the correct value for most Linux systems, and for the default Spark AMI.
+  val JIFFIES_PER_SECOND = 100
+}
diff --git a/core/src/main/scala/org/apache/spark/performance_logging/DiskCounters.scala b/core/src/main/scala/org/apache/spark/performance_logging/DiskCounters.scala
new file mode 100644
index 000000000..64fd17770
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/performance_logging/DiskCounters.scala
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2014 The Regents of the University of California
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.performance_logging
+
+import java.io.FileNotFoundException
+
+import scala.collection.mutable.HashMap
+import scala.io.Source
+
+import org.apache.spark.Logging
+
+/** Stores counters for a particular block device. */
+case class BlockDeviceCounters(val countersLine: String) extends Serializable {
+  val items = countersLine.split(" ").filter(!_.isEmpty())
+  val deviceName = items(DiskCounters.DEVICE_NAME_INDEX)
+  val sectorsRead = items(DiskCounters.SECTORS_READ_INDEX).toLong
+  val millisReading = items(DiskCounters.MILLIS_READING_INDEX).toLong
+  val sectorsWritten = items(DiskCounters.SECTORS_WRITTEN_INDEX).toLong
+  val millisWriting = items(DiskCounters.MILLIS_WRITING_INDEX).toLong
+  val millisTotal = items(DiskCounters.MILLIS_TOTAL_INDEX).toLong
+}
+
+/** Counters across all block devices.
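+ *
+ * Each line of /proc/diskstats (lines for loop devices are skipped) is parsed by
+ * BlockDeviceCounters above; for example, the synthetic line used in DiskUtilizationSuite:
+ *
+ *   202 16 disk1 471 94 100 1024 40 62 10 512 13 1536 0 6
+ *
+ * has the device name at index 2, sectors read / written at indices 5 and 9, and milliseconds
+ * spent reading / writing / doing I/O at indices 6, 10, and 12 (see object DiskCounters).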
+ */
+class DiskCounters(
+    val timeMillis: Long,
+    val deviceNameToCounters: HashMap[String, BlockDeviceCounters])
+  extends Serializable with Logging {
+
+  def this() = {
+    this(System.currentTimeMillis(), new HashMap[String, BlockDeviceCounters]())
+
+    try {
+      Source.fromFile(DiskCounters.DISK_TOTALS_FILENAME).getLines().foreach { line =>
+        if (line.indexOf("loop") == -1) {
+          val deviceCounters = BlockDeviceCounters(line)
+          this.deviceNameToCounters += deviceCounters.deviceName -> deviceCounters
+        }
+      }
+    } catch {
+      case e: FileNotFoundException =>
+        logWarning(
+          s"Unable to record disk counters because ${DiskCounters.DISK_TOTALS_FILENAME} " +
+          "could not be found")
+    }
+  }
+}
+
+object DiskCounters {
+  val DISK_TOTALS_FILENAME = "/proc/diskstats"
+
+  // Indices of information in the DISK_TOTALS_FILENAME file.
+  val DEVICE_NAME_INDEX = 2
+  val SECTORS_READ_INDEX = 5
+  val MILLIS_READING_INDEX = 6
+  val SECTORS_WRITTEN_INDEX = 9
+  val MILLIS_WRITING_INDEX = 10
+  val MILLIS_TOTAL_INDEX = 12
+}
diff --git a/core/src/main/scala/org/apache/spark/performance_logging/DiskUtilization.scala b/core/src/main/scala/org/apache/spark/performance_logging/DiskUtilization.scala
new file mode 100644
index 000000000..fcd7e6354
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/performance_logging/DiskUtilization.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2014 The Regents of the University of California
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.performance_logging
+
+import scala.collection.mutable.HashMap
+
+/** Utilization of a particular block device. */
+class BlockDeviceUtilization(
+    startCounters: BlockDeviceCounters,
+    endCounters: BlockDeviceCounters,
+    elapsedMillis: Long) extends Serializable {
+  val diskUtilization = ((endCounters.millisTotal - startCounters.millisTotal).toFloat /
+    elapsedMillis)
+  val readThroughput = ((endCounters.sectorsRead - startCounters.sectorsRead).toFloat *
+    DiskUtilization.SECTOR_SIZE_BYTES * 1000 / elapsedMillis)
+  val writeThroughput = ((endCounters.sectorsWritten - startCounters.sectorsWritten).toFloat *
+    DiskUtilization.SECTOR_SIZE_BYTES * 1000 / elapsedMillis)
+}
+
+class DiskUtilization(startCounters: DiskCounters, endCounters: DiskCounters) extends Serializable {
+  val deviceNameToUtilization = HashMap[String, BlockDeviceUtilization]()
+  val elapsedMillis = endCounters.timeMillis - startCounters.timeMillis
+  endCounters.deviceNameToCounters.foreach {
+    case (deviceName: String, endDeviceCounters: BlockDeviceCounters) =>
+      // Devices that appear only in the end counters are skipped, since no utilization can be
+      // computed for them without start counters.
+      startCounters.deviceNameToCounters.get(deviceName).foreach { startDeviceCounters =>
+        deviceNameToUtilization +=
+          deviceName -> new BlockDeviceUtilization(startDeviceCounters, endDeviceCounters,
+            elapsedMillis)
+      }
+  }
+
+  def this(startCounters: DiskCounters) = this(startCounters, new DiskCounters())
+}
+
+object DiskUtilization {
+  // This is not at all portable -- it can be obtained for a particular machine with "fdisk -l".
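+  // With 512-byte sectors, a device whose counters advance by 200 sectors read over a 2048 ms
+  // window has readThroughput = 200 * 512 * 1000 / 2048 = 50000 bytes/s; this is exactly the
+  // disk2 case exercised in DiskUtilizationSuite.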
+  val SECTOR_SIZE_BYTES = 512
+}
diff --git a/core/src/main/scala/org/apache/spark/performance_logging/NetworkCounters.scala b/core/src/main/scala/org/apache/spark/performance_logging/NetworkCounters.scala
new file mode 100644
index 000000000..a5e0ddd6b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/performance_logging/NetworkCounters.scala
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2014 The Regents of the University of California
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.performance_logging
+
+import java.io.FileNotFoundException
+
+import scala.io.Source
+
+import org.apache.spark.Logging
+
+case class NetworkCounters() extends Serializable with Logging {
+  val timeMillis = System.currentTimeMillis()
+  var receivedBytes = 0L
+  var receivedPackets = 0L
+  var transmittedBytes = 0L
+  var transmittedPackets = 0L
+
+  try {
+    // Could also read per-process counters from s"/proc/${Utils.getPid()}/net/dev", but (a) this
+    // doesn't work on m2.4xlarge instances (it's just the same as the total counters) and (b) if
+    // it did work, it wouldn't include the HDFS network data (because that happens in a separate
+    // process), which can be important to understanding utilization.
+    Source.fromFile(NetworkCounters.NETWORK_TOTALS_FILENAME).getLines().foreach { line =>
+      if (line.contains(":") && !line.contains("lo")) {
+        val counts = line.split(":")(1).split(" ").filter(_.length > 0).map(_.toLong)
+        receivedBytes += counts(NetworkCounters.RECEIVED_BYTES_INDEX)
+        receivedPackets += counts(NetworkCounters.RECEIVED_PACKETS_INDEX)
+        transmittedBytes += counts(NetworkCounters.TRANSMITTED_BYTES_INDEX)
+        transmittedPackets += counts(NetworkCounters.TRANSMITTED_PACKETS_INDEX)
+      }
+    }
+  } catch {
+    case e: FileNotFoundException =>
+      logWarning(
+        s"Unable to record network counters because ${NetworkCounters.NETWORK_TOTALS_FILENAME} " +
+        "could not be found")
+  }
+}
+
+object NetworkCounters {
+  val NETWORK_TOTALS_FILENAME = "/proc/net/dev"
+
+  // 0-based indices, within the list of numbers on each line of the /proc/net/dev file, of the
+  // received and transmitted bytes/packets. Not necessarily portable.
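+  // For example, an interface line looks like (hypothetical values):
+  //   eth0: 18234661 21855 0 0 0 0 0 0 8723401 14992 0 0 0 0 0 0
+  // After splitting off the "eth0:" prefix, index 0 holds received bytes, index 1 received
+  // packets, index 8 transmitted bytes, and index 9 transmitted packets.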
+  val RECEIVED_BYTES_INDEX = 0
+  val RECEIVED_PACKETS_INDEX = 1
+  val TRANSMITTED_BYTES_INDEX = 8
+  val TRANSMITTED_PACKETS_INDEX = 9
+}
diff --git a/core/src/main/scala/org/apache/spark/performance_logging/NetworkUtilization.scala b/core/src/main/scala/org/apache/spark/performance_logging/NetworkUtilization.scala
new file mode 100644
index 000000000..71c1eddef
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/performance_logging/NetworkUtilization.scala
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2014 The Regents of the University of California
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.performance_logging
+
+class NetworkUtilization(startCounters: NetworkCounters, endCounters: NetworkCounters)
+  extends Serializable {
+
+  val elapsedMillis = endCounters.timeMillis - startCounters.timeMillis
+
+  val bytesReceivedPerSecond =
+    (endCounters.receivedBytes - startCounters.receivedBytes).toFloat * 1000 / elapsedMillis
+  val bytesTransmittedPerSecond =
+    (endCounters.transmittedBytes - startCounters.transmittedBytes).toFloat * 1000 / elapsedMillis
+  val packetsReceivedPerSecond =
+    (endCounters.receivedPackets - startCounters.receivedPackets).toFloat * 1000 / elapsedMillis
+  val packetsTransmittedPerSecond =
+    ((endCounters.transmittedPackets - startCounters.transmittedPackets).toFloat * 1000 /
+      elapsedMillis)
+
+  def this(startCounters: NetworkCounters) = this(startCounters, new NetworkCounters())
+}
diff --git a/core/src/main/scala/org/apache/spark/performance_logging/Utils.scala b/core/src/main/scala/org/apache/spark/performance_logging/Utils.scala
new file mode 100644
index 000000000..3deb2818a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/performance_logging/Utils.scala
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2014 The Regents of the University of California
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.performance_logging
+
+import java.lang.management.ManagementFactory
+
+object Utils {
+  // Beware that the name returned by getName() is not guaranteed to keep following the pid@X
+  // format.
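+  // On current HotSpot JVMs the name typically looks like "2974@hostname" (illustrative), so
+  // taking everything before the "@" yields the pid.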
+  def getPid(): String = ManagementFactory.getRuntimeMXBean().getName().split("@")(0)
+}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 7436dd135..a8edba374 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -15,6 +15,22 @@
  * limitations under the License.
  */
 
+/*
+ * Copyright 2014 The Regents of the University of California
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.util
 
 import java.util.{Properties, UUID}
@@ -27,9 +43,10 @@ import org.json4s.JsonDSL._
 import org.json4s.JsonAST._
 import org.json4s.jackson.JsonMethods._
 
-
 import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics,
   ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.performance_logging.{BlockDeviceUtilization, CpuCounters, CpuUtilization,
+  DiskUtilization, NetworkUtilization}
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
 import org.apache.spark._
@@ -237,6 +254,10 @@ private[spark] object JsonProtocol {
         ("Status" -> blockStatusToJson(status))
       })
     }.getOrElse(JNothing)
+    val cpuUtilization = taskMetrics.cpuUtilization.map(cpuUtilizationToJson).getOrElse(JNothing)
+    val diskUtilization = taskMetrics.diskUtilization.map(diskUtilizationToJson).getOrElse(JNothing)
+    val networkUtilization =
+      taskMetrics.networkUtilization.map(networkUtilizationToJson).getOrElse(JNothing)
     ("Host Name" -> taskMetrics.hostname) ~
     ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
     ("Executor Run Time" -> taskMetrics.executorRunTime) ~
@@ -246,7 +267,47 @@
     ("Shuffle Read Metrics" -> shuffleReadMetrics) ~
     ("Shuffle Write Metrics" -> shuffleWriteMetrics) ~
     ("Input Metrics" -> inputMetrics) ~
-    ("Updated Blocks" -> updatedBlocks)
+    ("Updated Blocks" -> updatedBlocks) ~
+    ("Cpu Utilization" -> cpuUtilization) ~
+    ("Disk Utilization" -> diskUtilization) ~
+    ("Network Utilization" -> networkUtilization)
   }
 
+  def cpuUtilizationToJson(cpuUtilization: CpuUtilization): JValue = {
+    ("Start Counters" -> cpuCountersToJson(cpuUtilization.startCounters)) ~
+    ("End Counters" -> cpuCountersToJson(cpuUtilization.endCounters)) ~
+    ("Process User Utilization" -> cpuUtilization.processUserUtilization) ~
+    ("Process System Utilization" -> cpuUtilization.processSystemUtilization) ~
+    ("Total User Utilization" -> cpuUtilization.totalUserUtilization) ~
+    ("Total System Utilization" -> cpuUtilization.totalSystemUtilization)
+  }
+
+  def cpuCountersToJson(cpuCounters: CpuCounters): JValue = {
+    ("Time Milliseconds" -> cpuCounters.timeMillis) ~
+    ("Process User Jiffies" -> cpuCounters.processUserJiffies) ~
+    ("Process System Jiffies" -> cpuCounters.processSystemJiffies) ~
+    ("Total User Jiffies" -> cpuCounters.totalUserJiffies) ~
+    ("Total System Jiffies" -> cpuCounters.totalSystemJiffies)
+  }
+
+  def blockDeviceUtilizationToJson(blockDeviceUtilization: BlockDeviceUtilization): JValue = {
+    ("Disk Utilization" -> blockDeviceUtilization.diskUtilization) ~
+    ("Read Throughput" -> blockDeviceUtilization.readThroughput) ~
+    ("Write Throughput" -> blockDeviceUtilization.writeThroughput)
+  }
+
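+  // diskUtilizationToJson produces an object keyed by device name, e.g. (values taken from the
+  // synthetic counters in DiskUtilizationSuite):
+  //   {"disk1": {"Disk Utilization": 0.25, "Read Throughput": 0.0, "Write Throughput": 2500.0},
+  //    "disk2": {"Disk Utilization": 0.5, "Read Throughput": 50000.0, "Write Throughput": 50000.0}}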
+  def diskUtilizationToJson(diskUtilization: DiskUtilization): JValue = {
+    JObject(diskUtilization.deviceNameToUtilization.map {
+      case (deviceName, deviceUtilization) =>
+        deviceName -> blockDeviceUtilizationToJson(deviceUtilization)
+    }.toList)
+  }
+
+  def networkUtilizationToJson(networkUtilization: NetworkUtilization): JValue = {
+    ("Bytes Received Per Second" -> networkUtilization.bytesReceivedPerSecond) ~
+    ("Bytes Transmitted Per Second" -> networkUtilization.bytesTransmittedPerSecond) ~
+    ("Packets Received Per Second" -> networkUtilization.packetsReceivedPerSecond) ~
+    ("Packets Transmitted Per Second" -> networkUtilization.packetsTransmittedPerSecond)
+  }
 
   def shuffleReadMetricsToJson(shuffleReadMetrics: ShuffleReadMetrics): JValue = {
diff --git a/core/src/test/scala/org/apache/spark/performance_logging/DiskUtilizationSuite.scala b/core/src/test/scala/org/apache/spark/performance_logging/DiskUtilizationSuite.scala
new file mode 100644
index 000000000..b315f2b8c
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/performance_logging/DiskUtilizationSuite.scala
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2014 The Regents of the University of California
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.performance_logging
+
+import scala.collection.mutable.HashMap
+
+import org.scalatest.FunSuite
+
+class DiskUtilizationSuite extends FunSuite {
+  test("multiple block devices") {
+    val startSectorsReadDisk1 = 100
+    val startSectorsWrittenDisk1 = 10
+    val startMillisReadingDisk1 = 1024
+    val startMillisWritingDisk1 = 512
+    val startMillisTotalDisk1 = 1536
+    val disk1StartLine = s" 202 16 disk1 471 94 $startSectorsReadDisk1 " +
+      s"$startMillisReadingDisk1 40 62 $startSectorsWrittenDisk1 $startMillisWritingDisk1 13 " +
+      s"$startMillisTotalDisk1 0 6"
+
+    val startSectorsReadDisk2 = 200
+    val startSectorsWrittenDisk2 = 100
+    val startMillisReadingDisk2 = 512
+    val startMillisWritingDisk2 = 512
+    val startMillisTotalDisk2 = 1024
+    // Intentionally use different spacing to make sure things still work properly.
+    val disk2StartLine = s"  202  16 disk2  471 94 $startSectorsReadDisk2 " +
+      s"$startMillisReadingDisk2 40 62 $startSectorsWrittenDisk2 $startMillisWritingDisk2 13 " +
+      s"$startMillisTotalDisk2 0 6"
+
+    val startNameToCounters = HashMap[String, BlockDeviceCounters]()
+    startNameToCounters += "disk1" -> new BlockDeviceCounters(disk1StartLine)
+    startNameToCounters += "disk2" -> new BlockDeviceCounters(disk2StartLine)
+
+    val startCounters = new DiskCounters(0, startNameToCounters)
+
+    val endSectorsReadDisk1 = 100
+    val endSectorsWrittenDisk1 = 20
+    val endMillisReadingDisk1 = 1024
+    val endMillisWritingDisk1 = 1024
+    val endMillisTotalDisk1 = 2048
+    val disk1EndLine = s" 202 16 disk1 471 94 $endSectorsReadDisk1 " +
+      s"$endMillisReadingDisk1 40 62 $endSectorsWrittenDisk1 $endMillisWritingDisk1 13 " +
+      s"$endMillisTotalDisk1 0 6"
+
+    val endSectorsReadDisk2 = 400
+    val endSectorsWrittenDisk2 = 300
+    val endMillisReadingDisk2 = 1024
+    val endMillisWritingDisk2 = 1024
+    val endMillisTotalDisk2 = 2048
+    // Intentionally use different spacing to make sure things still work properly.
+    val disk2EndLine = s"  202  16 disk2  471 94 $endSectorsReadDisk2 " +
+      s"$endMillisReadingDisk2 40 62 $endSectorsWrittenDisk2 $endMillisWritingDisk2 13 " +
+      s"$endMillisTotalDisk2 0 6"
+
+    val endNameToCounters = HashMap[String, BlockDeviceCounters]()
+    endNameToCounters += "disk1" -> new BlockDeviceCounters(disk1EndLine)
+    endNameToCounters += "disk2" -> new BlockDeviceCounters(disk2EndLine)
+
+    val endCounters = new DiskCounters(2048, endNameToCounters)
+
+    val utilization = new DiskUtilization(startCounters, endCounters)
+    assert(utilization.deviceNameToUtilization.contains("disk1"))
+    val disk1Utilization = utilization.deviceNameToUtilization("disk1")
+    assert(0 === disk1Utilization.readThroughput)
+    // 10 sectors written * 512 bytes / sector * 1000 ms / second / 2048 ms = 2500 bytes / s
+    assert(2500 === disk1Utilization.writeThroughput)
+    assert(0.25 === disk1Utilization.diskUtilization)
+
+    assert(utilization.deviceNameToUtilization.contains("disk2"))
+    val disk2Utilization = utilization.deviceNameToUtilization("disk2")
+    // 200 sectors * 512 bytes / sector * 1000 ms / second / 2048 ms = 50000 bytes / s
+    assert(50000 === disk2Utilization.readThroughput)
+    assert(50000 === disk2Utilization.writeThroughput)
+    assert(0.5 === disk2Utilization.diskUtilization)
+  }
+}