10
10
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11
11
# ANY KIND, either express or implied. See the License for the specific
12
12
# language governing permissions and limitations under the License.
13
-
13
+
14
14
import asyncio
15
15
import json
16
16
import time
17
17
import traceback
18
18
from contextlib import suppress
19
19
from datetime import datetime
20
-
20
+
21
21
from notebook .utils import url_path_join
22
-
23
-
22
+
23
+
24
24
class IdleChecker (object ):
25
25
def __init__ (self ):
26
26
self .interval = 10 # frequency for checking idle sessions in seconds
@@ -36,7 +36,7 @@ def __init__(self):
36
36
self .app_url = "http://0.0.0.0:8888"
37
37
self .keep_terminals = False
38
38
self .inservice_apps = {}
39
-
39
+
40
40
# Function to GET the xsrf token
41
41
async def fetch_xsrf_token (self ):
42
42
url = url_path_join (self .app_url , self .base_url , "tree" )
@@ -45,9 +45,9 @@ async def fetch_xsrf_token(self):
45
45
self .log .info ("response headers: " + str (response .headers ))
46
46
if "Set-Cookie" in response .headers :
47
47
return response .headers ["Set-Cookie" ].split (";" )[0 ].split ("=" )[1 ]
48
-
48
+
49
49
return None
50
-
50
+
51
51
# Invoke idle_checks() function
52
52
async def run_idle_checks (self ):
53
53
while True :
@@ -59,7 +59,7 @@ async def run_idle_checks(self):
59
59
except Exception :
60
60
self .errors = traceback .format_exc ()
61
61
self .log .error (self .errors )
62
-
62
+
63
63
# Entrypoint function to get the value from handlers(POST API call) and start background job
64
64
def start (self , base_url , log_handler , client , idle_time , keep_terminals ):
65
65
self .idle_time = idle_time
@@ -68,26 +68,26 @@ def start(self, base_url, log_handler, client, idle_time, keep_terminals):
68
68
self .log = log_handler
69
69
self .keep_terminals = keep_terminals
70
70
self .errors = None # clear error array at start
71
-
71
+
72
72
if not self ._running :
73
73
self .count += 1
74
74
self ._running = True
75
75
self .task = asyncio .ensure_future (self .run_idle_checks ())
76
-
76
+
77
77
async def stop (self ):
78
78
if self ._running :
79
79
self ._running = False
80
80
if self .task :
81
81
self .task .cancel ()
82
82
with suppress (asyncio .CancelledError ):
83
83
await self .task
84
-
84
+
85
85
def get_runcounts (self ):
86
86
return self .count
87
-
87
+
88
88
def get_runerrors (self ):
89
89
return self .errors
90
-
90
+
91
91
# Function to check if the notebook is in Idle state
92
92
def is_idle (self , last_activity , seconds = False ):
93
93
last_activity = datetime .strptime (last_activity , "%Y-%m-%dT%H:%M:%S.%fz" )
@@ -107,43 +107,43 @@ def is_idle(self, last_activity, seconds=False):
107
107
"Notebook is not idle. Last activity time = " + str (last_activity )
108
108
)
109
109
return False
110
-
110
+
111
111
# Function to get the list of Kernel sessions
112
112
async def get_sessions (self ):
113
113
url = url_path_join (self .app_url , self .base_url , "api" , "sessions" )
114
114
response = await self .tornado_client .fetch (url , method = "GET" )
115
115
sessions = json .loads (response .body )
116
116
self .log .info (" Kernel Session is = " + str (sessions ))
117
117
return sessions
118
-
118
+
119
119
# Function to get the list of System Terminals
120
120
async def get_terminals (self ):
121
121
terminal_url = url_path_join (self .app_url , self .base_url , "api" , "terminals" )
122
122
terminal_response = await self .tornado_client .fetch (terminal_url , method = "GET" )
123
123
terminals = json .loads (terminal_response .body )
124
124
return terminals
125
-
125
+
126
126
# Function to get the list of running Apps
127
127
async def get_apps (self ):
128
128
url = url_path_join (self .app_url , self .base_url , "sagemaker" , "api" , "apps" )
129
129
response = await self .tornado_client .fetch (url , method = "GET" )
130
130
apps = json .loads (response .body )
131
131
self .log .info (" Running App name is = " + str (apps ))
132
132
return apps
133
-
133
+
134
134
# Function to build app information ( kernel sessions and image terminals)
135
135
async def build_app_info (self ):
136
136
apps = await self .get_apps ()
137
137
apps_info = {}
138
138
for app in apps :
139
139
apps_info [app ["app_name" ]] = {"app" : app , "sessions" : [], "terminals" : []}
140
-
140
+
141
141
sessions = await self .get_sessions ()
142
142
for notebook in sessions :
143
143
if notebook ["kernel" ]:
144
144
notebook_app_name = notebook ["kernel" ]["app_name" ]
145
145
apps_info [notebook_app_name ]["sessions" ].append (notebook )
146
-
146
+
147
147
terminals = await self .get_terminals ()
148
148
for terminal in terminals :
149
149
if terminal ["name" ].find ("arn:" ) != 0 :
@@ -154,18 +154,18 @@ async def build_app_info(self):
154
154
self .log .info ("Env Arn = " + str (env_arn ))
155
155
self .log .info ("Terminal Id = " + str (terminal_id ))
156
156
self .log .info ("Instance Type = " + str (instance_type ))
157
-
157
+
158
158
for app in apps :
159
159
if (
160
160
app ["environment_arn" ] == env_arn
161
161
and app ["instance_type" ] == instance_type
162
162
):
163
163
apps_info [app ["app_name" ]]["terminals" ].append (terminal )
164
164
break
165
-
165
+
166
166
self .log .info (str (apps_info ))
167
167
return apps_info
168
-
168
+
169
169
# Function to delete a kernel session
170
170
async def delete_session (self , session ):
171
171
headers = {}
@@ -178,7 +178,7 @@ async def delete_session(self, session):
178
178
)
179
179
deleted = await self .tornado_client .fetch (url , method = "DELETE" , headers = headers )
180
180
self .log .info ("Delete kernel response: " + str (deleted ))
181
-
181
+
182
182
# Function to delete an application
183
183
async def delete_application (self , app_id ):
184
184
headers = {}
@@ -194,12 +194,12 @@ async def delete_application(self, app_id):
194
194
self .log .info ("Delete App response: " + str (deleted_apps ))
195
195
if deleted_apps .code == 204 or deleted_apps .code == 200 :
196
196
self .inservice_apps .pop (app_id , None )
197
-
197
+
198
198
# Function to check the notebook status
199
199
def check_notebook (self , notebook ):
200
200
terminate = True
201
- if notebook ["kernel" ]["execution_state" ] == "idle" :
202
- self .log .info ("found idle session:" + str (notebook ))
201
+ if notebook ["kernel" ]["execution_state" ] in ( "idle" , "starting" ) :
202
+ self .log .info ("found idle/starting session:" + str (notebook ))
203
203
if not self .ignore_connections :
204
204
if notebook ["kernel" ]["connections" ] == 0 :
205
205
if not self .is_idle (notebook ["kernel" ]["last_activity" ]):
@@ -212,7 +212,7 @@ def check_notebook(self, notebook):
212
212
else :
213
213
terminate = False
214
214
return terminate
215
-
215
+
216
216
# Run idle checks apps and image terminals
217
217
async def idle_checks (self ):
218
218
apps_info = await self .build_app_info ()
@@ -221,38 +221,38 @@ async def idle_checks(self):
221
221
for deleted_app in deleted_apps :
222
222
inservice_apps .pop (deleted_app , None )
223
223
self .log .info ("inservice app not inservice anymore : " + str (deleted_app ))
224
-
224
+
225
225
for app_name , app in apps_info .items ():
226
226
num_sessions = len (app ["sessions" ])
227
227
num_terminals = len (app ["terminals" ])
228
-
228
+
229
229
if num_sessions > 0 or num_terminals > 0 :
230
230
self .log .info (
231
231
"# of sessions: "
232
232
+ str (num_sessions )
233
233
+ "; # of terminals: "
234
234
+ str (num_terminals )
235
235
)
236
-
236
+
237
237
if num_sessions == 0 and num_terminals == 0 :
238
238
# Check if app is active and kill
239
239
# Check if the current app is part of the in service apps
240
240
if app_name not in inservice_apps :
241
241
# Regsiter a new inservice app
242
242
inservice_apps [app_name ] = time .time ()
243
-
243
+
244
244
else :
245
245
if int (time .time () - inservice_apps [app_name ]) > self .idle_time :
246
246
self .log .info (
247
247
"Keep alive time for terminal reached : " + str (app_name )
248
248
)
249
249
await self .delete_application (app_name )
250
-
250
+
251
251
# elif num_sessions < 1 and num_terminals > 0 and self.keep_terminals == True:
252
252
elif num_sessions < 1 and num_terminals > 0 and self .keep_terminals :
253
253
self .log .info ("keep terminals flag is True. Not killing the terminals." )
254
254
pass
255
-
255
+
256
256
elif (
257
257
# num_sessions < 1 and num_terminals > 0 and self.keep_terminals == False
258
258
num_sessions < 1
@@ -262,25 +262,29 @@ async def idle_checks(self):
262
262
self .log .info ("keep terminals flag: " + str (self .keep_terminals ))
263
263
# Wait for the inservice app
264
264
self .log .info ("New inservice app found : " + str (app_name ))
265
-
265
+
266
266
# Check if the current app is part of the in service apps
267
267
if app_name not in inservice_apps :
268
268
# Regsiter a new inservice app
269
269
inservice_apps [app_name ] = time .time ()
270
-
270
+
271
271
else :
272
272
if int (time .time () - inservice_apps [app_name ]) > self .idle_time :
273
273
self .log .info (
274
274
"Keepalive time for terminal reached : " + str (app_name )
275
275
)
276
276
await self .delete_application (app_name )
277
-
277
+
278
278
elif num_sessions > 0 :
279
279
# let's check if we have idle notebooks to kill
280
280
nb_deleted = 0
281
281
for notebook in app ["sessions" ]:
282
282
if self .check_notebook (notebook ):
283
- await self .delete_session (notebook )
284
- nb_deleted += 1
283
+ # handle kernel sessions which are stuck in "starting" state
284
+ if notebook ["kernel" ]["execution_state" ] == "starting" :
285
+ nb_deleted += 1
286
+ else :
287
+ await self .delete_session (notebook )
288
+ nb_deleted += 1
285
289
if num_sessions == nb_deleted and (not self .keep_terminals or num_terminals == 0 ):
286
290
await self .delete_application (app_name )
0 commit comments