34
34
log = logging .getLogger (__name__ )
35
35
36
36
SERVICE_SLEEP_INTERVAL = 1
37
- MAX_FAILED_HEARTBEATS = 8 # runs of collector
37
+ MAX_FAILED_HEARTBEATS = 8 # runs of collector
38
+
38
39
39
40
class AgentSvc (win32serviceutil .ServiceFramework ):
40
41
_svc_name_ = "DatadogAgent"
@@ -63,14 +64,18 @@ def __init__(self, args):
63
64
self ._max_failed_heartbeats = \
64
65
MAX_FAILED_HEARTBEATS * agentConfig ['check_freq' ] / SERVICE_SLEEP_INTERVAL
65
66
67
+ # Watch JMXFetch restarts
68
+ self ._MAX_JMXFETCH_RESTARTS = 3
69
+ self ._count_jmxfetch_restarts = 0
70
+
66
71
# Keep a list of running processes so we can start/end as needed.
67
72
# Processes will start started in order and stopped in reverse order.
68
73
self .procs = {
69
- 'forwarder' : DDForwarder (config , self .hostname ),
70
- 'collector' : DDAgent (agentConfig , self .hostname ,
71
- heartbeat = self ._collector_send_heartbeat ),
72
- 'dogstatsd' : DogstatsdProcess (config , self .hostname ),
73
- 'jmxfetch' : JMXFetchProcess (config , self .hostname ),
74
+ 'forwarder' : ProcessWatchDog ( "forwarder" , DDForwarder (config , self .hostname ) ),
75
+ 'collector' : ProcessWatchDog ( "collector" , DDAgent (agentConfig , self .hostname ,
76
+ heartbeat = self ._collector_send_heartbeat ) ),
77
+ 'dogstatsd' : ProcessWatchDog ( "dogstatsd" , DogstatsdProcess (config , self .hostname ) ),
78
+ 'jmxfetch' : ProcessWatchDog ( "jmxfetch" , JMXFetchProcess (config , self .hostname ), 3 ),
74
79
}
75
80
76
81
def SvcStop (self ):
@@ -99,9 +104,9 @@ def SvcDoRun(self):
99
104
while self .running :
100
105
# Restart any processes that might have died.
101
106
for name , proc in self .procs .iteritems ():
102
- if not proc .is_alive () and proc .is_enabled :
103
- servicemanager .LogInfoMsg ("%s has died. Restarting..." % proc . name )
104
- self . _restart_proc ( name )
107
+ if not proc .is_alive () and proc .is_enabled () :
108
+ servicemanager .LogInfoMsg ("%s has died. Restarting..." % name )
109
+ proc . restart ( )
105
110
106
111
self ._check_collector_blocked ()
107
112
@@ -117,25 +122,49 @@ def _check_collector_blocked(self):
117
122
if self ._collector_failed_heartbeats > self ._max_failed_heartbeats :
118
123
servicemanager .LogInfoMsg (
119
124
"%s was unresponsive for too long. Restarting..." % 'collector' )
120
- self ._restart_proc ( 'collector' )
125
+ self .procs [ 'collector' ]. restart ( )
121
126
self ._collector_failed_heartbeats = 0
122
127
123
- def _restart_proc (self , proc_name ):
128
+
129
+ class ProcessWatchDog (object ):
130
+ """
131
+ Monitor the attached process.
132
+ Restarts when it exits until the limit set is reached.
133
+ """
134
+ def __init__ (self , name , process , max_restarts = 5 ):
135
+ self ._name = name
136
+ self ._process = process
137
+ self ._count_restarts = 0
138
+ self ._MAX_RESTARTS = max_restarts
139
+
140
+ def start (self ):
141
+ return self ._process .start ()
142
+
143
+ def terminate (self ):
144
+ return self ._process .terminate ()
145
+
146
+ def is_alive (self ):
147
+ return self ._process .is_alive ()
148
+
149
+ def is_enabled (self ):
150
+ return self ._process .is_enabled
151
+
152
+ def restart (self ):
153
+ self ._count_restarts += 1
154
+ if self ._count_restarts >= self ._MAX_RESTARTS :
155
+ servicemanager .LogInfoMsg (
156
+ "%s reached the limit of restarts. Not restarting..." % self ._name )
157
+ self ._process .is_enabled = False
158
+ return
159
+
124
160
# Make a new proc instances because multiprocessing
125
161
# won't let you call .start() twice on the same instance.
126
- old_proc = self .procs [proc_name ]
127
- if proc_name == 'collector' :
128
- new_proc = old_proc .__class__ (
129
- old_proc .config , self .hostname , heartbeat = self ._collector_send_heartbeat )
130
- else :
131
- new_proc = old_proc .__class__ (old_proc .config , self .hostname )
162
+ if self ._process .is_alive ():
163
+ self ._process .terminate ()
132
164
133
- if old_proc .is_alive ():
134
- old_proc .terminate ()
135
- del self .procs [proc_name ]
165
+ self ._process = self ._process .__class__ (self ._process .config , self ._process .hostname )
166
+ self ._process .start ()
136
167
137
- new_proc .start ()
138
- self .procs [proc_name ] = new_proc
139
168
140
169
class DDAgent (multiprocessing .Process ):
141
170
def __init__ (self , agentConfig , hostname , heartbeat = None ):
@@ -240,25 +269,24 @@ class JMXFetchProcess(multiprocessing.Process):
240
269
def __init__ (self , agentConfig , hostname ):
241
270
multiprocessing .Process .__init__ (self , name = 'jmxfetch' )
242
271
self .config = agentConfig
243
- self .is_enabled = True
244
272
self .hostname = hostname
245
273
246
- osname = get_os ()
247
274
try :
275
+ osname = get_os ()
248
276
confd_path = get_confd_path (osname )
249
- except PathNotFound , e :
250
- log . error ( "No conf.d folder found at '%s' or in the directory where"
251
- "the Agent is currently deployed. \n " % e . args [ 0 ] )
277
+ self . jmx_daemon = JMXFetch ( confd_path , agentConfig )
278
+ self . jmx_daemon . configure ()
279
+ self . is_enabled = self . jmx_daemon . should_run ( )
252
280
253
- self .jmx_daemon = JMXFetch (confd_path , agentConfig )
281
+ except PathNotFound :
282
+ self .is_enabled = False
254
283
255
284
def run (self ):
256
- log . debug ( "Windows Service - Starting JMXFetch" )
257
- self .jmx_daemon .run ()
285
+ if self . is_enabled :
286
+ self .jmx_daemon .run ()
258
287
259
288
def stop (self ):
260
- log .debug ("Windows Service - Stopping JMXFetch" )
261
- self .jmx_daemon .terminate ()
289
+ pass
262
290
263
291
264
292
if __name__ == '__main__' :
0 commit comments