-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprobers.py
executable file
·269 lines (216 loc) · 8.99 KB
/
probers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
# Copyright 2014 Google Inc. All Rights Reserved.
"""AppRTC Probers.
This module implements CEOD and collider probers.
"""
import json
import logging
import numbers
import compute_page
import constants
import webapp2
from google.appengine.api import app_identity
from google.appengine.api import mail
from google.appengine.api import memcache
from google.appengine.api import urlfetch
# Value passed as the `deadline` argument of every urlfetch.fetch() call the
# probers in this module make (urlfetch deadlines are expressed in seconds).
PROBER_FETCH_DEADLINE = 30
def is_prober_enabled():
    """Tell whether this deployment is allowed to run the probers.

    Only the application whose ID is 'apprtc' probes CEOD/Collider, so that
    other projects hosting the AppRTC code do not hit those services
    unnecessarily.

    Returns:
      True when the running application ID equals 'apprtc', else False.
    """
    current_app_id = app_identity.get_application_id()
    return current_app_id == 'apprtc'
def send_alert_email(tag, message):
    """Mail a prober alert to the AppRTC alert address.

    Args:
      tag: Short label appended to the fixed subject prefix.
      message: Error description interpolated into the mail body.
    """
    recipient = '[email protected]'
    sender = 'AppRTC Notification <[email protected]>'
    subject_line = 'AppRTC Prober Alert: ' + tag
    mail_text = (
        '\n'
        'AppRTC Prober detected an error:\n'
        '%s\n'
        'Goto go/apprtc-sheriff for how to handle this error.\n'
    ) % message
    # Log before sending so the alert is traceable even if delivery fails.
    logging.info('Sending email to %s: subject=%s, message=%s',
                 recipient, subject_line, message)
    mail.send_mail(sender, recipient, subject_line, mail_text)
try:
    _STRING_TYPES = basestring  # Python 2: matches both str and unicode.
except NameError:
    _STRING_TYPES = str  # Python 3 has a single text type.


def has_non_empty_string_value(dictionary, key):
    """Return True if dictionary[key] exists and is a non-empty string.

    Args:
      dictionary: The decoded object to inspect. Anything that is not a dict
          (for example a JSON list, number, string or null) yields False
          instead of raising TypeError.
      key: The key whose value is checked.

    Returns:
      A bool: True only for a dict holding a non-empty string under key.
    """
    if not isinstance(dictionary, dict):
        return False
    value = dictionary.get(key)
    # bool() keeps the stored value itself from leaking out of a predicate.
    return isinstance(value, _STRING_TYPES) and bool(value)
def has_non_empty_array_value(dictionary, key):
    """Return True if dictionary[key] exists and is a non-empty list.

    Args:
      dictionary: The decoded object to inspect. Anything that is not a dict
          (for example a JSON list, number, string or null) yields False
          instead of raising TypeError.
      key: The key whose value is checked.

    Returns:
      A bool: True only for a dict holding a non-empty list under key.
    """
    if not isinstance(dictionary, dict):
        return False
    value = dictionary.get(key)
    # bool() keeps the stored list itself from leaking out of a predicate.
    return isinstance(value, list) and bool(value)
def get_collider_probe_success_key(instance_host):
    """Build the memcache key holding an instance's last probe outcome.

    Args:
      instance_host: String identifying the collider instance; it is appended
          to a fixed prefix.

    Returns:
      The memcache key string.
    """
    key_prefix = 'last_collider_probe_success_'
    return key_prefix + instance_host
