From e4e3180bef1cee9ba8bda17596b5fddb1abe616e Mon Sep 17 00:00:00 2001
From: Shafi Rasulov
Date: Fri, 17 Oct 2025 18:45:15 +0400
Subject: [PATCH] Expose stage-level metrics in the Prometheus endpoint

---
 .../status/api/v1/PrometheusResource.scala | 37 +++++++++++++++++++
 1 file changed, 37 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala
index 6efe3106ba56d..b087124331976 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/PrometheusResource.scala
@@ -37,6 +37,16 @@ import org.apache.spark.ui.SparkUI
 @Experimental
 @Path("/executors")
 private[v1] class PrometheusResource extends ApiRequestContext {
+
+  private case class ExecutorStageMetrics(
+      var memoryBytesSpilled: Long = 0L,
+      var diskBytesSpilled: Long = 0L,
+      var shuffleWriteRecords: Long = 0L,
+      var shuffleReadRecords: Long = 0L,
+      var inputRecords: Long = 0L,
+      var outputBytes: Long = 0L,
+      var outputRecords: Long = 0L
+  )
   @GET
   @Path("prometheus")
   @Produces(Array(MediaType.TEXT_PLAIN))
@@ -44,6 +54,24 @@ private[v1] class PrometheusResource extends ApiRequestContext {
     val sb = new StringBuilder
     sb.append(s"""spark_info{version="$SPARK_VERSION_SHORT", revision="$SPARK_REVISION"} 1.0\n""")
     val store = uiRoot.asInstanceOf[SparkUI].store
+
+    // Aggregate stage-level metrics for each executor
+    val executorStageMetrics =
+      scala.collection.mutable.Map[String, ExecutorStageMetrics]()
+    store.stageList(null).foreach { stage =>
+      val stageSummaries = store.executorSummary(stage.stageId, stage.attemptId)
+      stageSummaries.foreach { case (executorId, summary) =>
+        val current = executorStageMetrics.getOrElseUpdate(executorId, ExecutorStageMetrics())
+        current.memoryBytesSpilled += summary.memoryBytesSpilled
+        current.diskBytesSpilled += summary.diskBytesSpilled
+        current.shuffleWriteRecords += summary.shuffleWriteRecords
+        current.shuffleReadRecords += summary.shuffleReadRecords
+        current.inputRecords += summary.inputRecords
+        current.outputBytes += summary.outputBytes
+        current.outputRecords += summary.outputRecords
+      }
+    }
+
     store.executorList(true).foreach { executor =>
       val prefix = "metrics_executor_"
       val labels = Seq(
@@ -103,6 +131,15 @@ private[v1] class PrometheusResource extends ApiRequestContext {
           sb.append(s"$prefix${name}_seconds_total$labels ${m.getMetricValue(name) * 0.001}\n")
         }
       }
+      executorStageMetrics.get(executor.id).foreach { metrics =>
+        sb.append(s"${prefix}memoryBytesSpilled_bytes_total$labels ${metrics.memoryBytesSpilled}\n")
+        sb.append(s"${prefix}diskBytesSpilled_bytes_total$labels ${metrics.diskBytesSpilled}\n")
+        sb.append(s"${prefix}shuffleWriteRecords_total$labels ${metrics.shuffleWriteRecords}\n")
+        sb.append(s"${prefix}shuffleReadRecords_total$labels ${metrics.shuffleReadRecords}\n")
+        sb.append(s"${prefix}inputRecords_total$labels ${metrics.inputRecords}\n")
+        sb.append(s"${prefix}outputBytes_bytes_total$labels ${metrics.outputBytes}\n")
+        sb.append(s"${prefix}outputRecords_total$labels ${metrics.outputRecords}\n")
+      }
     }
     sb.toString
   }
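
Illustration only, not part of the patch: a minimal, self-contained Scala sketch of the aggregation pattern the hunks above introduce, for reviewers who want to see the shape of the logic in isolation. StageExecutorSummary and ExecutorTotals are hypothetical stand-ins for Spark's ExecutorStageSummary and the patch's ExecutorStageMetrics, and only two of the seven counters are shown.

object StageMetricsAggregationSketch {

  // Stand-in for the per-stage, per-executor summary read from the status store.
  final case class StageExecutorSummary(
      executorId: String,
      memoryBytesSpilled: Long,
      shuffleWriteRecords: Long)

  // Mutable accumulator, mirroring the ExecutorStageMetrics case class in the patch.
  final case class ExecutorTotals(
      var memoryBytesSpilled: Long = 0L,
      var shuffleWriteRecords: Long = 0L)

  // Sum per-stage summaries into one running total per executor id.
  def aggregate(summaries: Seq[StageExecutorSummary]): Map[String, ExecutorTotals] = {
    val totals = scala.collection.mutable.Map[String, ExecutorTotals]()
    summaries.foreach { s =>
      // getOrElseUpdate inserts a zeroed accumulator the first time an executor is seen.
      val acc = totals.getOrElseUpdate(s.executorId, ExecutorTotals())
      acc.memoryBytesSpilled += s.memoryBytesSpilled
      acc.shuffleWriteRecords += s.shuffleWriteRecords
    }
    totals.toMap
  }

  def main(args: Array[String]): Unit = {
    // Two stages reporting for executor "1", one stage for executor "2".
    val summaries = Seq(
      StageExecutorSummary("1", 100L, 10L),
      StageExecutorSummary("1", 50L, 5L),
      StageExecutorSummary("2", 0L, 7L))
    aggregate(summaries).toSeq.sortBy(_._1).foreach { case (id, t) =>
      println(s"executor $id: memoryBytesSpilled=${t.memoryBytesSpilled}, " +
        s"shuffleWriteRecords=${t.shuffleWriteRecords}")
    }
  }
}

With the actual patch applied, these per-executor totals surface as additional metrics_executor_* counters (e.g. metrics_executor_memoryBytesSpilled_bytes_total) on the executor Prometheus endpoint, alongside the executor summary metrics the endpoint already exposes.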