@@ -75,7 +75,10 @@ init({MyName,NerlnetGraph, ClientWorkers , WorkerShaMap , WorkerToClientMap , Sh
75
75
EtsRef = ets :new (client_data , [set , public ]), % % client_data is responsible for functional attributes
76
76
EtsStats = ets :new (ets_stats , [set ]), % % ets_stats is responsible for holding all the ets stats (client + workers)
77
77
ClientStatsEts = stats :generate_stats_ets (), % % client stats ets inside ets_stats
78
+ % TODO add flag to control generate performance stats ets
79
+ ClientPerformanceEts = stats :generate_performance_stats_ets (), % % client performance stats ets inside ets_stats
78
80
ets :insert (EtsStats , {MyName , ClientStatsEts }),
81
+ ets :insert (EtsStats , {performance_stats , ClientPerformanceEts }),
79
82
put (ets_stats , EtsStats ),
80
83
ets :insert (EtsRef , {workerToClient , WorkerToClientMap }), % All workers in the network (map to their client)
81
84
ets :insert (EtsRef , {workersNames , ClientWorkers }), % All THIS Client's workers
@@ -105,6 +108,7 @@ init({MyName,NerlnetGraph, ClientWorkers , WorkerShaMap , WorkerToClientMap , Sh
105
108
put (client_data , EtsRef ),
106
109
put (ets_stats , EtsStats ),
107
110
put (client_stats_ets , ClientStatsEts ),
111
+ put (performance_stats_ets , ClientPerformanceEts ),
108
112
put (my_pid , self ()),
109
113
110
114
{ok , idle , # client_statem_state {myName = MyName , etsRef = EtsRef }}.
@@ -130,8 +134,7 @@ waitforWorkers(cast, In = {stateChange,WorkerName}, State = #client_statem_state
130
134
stats :increment_messages_sent (ClientStatsEts ),
131
135
? LOG_INFO (" Client ~p and its workers are ready~n " ,[MyName ]),
132
136
{next_state , NextState , State # client_statem_state {waitforWorkers = []}};
133
- _ -> % io:format("Client ~p is waiting for workers ~p~n",[MyName,NewWaitforWorkers]),
134
- {next_state , waitforWorkers , State # client_statem_state {waitforWorkers = NewWaitforWorkers }}
137
+ _ -> {next_state , waitforWorkers , State # client_statem_state {waitforWorkers = NewWaitforWorkers }}
135
138
end ;
136
139
137
140
waitforWorkers (cast , In = {worker_to_worker_msg , FromWorker , ToWorker , Data }, State = # client_statem_state {etsRef = EtsRef }) ->
@@ -173,8 +176,10 @@ idle(cast, _In = {statistics}, State = #client_statem_state{ myName = MyName, et
173
176
ClientStatsEncStr = stats :encode_ets_to_http_bin_str (ClientStatsEts ),
174
177
stats :increment_messages_received (ClientStatsEts ),
175
178
ListStatsEts = ets :tab2list (EtsStats ) -- [{MyName , ClientStatsEts }],
179
+ PerformenceStatsEts = get (performance_stats_ets ),
180
+ ClientPerformenceStatsEncStr = ? PERF_STATS_SEPERATOR ++ stats :encode_ets_to_http_bin_str (PerformenceStatsEts ) ++ ? PERF_STATS_SEPERATOR ,
176
181
WorkersStatsEncStr = create_encoded_stats_str (ListStatsEts ),
177
- DataToSend = ClientStatsEncStr ++ WorkersStatsEncStr ,
182
+ DataToSend = ClientStatsEncStr ++ ClientPerformenceStatsEncStr ++ WorkersStatsEncStr ,
178
183
StatsBody = {MyName , DataToSend },
179
184
{RouterHost ,RouterPort } = ets :lookup_element (EtsRef , my_router , ? DATA_IDX ),
180
185
nerl_tools :http_router_request (RouterHost , RouterPort , [? MAIN_SERVER_ATOM ], atom_to_list (statistics ), StatsBody ),
@@ -184,19 +189,27 @@ idle(cast, _In = {statistics}, State = #client_statem_state{ myName = MyName, et
184
189
%% idle + cast {training} (triggered by the Main Server).
%% Forwards {training} to this client's workers, restarts the performance
%% measurements, and moves to waitforWorkers with training recorded as the
%% state to enter next.
idle(cast, Msg = {training}, State = #client_statem_state{myName = _MyName, etsRef = EtsRef}) ->
    StatsTab = get(client_stats_ets),
    PerfTab = get(performance_stats_ets),
    %% Account for the incoming message.
    stats:increment_messages_received(StatsTab),
    stats:increment_bytes_received(StatsTab, nerl_tools:calculate_size(Msg)),
    cast_message_to_workers(EtsRef, {training}),
    ets:update_element(EtsRef, all_workers_done, {?DATA_IDX, false}),
    %% Start from clean performance counters; the time_train_total timer
    %% started here is read back with stats:toc/2 by the training state.
    stats:performance_stats_reset(PerfTab),
    stats:tic(StatsTab, time_train_total),
    stats:reset_query_cpu_util_cores(),
    PendingWorkers = clientWorkersFunctions:get_workers_names(EtsRef),
    {next_state, waitforWorkers,
     State#client_statem_state{waitforWorkers = PendingWorkers, nextState = training}};
193
202
194
203
%% idle + cast {predict}.
%% Forwards {predict} to this client's workers, restarts the performance
%% measurements, and moves to waitforWorkers with predict recorded as the
%% state to enter next.
idle(cast, Msg = {predict}, State = #client_statem_state{etsRef = EtsRef}) ->
    StatsTab = get(client_stats_ets),
    PerfTab = get(performance_stats_ets),
    %% Account for the incoming message.
    stats:increment_messages_received(StatsTab),
    stats:increment_bytes_received(StatsTab, nerl_tools:calculate_size(Msg)),
    cast_message_to_workers(EtsRef, {predict}),
    %% Start from clean performance counters; the time_predict_total timer
    %% started here is read back with stats:toc/2 by the predict state.
    stats:performance_stats_reset(PerfTab),
    stats:tic(StatsTab, time_predict_total),
    stats:reset_query_cpu_util_cores(),
    PendingWorkers = clientWorkersFunctions:get_workers_names(EtsRef),
    {next_state, waitforWorkers,
     State#client_statem_state{waitforWorkers = PendingWorkers, nextState = predict}};
201
214
202
215
idle (cast , EventContent , State = # client_statem_state {etsRef = EtsRef , myName = MyName }) ->
@@ -302,6 +315,7 @@ training(cast, In = {stream_ended , Pair}, State = #client_statem_state{etsRef =
302
315
% From MainServer
303
316
training (cast , In = {idle }, State = # client_statem_state {myName = MyName , etsRef = EtsRef }) ->
304
317
ClientStatsEts = get (client_stats_ets ),
318
+ ClientPerformanceEts = get (performance_stats_ets ),
305
319
stats :increment_messages_received (ClientStatsEts ),
306
320
stats :increment_bytes_received (ClientStatsEts , nerl_tools :calculate_size (In )),
307
321
MessageToCast = {idle },
@@ -310,6 +324,9 @@ training(cast, In = {idle}, State = #client_statem_state{myName = MyName, etsRef
310
324
true -> cast_message_to_workers (EtsRef , MessageToCast ),
311
325
Workers = clientWorkersFunctions :get_workers_names (EtsRef ),
312
326
? LOG_INFO (" ~p sent idle to workers: ~p , waiting for confirmation...~n " ,[MyName , ets :lookup_element (EtsRef , workersNames , ? DATA_IDX )]),
327
+ Elapsed = stats :toc (ClientStatsEts , time_train_total ),
328
+ stats :increment_time_train_total (ClientPerformanceEts , Elapsed ),
329
+ stats :update_cpu_util_per_core (ClientPerformanceEts , train ), % Update CPU utilization for training phase
313
330
{next_state , waitforWorkers , State # client_statem_state {etsRef = EtsRef , waitforWorkers = Workers , nextState = idle }};
314
331
false -> MyPid = get (my_pid ),
315
332
spawn (fun () -> timer :sleep (10 ), gen_statem :cast (MyPid , {idle }) end ), % Trigger this action until all workers are done
@@ -323,9 +340,11 @@ training(cast, _In = {predict}, State = #client_statem_state{myName = MyName, et
323
340
324
341
training (cast , In = {loss , WorkerName ,SourceName ,LossTensor ,TimeNIF , WorkerToken ,BatchID ,BatchTS }, State = # client_statem_state {myName = MyName ,etsRef = EtsRef }) ->
325
342
ClientStatsEts = get (client_stats_ets ),
343
+ ClientPerformanceEts = get (performance_stats_ets ),
326
344
stats :increment_messages_received (ClientStatsEts ),
327
345
stats :increment_bytes_received (ClientStatsEts , nerl_tools :calculate_size (In )),
328
346
{RouterHost ,RouterPort } = ets :lookup_element (EtsRef , my_router , ? DATA_IDX ),
347
+ stats :increment_time_train_active (ClientPerformanceEts , trunc (TimeNIF )), % in microseconds
329
348
MessageBody = {WorkerName , SourceName , LossTensor , TimeNIF , WorkerToken , BatchID , BatchTS },
330
349
nerl_tools :http_router_request (RouterHost , RouterPort , [? MAIN_SERVER_ATOM ], atom_to_list (lossFunction ), MessageBody ), % % Change lossFunction atom to lossValue
331
350
stats :increment_messages_sent (ClientStatsEts ),
@@ -401,6 +420,7 @@ predict(cast, In = {stream_ended , Pair}, State = #client_statem_state{etsRef =
401
420
% From MainServer
402
421
predict (cast , In = {idle }, State = # client_statem_state {myName = MyName , etsRef = EtsRef }) ->
403
422
ClientStatsEts = get (client_stats_ets ),
423
+ ClientPerformanceEts = get (performance_stats_ets ),
404
424
stats :increment_messages_received (ClientStatsEts ),
405
425
stats :increment_bytes_received (ClientStatsEts , nerl_tools :calculate_size (In )),
406
426
MessageToCast = {idle },
@@ -409,13 +429,17 @@ predict(cast, In = {idle}, State = #client_statem_state{myName = MyName, etsRef
409
429
true -> cast_message_to_workers (EtsRef , MessageToCast ),
410
430
Workers = clientWorkersFunctions :get_workers_names (EtsRef ),
411
431
? LOG_INFO (" ~p sent idle to workers: ~p , waiting for confirmation...~n " ,[MyName , ets :lookup_element (EtsRef , workersNames , ? DATA_IDX )]),
432
+ Elapsed = stats :toc (ClientStatsEts , time_predict_total ),
433
+ stats :increment_time_predict_total (ClientPerformanceEts , Elapsed ),
434
+ stats :update_cpu_util_per_core (ClientPerformanceEts , predict ), % Update CPU utilization for predict phase
412
435
{next_state , waitforWorkers , State # client_statem_state {etsRef = EtsRef , waitforWorkers = Workers , nextState = idle }};
413
436
false -> gen_statem :cast (get (my_pid ) , {idle }), % Trigger this action until all workers are done
414
437
{keep_state , State }
415
438
end ;
416
439
417
440
predict (cast , In = {predictRes ,WorkerName , SourceName ,{PredictNerlTensor , NetlTensorType } , TimeTook , WorkerToken , BatchID , BatchTS }, State = # client_statem_state {myName = _MyName , etsRef = EtsRef }) ->
418
441
ClientStatsEts = get (client_stats_ets ),
442
+ ClientPerformanceEts = get (performance_stats_ets ),
419
443
stats :increment_messages_received (ClientStatsEts ),
420
444
stats :increment_bytes_received (ClientStatsEts , nerl_tools :calculate_size (In )),
421
445
@@ -425,6 +449,7 @@ predict(cast, In = {predictRes,WorkerName, SourceName ,{PredictNerlTensor, NetlT
425
449
426
450
stats :increment_messages_sent (ClientStatsEts ),
427
451
stats :increment_bytes_sent (ClientStatsEts , nerl_tools :calculate_size (MessageBody )),
452
+ stats :increment_time_predict_active (ClientPerformanceEts , trunc (TimeTook )), % in microseconds
428
453
{next_state , predict , State # client_statem_state {etsRef = EtsRef }};
429
454
430
455
% TODO from predict directly to training?!?!?
0 commit comments