Skip to content

Commit

Permalink
GPU device watermark metrics (#11457)
Browse files Browse the repository at this point in the history
* add max memory watermark metric

Signed-off-by: Zach Puller <[email protected]>

---------

Signed-off-by: Zach Puller <[email protected]>
  • Loading branch information
zpuller authored Sep 26, 2024
1 parent b113c46 commit c047707
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ private final class GpuSemaphore() extends Logging {
def completeTask(context: TaskContext): Unit = {
val taskAttemptId = context.taskAttemptId()
GpuTaskMetrics.get.updateRetry(taskAttemptId)
GpuTaskMetrics.get.updateMaxGpuMemory(taskAttemptId)
val refs = tasks.remove(taskAttemptId)
if (refs == null) {
throw new IllegalStateException(s"Completion of unknown task $taskAttemptId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,35 @@ class NanoSecondAccumulator extends AccumulatorV2[jl.Long, NanoTime] {
override def value: NanoTime = NanoTime(_sum)
}

class HighWatermarkAccumulator extends AccumulatorV2[jl.Long, Long] {
private var _value = 0L
override def isZero: Boolean = _value == 0

override def copy(): HighWatermarkAccumulator = {
val newAcc = new HighWatermarkAccumulator
newAcc._value = this._value
newAcc
}

override def reset(): Unit = {
_value = 0
}

override def add(v: jl.Long): Unit = {
_value += v
}

override def merge(other: AccumulatorV2[jl.Long, Long]): Unit = other match {
case wa: HighWatermarkAccumulator =>
_value = _value.max(wa._value)
case _ =>
throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}

override def value: Long = _value
}

class GpuTaskMetrics extends Serializable {
private val semWaitTimeNs = new NanoSecondAccumulator
private val retryCount = new LongAccumulator
Expand All @@ -91,6 +120,8 @@ class GpuTaskMetrics extends Serializable {
private val readSpillFromHostTimeNs = new NanoSecondAccumulator
private val readSpillFromDiskTimeNs = new NanoSecondAccumulator

private val maxDeviceMemoryBytes = new HighWatermarkAccumulator

private val metrics = Map[String, AccumulatorV2[_, _]](
"gpuSemaphoreWait" -> semWaitTimeNs,
"gpuRetryCount" -> retryCount,
Expand All @@ -100,7 +131,8 @@ class GpuTaskMetrics extends Serializable {
"gpuSpillToHostTime" -> spillToHostTimeNs,
"gpuSpillToDiskTime" -> spillToDiskTimeNs,
"gpuReadSpillFromHostTime" -> readSpillFromHostTimeNs,
"gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs
"gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs,
"gpuMaxDeviceMemoryBytes" -> maxDeviceMemoryBytes
)

def register(sc: SparkContext): Unit = {
Expand Down Expand Up @@ -178,6 +210,18 @@ class GpuTaskMetrics extends Serializable {
retryComputationTime.add(compNs)
}
}

def updateMaxGpuMemory(taskAttemptId: Long): Unit = {
val maxMem = RmmSpark.getAndResetGpuMaxMemoryAllocated(taskAttemptId)
if (maxMem > 0) {
// This metric tracks the max amount of memory that is allocated on the gpu during
// the lifespan of a task. However, this update function only gets called once on task
// completion, whereas the actual logic tracking of the max value during memory allocations
// lives in the JNI. Therefore, we can stick the convention here of calling the add method
// instead of adding a dedicated max method to the accumulator.
maxDeviceMemoryBytes.add(maxMem)
}
}
}

/**
Expand Down

0 comments on commit c047707

Please sign in to comment.