1
1
from __future__ import absolute_import
2
- from .celery_broker import app
3
- import celery .signals
4
2
import os
5
3
import sys
6
4
import time
7
5
import json
8
- import traceback
9
- import numpy
10
- from next .constants import DEBUG_ON
11
6
import hashlib
7
+ import traceback
12
8
from functools import wraps
9
+ import numpy
10
+ import celery .signals
11
+ from .celery_broker import app
13
12
14
- # import next.logging_client.LoggerHTTP as ell
15
- from next .database_client .DatabaseAPI import DatabaseAPI
16
- db = DatabaseAPI ()
17
- from next .logging_client .LoggerAPI import LoggerAPI
18
- ell = LoggerAPI ()
19
13
import next .utils
20
14
import next .constants
21
- import next .apps .Butler as Butler
22
15
import next .lib .pijemont .verifier as verifier
16
+ from next .constants import DEBUG_ON
17
+ from next .apps .Butler import Butler
18
+ from next .database_client .DatabaseAPI import DatabaseAPI
19
+ from next .logging_client .LoggerAPI import LoggerAPI
20
+
21
+ db = DatabaseAPI ()
22
+ ell = LoggerAPI ()
23
23
24
- Butler = Butler .Butler
25
24
26
25
class App_Wrapper :
27
26
def __init__ (self , app_id , exp_uid , db , ell ):
@@ -98,96 +97,102 @@ def apply(app_id, exp_uid, task_name, args_in_json, enqueue_timestamp):
98
97
return return_value
99
98
100
99
def apply_dashboard(app_id, exp_uid, args_in_json, enqueue_timestamp):
    """Execute a dashboard statistic ('getStats') task for an experiment.

    Verifies the request against the app's myApp.yaml spec, then either
    serves the statistic from the butler's dashboard cache (when fresher
    than DASHBOARD_STALENESS_IN_SECONDS and not force-recomputed) or
    recomputes it via the app's MyAppDashboard class and re-caches it.

    Args:
        app_id: application identifier; locates the YAML spec and the
            ``apps.<app_id>.dashboard.Dashboard`` module.
        exp_uid: unique identifier of the experiment.
        args_in_json: task arguments, verified against the 'getStats'
            section of the app's myApp.yaml.
        enqueue_timestamp: string timestamp of when the job was enqueued,
            used to report queueing latency.

    Returns:
        Tuple ``(json_string, True, '')`` — the statistic payload plus
        success flag and empty message.

    Raises:
        Exception: if the app's YAML specification has format errors.
    """
    enqueue_datetime = next.utils.str2datetime(enqueue_timestamp)
    dequeue_datetime = next.utils.datetimeNow()
    delta_datetime = dequeue_datetime - enqueue_datetime
    time_enqueued = delta_datetime.seconds + delta_datetime.microseconds / 1000000.

    # NOTE: the original passed app_id twice to .format() for a single
    # placeholder; only one argument is consumed.
    reference_dict, errs = verifier.load_doc('{}/myApp.yaml'.format(app_id), "apps/")
    if len(errs) > 0:
        raise Exception("App YAML format errors: \n{}".format(str(errs)))

    args_dict = verifier.verify(args_in_json, reference_dict['getStats']['args'])
    stat_args = args_dict['args']
    stat_id = stat_args.get('stat_id', 'none')

    # A stat is cached under a uid derived from its id and parameters, so
    # different parameterizations of the same stat do not collide.
    hash_object = hashlib.md5(stat_id + '_' + json.dumps(stat_args['params']))
    stat_uid = hash_object.hexdigest() + '_' + exp_uid

    app = App_Wrapper(app_id, exp_uid, db, ell)
    cached_doc = app.butler.dashboard.get(uid=stat_uid)
    cached_response = None
    if int(stat_args.get('force_recompute', 0)) == 0 and cached_doc is not None:
        staleness = next.utils.datetimeNow() - next.utils.str2datetime(cached_doc['timestamp'])
        if staleness.seconds < next.constants.DASHBOARD_STALENESS_IN_SECONDS:
            cached_response = json.loads(cached_doc['data_dict'])
            if 'meta' not in cached_response:
                cached_response['meta'] = {}
            cached_response['meta']['cached'] = 1
            if staleness.seconds / 60 < 1:
                cached_response['meta']['last_dashboard_update'] = '<1 minute ago'
            else:
                cached_response['meta']['last_dashboard_update'] = str(staleness.seconds / 60) + ' minutes ago'

    # Execution time of the stats method; stays None when served from cache.
    dt = None
    if cached_response is None:
        dashboard_string = 'apps.' + app_id + '.dashboard.Dashboard'
        dashboard_module = __import__(dashboard_string, fromlist=[''])
        dashboard = getattr(dashboard_module, 'MyAppDashboard')(db, ell)
        stats_method = getattr(dashboard, stat_id)
        response, dt = next.utils.timeit(stats_method)(app, app.butler, **stat_args['params'])

        save_dict = {'exp_uid': app.exp_uid,
                     'stat_uid': stat_uid,
                     'timestamp': next.utils.datetime2str(next.utils.datetimeNow()),
                     'data_dict': json.dumps(response)}
        app.butler.dashboard.set_many(uid=stat_uid, key_value_dict=save_dict)

        # Update the admin timing with the timing of a getModel.
        if hasattr(app, 'log_entry_durations'):
            app.log_entry_durations['app_duration'] = dt
            app.log_entry_durations['duration_enqueued'] = time_enqueued
            app.butler.ell.log(app.app_id + ':ALG-DURATION', app.log_entry_durations)
    else:
        response = cached_response

    if DEBUG_ON:
        # BUGFIX: dt was previously unbound when the cached path was taken,
        # raising NameError here; it is now always defined (None when cached).
        next.utils.debug_print('#### Finished Dashboard %s, time_enqueued=%s, execution_time=%s ####' % (stat_id, time_enqueued, dt), color='white')
    return json.dumps(response), True, ''