class ProbeCEODPage(webapp2.RequestHandler):
    """Page to probe CEOD server."""

    def handle_ceod_response(self, error_message, status_code):
        """Write the probe outcome to the response and alert on failure.

        Args:
          error_message: Description of the failure, or None on success.
          status_code: HTTP status to set on this handler's response.
        """
        self.response.set_status(status_code)
        if error_message is None:
            self.response.out.write('Success!')
            return
        send_alert_email('CEOD Error', error_message)
        logging.warning('CEOD prober error: %s', error_message)
        self.response.out.write(error_message)

    def probe_ceod(self):
        """Fetch TURN credentials from CEOD once and validate the reply.

        Returns:
          An (error_message, status_code) tuple. error_message is None only
          when CEOD answered 200 with a JSON object holding a non-empty
          string 'username' and 'password' and a non-empty list 'uris'.
          status_code is the HTTP status CEOD returned, or 500 when the fetch
          raised or the body failed validation.
        """
        ceod_url = (constants.TURN_URL_TEMPLATE
                    % (constants.TURN_BASE_URL, 'prober', constants.CEOD_KEY))
        # Same URL with the key masked; this is the only form that is put
        # into error messages.
        sanitized_url = (constants.TURN_URL_TEMPLATE
                         % (constants.TURN_BASE_URL, 'prober', '<obscured>'))

        try:
            result = urlfetch.fetch(
                url=ceod_url, method=urlfetch.GET,
                deadline=PROBER_FETCH_DEADLINE)
        except urlfetch.Error as e:
            return ('urlfetch throws exception: %s' % str(e), 500)

        status_code = result.status_code
        if status_code != 200:
            error_message = ('Unexpected CEOD response: %d, requested URL: %s'
                             % (status_code, sanitized_url))
            return (error_message, status_code)

        # Only the decode can raise ValueError, so keep the try that narrow.
        try:
            turn_server = json.loads(result.content)
        except ValueError as e:
            error_message = (
                '\n'
                'CEOD response cannot be decoded as JSON:\n'
                'exception = %s,\n'
                'response = %s,\n'
                'url = %s\n'
                % (str(e), result.content, sanitized_url))
            return (error_message, 500)

        # json.loads may return any JSON type (list, number, string, null).
        # Reject non-objects up front so the field checks cannot raise
        # TypeError and abort the handler before an alert is sent.
        if (not isinstance(turn_server, dict) or
                not has_non_empty_string_value(turn_server, 'username') or
                not has_non_empty_string_value(turn_server, 'password') or
                not has_non_empty_array_value(turn_server, 'uris')):
            error_message = ('CEOD response does not contain valid '
                             'username/password/uris: response = %s, url = %s'
                             % (result.content, sanitized_url))
            return (error_message, 500)

        return (None, status_code)

    def get(self):
        """Probe CEOD, retrying once on failure, then report the outcome.

        Does nothing when the probers are disabled for this application.
        """
        if not is_prober_enabled():
            return
        error_message, status_code = self.probe_ceod()
        if error_message is not None:
            # One immediate retry before alerting on the failure.
            logging.info("Retry probing CEOD.")
            error_message, status_code = self.probe_ceod()
        self.handle_ceod_response(error_message, status_code)
