
Commit 8f057a9

dongjoon-hyun authored and dbtsai committed on Sep 13, 2019
[SPARK-29032][CORE] Add PrometheusServlet to monitor Master/Worker/Driver
### What changes were proposed in this pull request?

This PR aims to simplify `Prometheus` support by adding `PrometheusServlet`. The main use cases are `K8s` and `Spark Standalone` cluster environments.

### Why are the changes needed?

Prometheus.io is a CNCF project used widely with K8s.
- https://github.com/prometheus/prometheus

For `Master/Worker/Driver`, the `Spark JMX Sink` and `Prometheus JMX Converter` combination is used in many cases. One way to achieve that is the following.

**JMX Sink (conf/metrics.properties)**
```
*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
```

**JMX Converter (conf/spark-env.sh)**
- https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.12.0/jmx_prometheus_javaagent-0.12.0.jar
```
export SPARK_DAEMON_JAVA_OPTS="-javaagent:${PWD}/jmx_prometheus_javaagent-${JMX_PROMETHEUS_VERSION}.jar=${PORT_AGENT}:jmx_prometheus.yaml"
```

This agent approach additionally requires `PORT_AGENT`. Instead, this PR natively supports exporting in `Prometheus` format, reusing the REST API port for a better UX.

### Does this PR introduce any user-facing change?

Yes. New web interfaces are added along with the existing JSON API.

|        | JSON End Point               | Prometheus End Point              |
| ------ | ---------------------------- | --------------------------------- |
| Master | /metrics/master/json/        | /metrics/master/prometheus/       |
| Master | /metrics/applications/json/  | /metrics/applications/prometheus/ |
| Worker | /metrics/json/               | /metrics/prometheus/              |
| Driver | /metrics/json/               | /metrics/prometheus/              |

### How was this patch tested?

Manually connected to the new endpoints with `curl`.

**Setup (Master/Worker/Driver)**

Add the following at `conf/metrics.properties` (`conf/metrics.properties.template` has these examples):
```
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
master.sink.prometheusServlet.path=/metrics/master/prometheus
applications.sink.prometheusServlet.path=/metrics/applications/prometheus
```

```
$ sbin/start-master.sh
$ sbin/start-slave.sh spark://`hostname`:7077
$ bin/spark-shell --master spark://`hostname`:7077
```

**Master**
```
$ curl -s http://localhost:8080/metrics/master/json/ | jq
{
  "version": "3.1.3",
  "gauges": {
    "master.aliveWorkers": { "value": 1 },
    "master.apps": { "value": 1 },
    "master.waitingApps": { "value": 0 },
    "master.workers": { "value": 1 }
  },
...

$ curl -s http://localhost:8080/metrics/master/prometheus/ | grep master
metrics_master_aliveWorkers_Value 1
metrics_master_apps_Value 1
metrics_master_waitingApps_Value 0
metrics_master_workers_Value 1
```

**Applications**
```
$ curl -s http://localhost:8080/metrics/applications/json/ | jq
{
  "version": "3.1.3",
  "gauges": {
    "application.Spark shell.1568261490667.cores": { "value": 16 },
    "application.Spark shell.1568261490667.runtime_ms": { "value": 108966 },
    "application.Spark shell.1568261490667.status": { "value": "RUNNING" }
  },
...

$ curl -s http://localhost:8080/metrics/applications/prometheus/ | grep application
metrics_application_Spark_shell_1568261490667_cores_Value 16
metrics_application_Spark_shell_1568261490667_runtime_ms_Value 143174
```

**Worker**
```
$ curl -s http://localhost:8081/metrics/json/ | jq
{
  "version": "3.1.3",
  "gauges": {
    "worker.coresFree": { "value": 0 },
    "worker.coresUsed": { "value": 16 },
    "worker.executors": { "value": 1 },
    "worker.memFree_MB": { "value": 30720 },
    "worker.memUsed_MB": { "value": 1024 }
  },
...

$ curl -s http://localhost:8081/metrics/prometheus/ | grep worker
metrics_worker_coresFree_Value 0
metrics_worker_coresUsed_Value 16
metrics_worker_executors_Value 1
metrics_worker_memFree_MB_Value 30720
metrics_worker_memUsed_MB_Value 1024
```

**Driver**
```
$ curl -s http://localhost:4040/metrics/json/ | jq
{
  "version": "3.1.3",
  "gauges": {
    "app-20190911211130-0000.driver.BlockManager.disk.diskSpaceUsed_MB": { "value": 0 },
    "app-20190911211130-0000.driver.BlockManager.memory.maxMem_MB": { "value": 732 },
    "app-20190911211130-0000.driver.BlockManager.memory.maxOffHeapMem_MB": { "value": 0 },
    "app-20190911211130-0000.driver.BlockManager.memory.maxOnHeapMem_MB": { "value": 732 },
...

$ curl -s http://localhost:4040/metrics/prometheus/ | head -n5
metrics_app_20190911211130_0000_driver_BlockManager_disk_diskSpaceUsed_MB_Value 0
metrics_app_20190911211130_0000_driver_BlockManager_memory_maxMem_MB_Value 732
metrics_app_20190911211130_0000_driver_BlockManager_memory_maxOffHeapMem_MB_Value 0
metrics_app_20190911211130_0000_driver_BlockManager_memory_maxOnHeapMem_MB_Value 732
metrics_app_20190911211130_0000_driver_BlockManager_memory_memUsed_MB_Value 0
```

Closes apache#25769 from dongjoon-hyun/SPARK-29032-2.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
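As a quick sanity check beyond `curl`, the new endpoint can also be read with a few lines of plain Scala. This is a minimal sketch, not part of the patch; it assumes a standalone master running locally on the default web UI port 8080.

```scala
// Minimal sketch: read the new Prometheus endpoint of a local standalone
// master (default web UI port 8080 assumed) and print the master metrics.
import scala.io.Source

object ScrapeMasterMetrics {
  def main(args: Array[String]): Unit = {
    val url = "http://localhost:8080/metrics/master/prometheus/"
    val body = Source.fromURL(url)
    try {
      // Each line is "<normalized_key><Suffix> <value>", e.g.
      // "metrics_master_aliveWorkers_Value 1".
      body.getLines().filter(_.startsWith("metrics_master")).foreach(println)
    } finally {
      body.close()
    }
  }
}
```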
1 parent bbfaadb · commit 8f057a9

File tree

3 files changed: +147 −3 lines

 

conf/metrics.properties.template (+16, −1)
```diff
@@ -113,6 +113,15 @@
 #   /metrics/applications/json # App information
 #   /metrics/master/json # Master information
 
+# org.apache.spark.metrics.sink.PrometheusServlet
+#   Name:   Default:   Description:
+#   path    VARIES*    Path prefix from the web server root
+#
+#   * Default path is /metrics/prometheus for all instances except the master. The
+#     master has two paths:
+#       /metrics/applications/prometheus # App information
+#       /metrics/master/prometheus       # Master information
+
 # org.apache.spark.metrics.sink.GraphiteSink
 #   Name:   Default:   Description:
 #   host    NONE       Hostname of the Graphite server, must be set
@@ -192,4 +201,10 @@
 
 #driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
 
-#executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
+#executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
+
+# Example configuration for PrometheusServlet
+#*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
+#*.sink.prometheusServlet.path=/metrics/prometheus
+#master.sink.prometheusServlet.path=/metrics/master/prometheus
+#applications.sink.prometheusServlet.path=/metrics/applications/prometheus
```
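The example configuration above relies on instance-specific keys overriding the `*` wildcard defaults, which is how the master ends up on `/metrics/master/prometheus` while workers and drivers keep `/metrics/prometheus`. The sketch below illustrates that precedence rule in isolation; `resolvePath` is a hypothetical helper written for this example, not Spark's actual `MetricsConfig` implementation.

```scala
// Illustrative sketch of per-instance property precedence: an
// "<instance>.sink.prometheusServlet.path" key wins over the "*" default.
import java.util.Properties

object SinkPathResolutionDemo {
  // Hypothetical helper mimicking the documented default behavior.
  def resolvePath(props: Properties, instance: String): String = {
    val specific = props.getProperty(s"$instance.sink.prometheusServlet.path")
    if (specific != null) specific
    else props.getProperty("*.sink.prometheusServlet.path", "/metrics/prometheus")
  }

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("*.sink.prometheusServlet.path", "/metrics/prometheus")
    props.setProperty("master.sink.prometheusServlet.path", "/metrics/master/prometheus")
    println(resolvePath(props, "master")) // /metrics/master/prometheus
    println(resolvePath(props, "worker")) // /metrics/prometheus
  }
}
```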

core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala (+10, −2)
```diff
@@ -28,7 +28,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.metrics.sink.{MetricsServlet, Sink}
+import org.apache.spark.metrics.sink.{MetricsServlet, PrometheusServlet, Sink}
 import org.apache.spark.metrics.source.{Source, StaticSources}
 import org.apache.spark.util.Utils
 
@@ -83,13 +83,15 @@ private[spark] class MetricsSystem private (
 
   // Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
   private var metricsServlet: Option[MetricsServlet] = None
+  private var prometheusServlet: Option[PrometheusServlet] = None
 
   /**
    * Get any UI handlers used by this metrics system; can only be called after start().
    */
   def getServletHandlers: Array[ServletContextHandler] = {
     require(running, "Can only call getServletHandlers on a running MetricsSystem")
-    metricsServlet.map(_.getHandlers(conf)).getOrElse(Array())
+    metricsServlet.map(_.getHandlers(conf)).getOrElse(Array()) ++
+      prometheusServlet.map(_.getHandlers(conf)).getOrElse(Array())
   }
 
   metricsConfig.initialize()
@@ -201,6 +203,12 @@
               classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
             .newInstance(kv._2, registry, securityMgr)
           metricsServlet = Some(servlet)
+        } else if (kv._1 == "prometheusServlet") {
+          val servlet = Utils.classForName[PrometheusServlet](classPath)
+            .getConstructor(
+              classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
+            .newInstance(kv._2, registry, securityMgr)
+          prometheusServlet = Some(servlet)
         } else {
           val sink = Utils.classForName[Sink](classPath)
             .getConstructor(
```
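The new branch mirrors the existing `servlet` special case: the sink class named in the config is loaded by name and instantiated through a constructor taking `(Properties, MetricRegistry, SecurityManager)`. The standalone sketch below shows the same load-by-name pattern with a simplified two-argument constructor, since Spark's `SecurityManager` is not available outside Spark; `DemoSink` is hypothetical, and the only dependency assumed is Dropwizard's `metrics-core`.

```scala
// Standalone sketch of reflective sink construction. DemoSink is
// hypothetical; Spark's real sinks also take a SecurityManager argument.
import java.util.Properties

import com.codahale.metrics.MetricRegistry

class DemoSink(val property: Properties, val registry: MetricRegistry) {
  def path: String = property.getProperty("path", "/metrics/prometheus")
}

object ReflectiveSinkDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("path", "/metrics/master/prometheus")
    // Load the class by name and invoke the (Properties, MetricRegistry)
    // constructor, as MetricsSystem does for "prometheusServlet" sinks.
    val sink = Class.forName("DemoSink")
      .getConstructor(classOf[Properties], classOf[MetricRegistry])
      .newInstance(props, new MetricRegistry)
      .asInstanceOf[DemoSink]
    println(sink.path) // prints /metrics/master/prometheus
  }
}
```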
core/src/main/scala/org/apache/spark/metrics/sink/PrometheusServlet.scala (new file, +121)
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.metrics.sink

import java.util.Properties
import javax.servlet.http.HttpServletRequest

import com.codahale.metrics.MetricRegistry
import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.ui.JettyUtils._

/**
 * This exposes the metrics of the given registry with Prometheus format.
 *
 * The output is consistent with /metrics/json result in terms of item ordering
 * and with the previous result of Spark JMX Sink + Prometheus JMX Converter combination
 * in terms of key string format.
 */
private[spark] class PrometheusServlet(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager)
  extends Sink {

  val SERVLET_KEY_PATH = "path"

  val servletPath = property.getProperty(SERVLET_KEY_PATH)

  def getHandlers(conf: SparkConf): Array[ServletContextHandler] = {
    Array[ServletContextHandler](
      createServletHandler(servletPath,
        new ServletParams(request => getMetricsSnapshot(request), "text/plain"), conf)
    )
  }

  def getMetricsSnapshot(request: HttpServletRequest): String = {
    import scala.collection.JavaConverters._

    val sb = new StringBuilder()
    registry.getGauges.asScala.foreach { case (k, v) =>
      if (!v.getValue.isInstanceOf[String]) {
        sb.append(s"${normalizeKey(k)}Value ${v.getValue}\n")
      }
    }
    registry.getCounters.asScala.foreach { case (k, v) =>
      sb.append(s"${normalizeKey(k)}Count ${v.getCount}\n")
    }
    registry.getHistograms.asScala.foreach { case (k, h) =>
      val snapshot = h.getSnapshot
      val prefix = normalizeKey(k)
      sb.append(s"${prefix}Count ${h.getCount}\n")
      sb.append(s"${prefix}Max ${snapshot.getMax}\n")
      sb.append(s"${prefix}Mean ${snapshot.getMean}\n")
      sb.append(s"${prefix}Min ${snapshot.getMin}\n")
      sb.append(s"${prefix}50thPercentile ${snapshot.getMedian}\n")
      sb.append(s"${prefix}75thPercentile ${snapshot.get75thPercentile}\n")
      sb.append(s"${prefix}95thPercentile ${snapshot.get95thPercentile}\n")
      sb.append(s"${prefix}98thPercentile ${snapshot.get98thPercentile}\n")
      sb.append(s"${prefix}99thPercentile ${snapshot.get99thPercentile}\n")
      sb.append(s"${prefix}999thPercentile ${snapshot.get999thPercentile}\n")
      sb.append(s"${prefix}StdDev ${snapshot.getStdDev}\n")
    }
    registry.getMeters.entrySet.iterator.asScala.foreach { kv =>
      val prefix = normalizeKey(kv.getKey)
      val meter = kv.getValue
      sb.append(s"${prefix}Count ${meter.getCount}\n")
      sb.append(s"${prefix}MeanRate ${meter.getMeanRate}\n")
      sb.append(s"${prefix}OneMinuteRate ${meter.getOneMinuteRate}\n")
      sb.append(s"${prefix}FiveMinuteRate ${meter.getFiveMinuteRate}\n")
      sb.append(s"${prefix}FifteenMinuteRate ${meter.getFifteenMinuteRate}\n")
    }
    registry.getTimers.entrySet.iterator.asScala.foreach { kv =>
      val prefix = normalizeKey(kv.getKey)
      val timer = kv.getValue
      val snapshot = timer.getSnapshot
      sb.append(s"${prefix}Count ${timer.getCount}\n")
      sb.append(s"${prefix}Max ${snapshot.getMax}\n")
      sb.append(s"${prefix}Mean ${snapshot.getMean}\n")
      sb.append(s"${prefix}Min ${snapshot.getMin}\n")
      sb.append(s"${prefix}50thPercentile ${snapshot.getMedian}\n")
      sb.append(s"${prefix}75thPercentile ${snapshot.get75thPercentile}\n")
      sb.append(s"${prefix}95thPercentile ${snapshot.get95thPercentile}\n")
      sb.append(s"${prefix}98thPercentile ${snapshot.get98thPercentile}\n")
      sb.append(s"${prefix}99thPercentile ${snapshot.get99thPercentile}\n")
      sb.append(s"${prefix}999thPercentile ${snapshot.get999thPercentile}\n")
      sb.append(s"${prefix}StdDev ${snapshot.getStdDev}\n")
      sb.append(s"${prefix}FifteenMinuteRate ${timer.getFifteenMinuteRate}\n")
      sb.append(s"${prefix}FiveMinuteRate ${timer.getFiveMinuteRate}\n")
      sb.append(s"${prefix}OneMinuteRate ${timer.getOneMinuteRate}\n")
      sb.append(s"${prefix}MeanRate ${timer.getMeanRate}\n")
    }
    sb.toString()
  }

  private def normalizeKey(key: String): String = {
    s"metrics_${key.replaceAll("[^a-zA-Z0-9]", "_")}_"
  }

  override def start() { }

  override def stop() { }

  override def report() { }
}
```
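To see how `normalizeKey` produces the metric names observed in the `curl` output above, here is a small self-contained sketch that replicates its one-line rule: every non-alphanumeric character becomes `_`, and the result is wrapped in a `metrics_..._` prefix before the metric-type suffix (`Value`, `Count`, ...).

```scala
// Replicates PrometheusServlet.normalizeKey for illustration.
object NormalizeKeyDemo {
  private def normalizeKey(key: String): String =
    s"metrics_${key.replaceAll("[^a-zA-Z0-9]", "_")}_"

  def main(args: Array[String]): Unit = {
    println(normalizeKey("master.aliveWorkers") + "Value")
    // metrics_master_aliveWorkers_Value
    println(normalizeKey("app-20190911211130-0000.driver.BlockManager.disk.diskSpaceUsed_MB") + "Value")
    // metrics_app_20190911211130_0000_driver_BlockManager_disk_diskSpaceUsed_MB_Value
  }
}
```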
