5252
5353'''
5454# stdlib
55- from urlparse import urljoin
55+ from urlparse import urljoin , urlsplit , urlunsplit
5656
5757# 3rd party
58+ from requests .exceptions import Timeout , HTTPError , InvalidURL , ConnectionError
5859import requests
5960
6061# Project
6364# Default settings
6465DEFAULT_RM_URI = 'http://localhost:8088'
6566DEFAULT_TIMEOUT = 5
66-
67- # Path to retrieve cluster info
68- INFO_URI = '/ws/v1/cluster/info'
67+ DEFAULT_CUSTER_NAME = 'default_cluster'
6968
7069# Path to retrieve cluster metrics
7170YARN_CLUSTER_METRICS_PATH = '/ws/v1/cluster/metrics'
7877
7978# Metric types
8079GAUGE = 'gauge'
80+ INCREMENT = 'increment'
8181
8282# Name of the service check
8383SERVICE_CHECK_NAME = 'yarn.can_connect'
8484
85+ # Application states to collect
86+ YARN_APPLICATION_STATES = 'RUNNING'
87+
8588# Cluster metrics identifier
8689YARN_CLUSTER_METRICS_ELEMENT = 'clusterMetrics'
8790
114117
115118# Application metrics for YARN
# Each entry pairs a metric name with the metric type it is submitted as.
# NOTE(review): keys are presumably the field names of an application record
# returned by the ResourceManager -- confirm against _set_yarn_metrics_from_json.
YARN_APP_METRICS = {
    'progress': ('yarn.apps.progress', INCREMENT),
    'startedTime': ('yarn.apps.started_time', INCREMENT),
    'finishedTime': ('yarn.apps.finished_time', INCREMENT),
    'elapsedTime': ('yarn.apps.elapsed_time', INCREMENT),
    'allocatedMB': ('yarn.apps.allocated_mb', INCREMENT),
    'allocatedVCores': ('yarn.apps.allocated_vcores', INCREMENT),
    'runningContainers': ('yarn.apps.running_containers', INCREMENT),
    'memorySeconds': ('yarn.apps.memory_seconds', INCREMENT),
    'vcoreSeconds': ('yarn.apps.vcore_seconds', INCREMENT),
}
127130
128131# Node metrics for YARN
@@ -146,61 +149,61 @@ def check(self, instance):
146149 # Get properties from conf file
147150 rm_address = instance .get ('resourcemanager_uri' , DEFAULT_RM_URI )
148151
149- # Get the cluster ID from Yarn
150- cluster_id = self ._get_cluster_id (rm_address )
152+ # Get additional tags from the conf file
153+ tags = instance .get ('tags' , [])
154+ if tags is None :
155+ tags = []
156+ else :
157+ tags = list (set (tags ))
151158
152- # Get metrics from the Resource Manager
153- self ._yarn_cluster_metrics (cluster_id , rm_address )
154- self ._yarn_app_metrics (rm_address )
155- self ._yarn_node_metrics (cluster_id , rm_address )
159+ # Get the cluster name from the conf file
160+ cluster_name = instance .get ('cluster_name' )
161+ if cluster_name is None :
162+ self .warning ("The cluster_name must be specified in the instance configuration, defaulting to '%s'" % (DEFAULT_CUSTER_NAME ))
163+ cluster_name = DEFAULT_CUSTER_NAME
156164
157- def _get_cluster_id (self , rm_address ):
158- '''
159- Return the cluster ID, otherwise raise an exception
160- '''
161- info_json = self ._rest_request_to_json (rm_address , INFO_URI )
165+ tags .append ('cluster_name:%s' % cluster_name )
162166
163- cluster_id = info_json .get ('clusterInfo' , {}).get ('id' )
164- if cluster_id is not None :
165- return cluster_id
166-
167- raise Exception ('Unable to retrieve cluster ID from ResourceManger' )
167+ # Get metrics from the Resource Manager
168+ self ._yarn_cluster_metrics (rm_address , tags )
169+ self ._yarn_app_metrics (rm_address , tags )
170+ self ._yarn_node_metrics (rm_address , tags )
168171
def _yarn_cluster_metrics(self, rm_address, addl_tags):
    '''
    Collect the cluster-wide metrics published by the ResourceManager

    Requests YARN_CLUSTER_METRICS_PATH from `rm_address`. When the response
    is non-empty and its YARN_CLUSTER_METRICS_ELEMENT entry is not None,
    that entry is handed to _set_yarn_metrics_from_json together with
    `addl_tags` and the YARN_CLUSTER_METRICS table.
    '''
    response = self._rest_request_to_json(rm_address, YARN_CLUSTER_METRICS_PATH)

    # Nothing to report when the request produced no payload
    if not response:
        return

    cluster_metrics = response[YARN_CLUSTER_METRICS_ELEMENT]
    if cluster_metrics is None:
        return

    self._set_yarn_metrics_from_json(addl_tags, cluster_metrics, YARN_CLUSTER_METRICS)
182184
def _yarn_app_metrics(self, rm_address, addl_tags):
    '''
    Get metrics for running applications

    Requests YARN_APPS_PATH from `rm_address`, passing
    states=YARN_APPLICATION_STATES as a query argument. Every application
    in the response is handed to _set_yarn_metrics_from_json with the
    YARN_APP_METRICS table, tagged with `app_name:<name>` followed by
    `addl_tags`.

    Nothing is reported when the response is empty or when its 'apps' or
    'app' entry is None.
    '''
    metrics_json = self._rest_request_to_json(rm_address,
                                              YARN_APPS_PATH,
                                              states=YARN_APPLICATION_STATES)
    if not metrics_json:
        return

    apps = metrics_json['apps']
    if apps is None:
        return

    app_list = apps['app']
    if app_list is None:
        return

    for app_json in app_list:
        # Format the name directly instead of going through str(): the JSON
        # decoder yields unicode text, and str() on a unicode value holding
        # non-ASCII characters raises UnicodeEncodeError under Python 2,
        # which would abort the whole check for one oddly named application.
        tags = ['app_name:%s' % (app_json['name'],)]
        tags.extend(addl_tags)

        self._set_yarn_metrics_from_json(tags, app_json, YARN_APP_METRICS)
201205
202-
203- def _yarn_node_metrics (self , cluster_id , rm_address ):
206+ def _yarn_node_metrics (self , rm_address , addl_tags ):
204207 '''
205208 Get metrics related to YARN nodes
206209 '''
@@ -210,11 +213,11 @@ def _yarn_node_metrics(self, cluster_id, rm_address):
210213 if metrics_json ['nodes' ] is not None :
211214 if metrics_json ['nodes' ]['node' ] is not None :
212215
213- tags = ['cluster_id:' + str (cluster_id )]
214-
215216 for node_json in metrics_json ['nodes' ]['node' ]:
216217 node_id = node_json ['id' ]
217- tags .append ('node_id:' + str (node_id ))
218+
219+ tags = ['node_id:%s' % str (node_id )]
220+ tags .extend (addl_tags )
218221
219222 self ._set_yarn_metrics_from_json (tags , node_json , YARN_NODE_METRICS )
220223
@@ -237,34 +240,51 @@ def _set_metric(self, metric_name, metric_type, value, tags=None, device_name=No
237240 '''
238241 if metric_type == GAUGE :
239242 self .gauge (metric_name , value , tags = tags , device_name = device_name )
243+ elif metric_type == INCREMENT :
244+ self .increment (metric_name , value , tags = tags , device_name = device_name )
240245 else :
241246 self .log .error ('Metric type "%s" unknown' % (metric_type ))
242247
243- def _rest_request_to_json (self , address , object_path ):
248+ def _rest_request_to_json (self , address , object_path , * args , ** kwargs ):
244249 '''
245250 Query the given URL and return the JSON response
246251 '''
247252 response_json = None
248253
249- service_check_tags = ['instance :%s' % self .hostname ]
254+ service_check_tags = ['url :%s' % self ._get_url_base ( address ) ]
250255
251- url = urljoin (address , object_path )
256+ url = address
257+
258+ if object_path :
259+ url = self ._join_url_dir (url , object_path )
260+
261+ # Add args to the url
262+ if args :
263+ for directory in args :
264+ url = self ._join_url_dir (url , directory )
265+
266+ self .log .debug ('Attempting to connect to "%s"' % url )
267+
268+ # Add kwargs as arguments
269+ if kwargs :
270+ query = '&' .join (['{0}={1}' .format (key , value ) for key , value in kwargs .iteritems ()])
271+ url = urljoin (url , '?' + query )
252272
253273 try :
254274 response = requests .get (url )
255275 response .raise_for_status ()
256276 response_json = response .json ()
257277
258- except requests . exceptions . Timeout as e :
278+ except Timeout as e :
259279 self .service_check (SERVICE_CHECK_NAME ,
260280 AgentCheck .CRITICAL ,
261281 tags = service_check_tags ,
262282 message = "Request timeout: {0}, {1}" .format (url , e ))
263283 raise
264284
265- except (requests . exceptions . HTTPError ,
266- requests . exceptions . InvalidURL ,
267- requests . exceptions . ConnectionError ) as e :
285+ except (HTTPError ,
286+ InvalidURL ,
287+ ConnectionError ) as e :
268288 self .service_check (SERVICE_CHECK_NAME ,
269289 AgentCheck .CRITICAL ,
270290 tags = service_check_tags ,
@@ -285,3 +305,20 @@ def _rest_request_to_json(self, address, object_path):
285305 message = 'Connection to %s was successful' % url )
286306
287307 return response_json
308+
309+ def _join_url_dir (self , url , * args ):
310+ '''
311+ Join a URL with multiple directories
312+ '''
313+ for path in args :
314+ url = url .rstrip ('/' ) + '/'
315+ url = urljoin (url , path .lstrip ('/' ))
316+
317+ return url
318+
319+ def _get_url_base (self , url ):
320+ '''
321+ Return the base of a URL
322+ '''
323+ s = urlsplit (url )
324+ return urlunsplit ([s .scheme , s .netloc , '' , '' , '' ])
0 commit comments