158
158
159
159
160
160
def apply_sync_by_namespace (app_id , exp_uid , alg_id , alg_label , task_name , args , namespace , job_uid , enqueue_timestamp , time_limit ):
161
- enqueue_datetime = next .utils .str2datetime (enqueue_timestamp )
162
- dequeue_datetime = next .utils .datetimeNow ()
163
- delta_datetime = dequeue_datetime - enqueue_datetime
164
- time_enqueued = delta_datetime .seconds + delta_datetime .microseconds / 1000000.
165
-
166
- try :
167
- print '>>>>>>>> Starting namespace:%s, job_uid=%s, time_enqueued=%s <<<<<<<<<' % (namespace ,job_uid ,time_enqueued )
168
- # get stateless app
169
- next_app = next .utils .get_app (app_id , exp_uid , db , ell )
170
- target_manager = next_app .myApp .TargetManager
171
- next_alg = next .utils .get_app_alg (app_id , alg_id )
172
- butler = Butler (app_id , exp_uid , target_manager , db , ell , alg_label , alg_id )
173
- response ,dt = next .utils .timeit (getattr (next_alg , task_name ))(butler , args )
174
- log_entry_durations = { 'exp_uid' :exp_uid ,'alg_label' :alg_label ,'task' :'daemonProcess' ,'duration' :dt }
175
- log_entry_durations .update (butler .algorithms .getDurations ())
176
- log_entry_durations ['app_duration' ] = dt
177
- log_entry_durations ['duration_enqueued' ] = time_enqueued
178
- log_entry_durations ['timestamp' ] = next .utils .datetimeNow ()
179
- ell .log ( app_id + ':ALG-DURATION' , log_entry_durations )
180
- print '########## Finished namespace:%s, job_uid=%s, time_enqueued=%s, execution_time=%s ##########' % (namespace ,job_uid ,time_enqueued ,dt )
181
- return
182
- except Exception , error :
183
- exc_type , exc_value , exc_traceback = sys .exc_info ()
184
- print "tasks Exception: {} {}" .format (error , traceback .format_exc ())
185
- traceback .print_tb (exc_traceback )
186
-
187
- # error = traceback.format_exc()
188
- # log_entry = { 'exp_uid':exp_uid,'task':'daemonProcess','error':error,'timestamp':next.utils.datetimeNow() }
189
- # ell.log( app_id+':APP-EXCEPTION', log_entry )
190
- return None
161
+ enqueue_datetime = next .utils .str2datetime (enqueue_timestamp )
162
+ dequeue_datetime = next .utils .datetimeNow ()
163
+ delta_datetime = dequeue_datetime - enqueue_datetime
164
+ time_enqueued = delta_datetime .seconds + delta_datetime .microseconds / 1000000.
165
+
166
+ try :
167
+ print '>>>>>>>> Starting namespace:%s, job_uid=%s, time_enqueued=%s <<<<<<<<<' % (namespace ,job_uid ,time_enqueued )
168
+
169
+ # get stateless app
170
+ next_app = next .utils .get_app (app_id , exp_uid , db , ell )
171
+ target_manager = next_app .myApp .TargetManager
172
+ next_alg = next .utils .get_app_alg (app_id , alg_id )
173
+ butler = Butler (app_id , exp_uid , target_manager , db , ell , alg_label , alg_id )
174
+
175
+ response ,dt = next .utils .timeit (getattr (next_alg , task_name ))(butler , args )
176
+
177
+ log_entry_durations = { 'exp_uid' :exp_uid ,'alg_label' :alg_label ,'task' :'daemonProcess' ,'duration' :dt }
178
+ log_entry_durations .update (butler .algorithms .getDurations ())
179
+ log_entry_durations ['app_duration' ] = dt
180
+ log_entry_durations ['duration_enqueued' ] = time_enqueued
181
+ log_entry_durations ['timestamp' ] = next .utils .datetimeNow ()
182
+ ell .log ( app_id + ':ALG-DURATION' , log_entry_durations )
183
+
184
+ print '########## Finished namespace:%s, job_uid=%s, time_enqueued=%s, execution_time=%s ##########' % (namespace ,job_uid ,time_enqueued ,dt )
185
+ return
186
+ except Exception , error :
187
+ exc_type , exc_value , exc_traceback = sys .exc_info ()
188
+ print "tasks Exception: {} {}" .format (error , traceback .format_exc ())
189
+ traceback .print_tb (exc_traceback )
190
+
191
+ error = traceback .format_exc ()
192
+ log_entry = {'exp_uid' : exp_uid , 'task' : task_name , 'error' : error , 'timestamp' : next .utils .datetimeNow ()}
193
+ ell .log (app_id + ':APP-EXCEPTION' , log_entry )
194
+
195
+ return None
191
196
192
197
# forces each worker to get its own random seed.
193
198
@celery .signals .worker_process_init .connect ()
@@ -199,7 +204,7 @@ def seed_rng(**_):
199
204
200
205
# If celery isn't off, celery-wrap the functions so they can be called with apply_async
if next.constants.CELERY_ON:
    # Rebind the module-level functions as Celery tasks: callers then use
    # .apply_async()/.delay() on them, while with CELERY_ON false the plain
    # functions remain directly callable (synchronous fallback).
    apply = app.task(apply)
    apply_dashboard = app.task(apply_dashboard)
    apply_sync_by_namespace = app.task(apply_sync_by_namespace)
205
210
0 commit comments