class ProbeColliderPage(webapp2.RequestHandler):
    """Page to probe Collider instances."""

    def handle_collider_response(
            self, error_message, status_code, collider_instance):
        """Send an alert email and restart the instance if needed.

        The outcome (up or down) is also stored in memcache under the
        instance's probe-success key, which is what the next failing probe
        reads to decide whether a restart should be queued.

        Args:
          error_message: The error message for the response, or None if no
              error.
          status_code: The status code of the HTTP response.
          collider_instance: One of constants.WSS_INSTANCES representing the
              instance being handled.

        Returns:
          A dictionary object containing the result: the status code, the
          is-up flag and, on failure, the error message.
        """
        result = {
            constants.WSS_HOST_STATUS_CODE_KEY: status_code
        }
        instance_name = collider_instance[constants.WSS_INSTANCE_NAME_KEY]
        memcache_key = get_collider_probe_success_key(instance_name)
        host = collider_instance[constants.WSS_INSTANCE_HOST_KEY]

        if error_message is not None:
            logging.warning(
                'Collider prober error: %s for %s', error_message, host)
            result[constants.WSS_HOST_ERROR_MESSAGE_KEY] = error_message
            result[constants.WSS_HOST_IS_UP_KEY] = False

            last_probe_success = memcache.get(memcache_key)
            # Restart the collider instance if the last probing was
            # successful (or no previous outcome is cached).
            if last_probe_success is None or last_probe_success is True:
                logging.info('Restarting the collider instance')
                compute_page.enqueue_restart_task(
                    instance_name,
                    collider_instance[constants.WSS_INSTANCE_ZONE_KEY])
                # Only the emailed text gets the restart note; the message
                # stored in `result` above stays as received.
                error_message += (
                    '\n'
                    'Restarting the collider instance automatically.\n')
            # Alert on every failed probe, whether or not a restart was
            # queued.
            send_alert_email('Collider %s error' % host, error_message)
        else:
            result[constants.WSS_HOST_IS_UP_KEY] = True

        memcache.set(memcache_key, result[constants.WSS_HOST_IS_UP_KEY])
        return result

    def store_instance_state(self, probing_results):
        """Store an active collider host to memcache to be served to clients.

        If the currently active host is still up, keep it. If not, pick a
        new active host that is up. The write uses gets/cas and is retried up
        to constants.MEMCACHE_RETRY_LIMIT times.

        Args:
          probing_results: Dict mapping each probed host to the result dict
              returned by handle_collider_response.
        """
        memcache_client = memcache.Client()
        active_host_key = constants.WSS_HOST_ACTIVE_HOST_KEY
        # range() behaves the same as xrange() here and exists on both
        # Python 2 and 3; the retry limit is a small number.
        for retries in range(constants.MEMCACHE_RETRY_LIMIT):
            active_host = memcache_client.gets(active_host_key)
            if active_host is None:
                # Seed the key with a placeholder and re-read it so that the
                # cas() below has a fetched entry to compare against.
                memcache_client.set(active_host_key, '')
                active_host = memcache_client.gets(active_host_key)
            active_host = self.create_collider_active_host(
                active_host, probing_results)
            if memcache_client.cas(active_host_key, active_host):
                logging.info('collider active host saved to memcache: ' +
                             str(active_host))
                break
            logging.warning(
                'retry # ' + str(retries) + ' to set collider status')
        else:
            # Every attempt lost the compare-and-set race (or the limit is
            # zero); make that visible instead of ending silently.
            logging.error(
                'giving up saving collider active host after %d attempts',
                constants.MEMCACHE_RETRY_LIMIT)

    def create_collider_active_host(self, old_active_host, probing_results):
        """Choose the host that should be advertised as active.

        Args:
          old_active_host: The previously stored active host; may be '' or
              None when nothing usable was stored.
          probing_results: Dict mapping each probed host to its result dict.

        Returns:
          old_active_host if it is still reported up, otherwise the first
          host in probing_results that is up, or None when no host is up.
        """
        # If the old_active_host is still up, keep it.
        try:
            if (old_active_host in probing_results and
                    probing_results[old_active_host].get(
                        constants.WSS_HOST_IS_UP_KEY, False)):
                return old_active_host
        except TypeError:
            # Deliberate best effort: a cached value that cannot be used as a
            # dict key (e.g. an unhashable object) just means there is no
            # usable old host, so fall through and pick a new one.
            pass
        # Otherwise pick a new active host that is up.
        for instance in probing_results:
            if probing_results[instance].get(
                    constants.WSS_HOST_IS_UP_KEY, False):
                return instance
        return None

    def get(self):
        """Probe every configured collider instance and publish the results.

        Writes the per-host results as JSON to the response, then updates
        the active host in memcache. Does nothing when the probers are
        disabled for this application.
        """
        if not is_prober_enabled():
            return
        results = {}
        for instance in constants.WSS_INSTANCES:
            host = instance[constants.WSS_INSTANCE_HOST_KEY]
            results[host] = self.probe_collider_instance(instance)
        self.response.write(json.dumps(results, indent=2, sort_keys=True))
        self.store_instance_state(results)

    def probe_collider_instance(self, collider_instance):
        """Fetch one instance's /status page and validate the report.

        The probe succeeds when the instance answers 200 with a JSON object
        whose 'upsec' field is a number. Any other outcome is an error with
        status 500 (or the non-200 status the instance returned).

        Args:
          collider_instance: One of constants.WSS_INSTANCES.

        Returns:
          The result dictionary produced by handle_collider_response.
        """
        collider_host = collider_instance[constants.WSS_INSTANCE_HOST_KEY]
        url = 'https://' + collider_host + '/status'

        try:
            result = urlfetch.fetch(
                url=url, method=urlfetch.GET, deadline=PROBER_FETCH_DEADLINE)
        except urlfetch.Error as e:
            return self.handle_collider_response(
                'urlfetch throws exception: %s' % str(e), 500,
                collider_instance)

        error_message = None
        status_code = result.status_code
        if status_code != 200:
            error_message = (
                'Unexpected collider response: %d, requested URL: %s'
                % (status_code, url))
        else:
            try:
                status_report = json.loads(result.content)
            except ValueError as e:
                error_message = (
                    '\n'
                    'Collider status response cannot be decoded as JSON:\n'
                    'exception = %s,\n'
                    'response = %s,\n'
                    'url = %s\n'
                    % (str(e), result.content, url))
                status_code = 500
            else:
                # json.loads may return a list, number, string or None;
                # treat anything but an object with a numeric 'upsec' as an
                # invalid report instead of letting a TypeError escape.
                if (not isinstance(status_report, dict) or
                        not isinstance(status_report.get('upsec'),
                                       numbers.Number)):
                    error_message = (
                        '\n'
                        "Invalid 'upsec' field in Collider status response,\n"
                        'status = %s\n'
                        % result.content)
                    status_code = 500

        return self.handle_collider_response(
            error_message, status_code, collider_instance)
# WSGI application object routing each probe URL to its handler class.
# NOTE(review): debug=True is passed to webapp2 here — confirm that debug
# mode is intended for the production deployment.
app = webapp2.WSGIApplication([
('/probe/ceod', ProbeCEODPage),
('/probe/collider', ProbeColliderPage),
], debug=True)