Skip to content

Commit

Permalink
add max memory watermark metric
Browse files Browse the repository at this point in the history
Signed-off-by: Zach Puller <[email protected]>
  • Loading branch information
zpuller committed Sep 24, 2024
1 parent 6a9731f commit b6a9fd7
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ private final class GpuSemaphore() extends Logging {
def completeTask(context: TaskContext): Unit = {
val taskAttemptId = context.taskAttemptId()
GpuTaskMetrics.get.updateRetry(taskAttemptId)
GpuTaskMetrics.get.updateMaxMemory(taskAttemptId)
val refs = tasks.remove(taskAttemptId)
if (refs == null) {
throw new IllegalStateException(s"Completion of unknown task $taskAttemptId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,35 @@ class NanoSecondAccumulator extends AccumulatorV2[jl.Long, NanoTime] {
override def value: NanoTime = NanoTime(_sum)
}

class WatermarkAccumulator extends AccumulatorV2[jl.Long, Long] {
private var _value = 0L
override def isZero: Boolean = _value == 0

override def copy(): WatermarkAccumulator = {
val newAcc = new WatermarkAccumulator
newAcc._value = this._value
newAcc
}

override def reset(): Unit = {
_value = 0
}

override def add(v: jl.Long): Unit = {
_value += v
}

override def merge(other: AccumulatorV2[jl.Long, Long]): Unit = other match {
case wa: WatermarkAccumulator =>
_value = _value.max(wa._value)
case _ =>
throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
}

override def value: Long = _value
}

class GpuTaskMetrics extends Serializable {
private val semWaitTimeNs = new NanoSecondAccumulator
private val retryCount = new LongAccumulator
Expand All @@ -91,6 +120,8 @@ class GpuTaskMetrics extends Serializable {
private val readSpillFromHostTimeNs = new NanoSecondAccumulator
private val readSpillFromDiskTimeNs = new NanoSecondAccumulator

private val maxDeviceMemoryBytes = new WatermarkAccumulator

private val metrics = Map[String, AccumulatorV2[_, _]](
"gpuSemaphoreWait" -> semWaitTimeNs,
"gpuRetryCount" -> retryCount,
Expand All @@ -100,7 +131,8 @@ class GpuTaskMetrics extends Serializable {
"gpuSpillToHostTime" -> spillToHostTimeNs,
"gpuSpillToDiskTime" -> spillToDiskTimeNs,
"gpuReadSpillFromHostTime" -> readSpillFromHostTimeNs,
"gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs
"gpuReadSpillFromDiskTime" -> readSpillFromDiskTimeNs,
"gpuMaxDeviceMemoryBytes" -> maxDeviceMemoryBytes
)

def register(sc: SparkContext): Unit = {
Expand Down Expand Up @@ -178,6 +210,13 @@ class GpuTaskMetrics extends Serializable {
retryComputationTime.add(compNs)
}
}

def updateMaxMemory(taskAttemptId: Long): Unit = {
val maxMem = RmmSpark.getAndResetGpuMaxMemoryAllocated(taskAttemptId)
if (maxMem > 0) {
maxDeviceMemoryBytes.add(maxMem)
}
}
}

/**
Expand Down

0 comments on commit b6a9fd7

Please sign in to comment.