Add per-task utilization information.
This commit adds instrumentation for CPU, disk, and network utilization.
The instrumentation is included in the metrics sent back to the driver
about each task, and is also logged every 10ms (by default) on each
executor, so that we can measure how utilization changes over time.

Conflicts:
	core/src/main/scala/org/apache/spark/executor/Executor.scala
	core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
	core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

kayousterhout authored and root committed May 21, 2015
1 parent 058205c commit 4ffba97
Showing 12 changed files with 626 additions and 1 deletion.
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -31,6 +31,7 @@ import akka.actor.Props

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.performance_logging.ContinuousMonitor
import org.apache.spark.scheduler._
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
@@ -92,6 +93,9 @@ private[spark] class Executor(
private val executorActor = env.actorSystem.actorOf(
Props(new ExecutorActor(executorId)), "ExecutorActor")

private val continuousMonitor = new ContinuousMonitor(conf)
continuousMonitor.start(env)

// Whether to load classes in user jars before those in Spark jars
private val userClassPathFirst: Boolean = {
conf.getBoolean("spark.executor.userClassPathFirst",
@@ -140,6 +144,7 @@ private[spark] class Executor(
def stop() {
env.metricsSystem.report()
env.actorSystem.stop(executorActor)
continuousMonitor.stop()
isStopped = true
threadPool.shutdown()
if (!isLocal) {
@@ -275,6 +280,7 @@ private[spark] class Executor(
for (m <- metrics) {
m.setExecutorRunTime(serviceTime)
m.setJvmGCTime(gcTime - startGCTime)
m.setUtilization()
}
val reason = new ExceptionFailure(t, metrics)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
24 changes: 24 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -24,6 +24,8 @@ import org.apache.spark.executor.DataReadMethod.DataReadMethod
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.performance_logging.{CpuCounters, CpuUtilization, DiskCounters,
DiskUtilization, NetworkCounters, NetworkUtilization}
import org.apache.spark.storage.{BlockId, BlockStatus}

/**
@@ -155,6 +157,28 @@ class TaskMetrics extends Serializable {
*/
var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None

/**
* CPU / network / disk counters captured when the task starts running. These are used when the
* task completes to compute the utilization during the period that the task was running.
*/
@transient private val startCpuCounters = new CpuCounters()
@transient private val startNetworkCounters = new NetworkCounters()
@transient private val startDiskCounters = new DiskCounters()

/**
* Metrics about machine utilization while the task was running.
*/
var cpuUtilization: Option[CpuUtilization] = None
var networkUtilization: Option[NetworkUtilization] = None
var diskUtilization: Option[DiskUtilization] = None

/** Should be called when a macrotask completes to set metrics about the task's runtime. */
def setUtilization() {
cpuUtilization = Some(new CpuUtilization(startCpuCounters))
networkUtilization = Some(new NetworkUtilization(startNetworkCounters))
diskUtilization = Some(new DiskUtilization(startDiskCounters))
}
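
A minimal sketch of the intended lifecycle, assuming the executor constructs a fresh TaskMetrics per task (the driver-side plumbing is not shown in this hunk):

    // Counters are snapshotted when the TaskMetrics object is constructed, i.e., at task start.
    val metrics = new TaskMetrics()
    // ... the macrotask runs ...
    // On completion, setUtilization() diffs fresh counters against the start-time snapshots,
    // populating cpuUtilization, networkUtilization, and diskUtilization.
    metrics.setUtilization()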

/**
* A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization
* issues from readers in different threads, in-progress tasks use a ShuffleReadMetrics for each
79 changes: 79 additions & 0 deletions core/src/main/scala/org/apache/spark/performance_logging/ContinuousMonitor.scala
@@ -0,0 +1,79 @@
/*
* Copyright 2014 The Regents of the University of California
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.performance_logging

import java.io.{File, PrintWriter}
import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration

import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.util.JsonProtocol

/**
* Periodically logs information about CPU, network, and disk utilization on the machine to a file.
*/
private[spark] class ContinuousMonitor(sparkConf: SparkConf) {
private val logIntervalMillis = sparkConf.getInt("spark.continuousMonitor.logIntervalMillis", 10)
val printWriter = new PrintWriter(
new File(s"/tmp/spark_continuous_monitor_${System.currentTimeMillis}"))

private var previousCpuCounters = new CpuCounters()
private var previousDiskCounters = new DiskCounters()
private var previousNetworkCounters = new NetworkCounters()

private def getUtilizationJson(): JValue = {
val currentCpuCounters = new CpuCounters()
val currentDiskCounters = new DiskCounters()
val currentNetworkCounters = new NetworkCounters()

val cpuUtilization = new CpuUtilization(previousCpuCounters, currentCpuCounters)
val diskUtilization = new DiskUtilization(previousDiskCounters, currentDiskCounters)
val networkUtilization = new NetworkUtilization(previousNetworkCounters, currentNetworkCounters)

// Capture the previous timestamp before rotating the counters; otherwise
// "Previous Time" would just repeat the current time.
val previousTimeMillis = previousCpuCounters.timeMillis
previousCpuCounters = currentCpuCounters
previousDiskCounters = currentDiskCounters
previousNetworkCounters = currentNetworkCounters

("Current Time" -> currentCpuCounters.timeMillis) ~
("Previous Time" -> previousTimeMillis) ~
("Cpu Utilization" -> JsonProtocol.cpuUtilizationToJson(cpuUtilization)) ~
("Disk Utilization" -> JsonProtocol.diskUtilizationToJson(diskUtilization)) ~
("Network Utilization" -> JsonProtocol.networkUtilizationToJson(networkUtilization))
}

def start(env: SparkEnv) {
import env.actorSystem.dispatcher
// TODO: Don't bother starting this when the /proc filesystem isn't available on the machine.
env.actorSystem.scheduler.schedule(
Duration(0, TimeUnit.MILLISECONDS),
Duration(logIntervalMillis, TimeUnit.MILLISECONDS)) {
// TODO: Will this interfere with other uses of the disk? Should write to different disk?
// To the EBS volume? Experiments so far suggest no (because the volume of data is
// small), but worth keeping an eye on.
// Write one compact JSON record per line so the resulting log is easy to parse.
printWriter.println(JsonMethods.compact(JsonMethods.render(getUtilizationJson())))
}
}

def stop() {
printWriter.close()
}
}
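
For reference, each call to getUtilizationJson() yields one compact JSON record shaped as follows (timestamps illustrative; the nested objects come from JsonProtocol and are not shown in this diff):

    {"Current Time":1432245600123,"Previous Time":1432245600113,
     "Cpu Utilization":{...},"Disk Utilization":{...},"Network Utilization":{...}}
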
74 changes: 74 additions & 0 deletions core/src/main/scala/org/apache/spark/performance_logging/CpuCounters.scala
@@ -0,0 +1,74 @@
/*
* Copyright 2014 The Regents of the University of California
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.performance_logging

import java.io.FileNotFoundException

import scala.io.Source

import org.apache.spark.Logging
import org.apache.spark.util.Utils

class CpuCounters() extends Serializable with Logging {
val timeMillis = System.currentTimeMillis()

// Total CPU time used by the Spark process.
var processUserJiffies = 0L
var processSystemJiffies = 0L

// Total CPU time used by all processes on the machine.
var totalUserJiffies = 0L
var totalSystemJiffies = 0L

try {
Source.fromFile(s"/proc/${Utils.getPid()}/stat").getLines().foreach { line =>
val values = line.split(" ")
processUserJiffies = values(CpuCounters.UTIME_INDEX).toLong
processSystemJiffies = values(CpuCounters.STIME_INDEX).toLong
}

Source.fromFile(CpuCounters.CPU_TOTALS_FILENAME).getLines().foreach { line =>
// Look for only the line that starts with "cpu ", which has the totals across all CPUs
// (the remaining lines are for a particular core).
if (line.startsWith("cpu ")) {
val cpuTimes =
line.substring(CpuCounters.CPU_COUNTS_START_INDEX, line.length).split(" ").map(_.toLong)
totalUserJiffies = cpuTimes(CpuCounters.USER_JIFFIES_INDEX)
totalSystemJiffies = cpuTimes(CpuCounters.SYSTEM_JIFFIES_INDEX)
}
}
} catch {
case e: FileNotFoundException =>
logWarning(
"Unable to record CPU counters because files in /proc filesystem could not be found")
}
}

object CpuCounters {
val CPU_TOTALS_FILENAME = "/proc/stat"

// The first 5 characters in /proc/stat are "cpu  " or "cpuX " where X is the identifier of the
// particular core.
val CPU_COUNTS_START_INDEX = 5

// Indexes of the counters for user / system CPU counters, starting after the CPU identifier.
val USER_JIFFIES_INDEX = 0
val SYSTEM_JIFFIES_INDEX = 2

// 0-based indices in the /proc/pid/stat file of the user and system CPU times. Not necessarily
// portable.
val UTIME_INDEX = 13
val STIME_INDEX = 14
}
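
To make the indices concrete, here is an illustrative (made-up) totals line from /proc/stat:

    cpu  4705 150 1120 1434136 520 0 84 0 0 0

substring(CPU_COUNTS_START_INDEX) strips the "cpu  " prefix, after which index 0 (4705) is user jiffies, index 1 (150) is nice, and index 2 (1120) is system jiffies.
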
46 changes: 46 additions & 0 deletions core/src/main/scala/org/apache/spark/performance_logging/CpuUtilization.scala
@@ -0,0 +1,46 @@
/*
* Copyright 2014 The Regents of the University of California
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.performance_logging

/**
* CPU utilization measures the average number of cores in use, so when the CPUs are fully
* utilized, the CPU utilization will be equal to the number of cores (i.e., it will typically
* be greater than 1).
*/
class CpuUtilization(val startCounters: CpuCounters, val endCounters: CpuCounters)
extends Serializable {

def elapsedMillis = endCounters.timeMillis - startCounters.timeMillis
def elapsedJiffies = CpuUtilization.JIFFIES_PER_SECOND * (elapsedMillis * 1.0 / 1000)

def processUserUtilization =
((endCounters.processUserJiffies - startCounters.processUserJiffies).toFloat / elapsedJiffies)
def processSystemUtilization =
((endCounters.processSystemJiffies - startCounters.processSystemJiffies).toFloat /
elapsedJiffies)
def totalUserUtilization =
((endCounters.totalUserJiffies - startCounters.totalUserJiffies).toFloat / elapsedJiffies)
def totalSystemUtilization =
((endCounters.totalSystemJiffies - startCounters.totalSystemJiffies).toFloat / elapsedJiffies)

def this(startCounters: CpuCounters) = this(startCounters, new CpuCounters())
}

object CpuUtilization {
// This is the correct value for most Linux systems, and for the default Spark AMI.
val JIFFIES_PER_SECOND = 100
}
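
A quick worked example of the arithmetic: with JIFFIES_PER_SECOND = 100 and two snapshots taken 2000 ms apart, elapsedJiffies = 100 * 2.0 = 200. If processUserJiffies grew from 1000 to 1300 over that window, processUserUtilization = 300 / 200.0 = 1.5, i.e., the process kept an average of 1.5 cores busy in user mode.
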
72 changes: 72 additions & 0 deletions core/src/main/scala/org/apache/spark/performance_logging/DiskCounters.scala
@@ -0,0 +1,72 @@
/*
* Copyright 2014 The Regents of the University of California
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.performance_logging

import java.io.FileNotFoundException

import scala.collection.mutable.HashMap
import scala.io.Source

import org.apache.spark.Logging

/** Stores counters for a particular block device. */
case class BlockDeviceCounters(countersLine: String) extends Serializable {
val items = countersLine.split(" ").filter(!_.isEmpty())
val deviceName = items(DiskCounters.DEVICE_NAME_INDEX)
val sectorsRead = items(DiskCounters.SECTORS_READ_INDEX).toLong
val millisReading = items(DiskCounters.MILLIS_READING_INDEX).toLong
val sectorsWritten = items(DiskCounters.SECTORS_WRITTEN_INDEX).toLong
val millisWriting = items(DiskCounters.MILLIS_WRITING_INDEX).toLong
val millisTotal = items(DiskCounters.MILLIS_TOTAL_INDEX).toLong
}

/** Counters across all block devices. */
class DiskCounters(
val timeMillis: Long,
val deviceNameToCounters: HashMap[String, BlockDeviceCounters])
extends Serializable with Logging {

def this() = {
this(System.currentTimeMillis(), new HashMap[String, BlockDeviceCounters]())

try {
Source.fromFile(DiskCounters.DISK_TOTALS_FILENAME).getLines().foreach { line =>
// Skip loopback devices, which are not real disks.
if (!line.contains("loop")) {
val deviceCounters = BlockDeviceCounters(line)
this.deviceNameToCounters += deviceCounters.deviceName -> deviceCounters
}
}
} catch {
case e: FileNotFoundException =>
logWarning(
s"Unable to record disk counters because ${DiskCounters.DISK_TOTALS_FILENAME} " +
"could not be found")
}
}
}

object DiskCounters {
val DISK_TOTALS_FILENAME = "/proc/diskstats"

// Indices of information in the DISK_TOTALS_FILENAME file.
val DEVICE_NAME_INDEX = 2
val SECTORS_READ_INDEX = 5
val MILLIS_READING_INDEX = 6
val SECTORS_WRITTEN_INDEX = 9
val MILLIS_WRITING_INDEX = 10
val MILLIS_TOTAL_INDEX = 12
}
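
For concreteness, an illustrative (made-up) /proc/diskstats line:

    8       0 sda 12007 5434 510824 6332 3811 4687 122872 7080 0 4052 13400

After splitting on whitespace and dropping empty strings, index 2 is the device name ("sda"), index 5 the sectors read (510824), index 6 the milliseconds spent reading (6332), index 9 the sectors written (122872), index 10 the milliseconds spent writing (7080), and index 12 the total milliseconds spent doing I/O (4052).
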
51 changes: 51 additions & 0 deletions core/src/main/scala/org/apache/spark/performance_logging/DiskUtilization.scala
@@ -0,0 +1,51 @@
/*
* Copyright 2014 The Regents of the University of California
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.performance_logging

import scala.collection.mutable.HashMap

/** Utilization of a particular block device. */
class BlockDeviceUtilization(
startCounters: BlockDeviceCounters,
endCounters: BlockDeviceCounters,
elapsedMillis: Long) extends Serializable {
val diskUtilization = ((endCounters.millisTotal - startCounters.millisTotal).toFloat /
elapsedMillis)
val readThroughput = ((endCounters.sectorsRead - startCounters.sectorsRead).toFloat *
DiskUtilization.SECTOR_SIZE_BYTES * 1000 / elapsedMillis)
val writeThroughput = ((endCounters.sectorsWritten - startCounters.sectorsWritten).toFloat *
DiskUtilization.SECTOR_SIZE_BYTES * 1000 / elapsedMillis)
}

class DiskUtilization(startCounters: DiskCounters, endCounters: DiskCounters) extends Serializable {
val deviceNameToUtilization = HashMap[String, BlockDeviceUtilization]()
val elapsedMillis = endCounters.timeMillis - startCounters.timeMillis
endCounters.deviceNameToCounters.foreach {
  case (deviceName: String, endDeviceCounters: BlockDeviceCounters) =>
    // Only compute utilization for devices present in both snapshots.
    startCounters.deviceNameToCounters.get(deviceName).foreach { startDeviceCounters =>
      deviceNameToUtilization +=
        deviceName -> new BlockDeviceUtilization(startDeviceCounters, endDeviceCounters, elapsedMillis)
    }
}

def this(startCounters: DiskCounters) = this(startCounters, new DiskCounters())
}

object DiskUtilization {
// This is not at all portable -- can be obtained for a particular machine with "fdisk -l".
val SECTOR_SIZE_BYTES = 512
}
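
A quick sanity check on the units, assuming the 512-byte sectors above: if a device reports 2048 sectors written between two snapshots taken 500 ms apart, writeThroughput = 2048 * 512 * 1000 / 500 = 2,097,152 bytes/s (about 2 MB/s); and if the device was doing I/O for 250 of those 500 ms, its diskUtilization is 0.5.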
