@@ -267,14 +267,17 @@ training(cast, In = {sample,Body}, State = #client_statem_state{etsRef = EtsRef}
267
267
gen_statem :cast (WorkerPid , {sample , SourceName ,BatchID ,BatchOfSamples }),
268
268
stats :increment_messages_sent (ClientStatsEts ),
269
269
BatchSize = nerl_tools :calculate_size (BatchOfSamples ),
270
- stats :increment_bytes_sent (ClientStatsEts , BatchSize );
270
+ stats :increment_bytes_sent (ClientStatsEts , BatchSize ),
271
+ perf_stats_memory_usage_update_train ();
271
272
true -> ? LOG_ERROR (" Given worker ~p isn't found in client ~p " ,[WorkerName , ClientName ]) end ,
272
273
{next_state , training , State # client_statem_state {etsRef = EtsRef }};
273
274
274
275
% This action is used for start_stream triggered from a clients' worker and not source
275
276
training (cast , {start_stream , {worker , WorkerName , TargetPair }}, State = # client_statem_state {etsRef = EtsRef }) ->
276
277
ListOfActiveWorkersSources = ets :lookup_element (EtsRef , active_workers_streams , ? DATA_IDX ),
277
278
ets :update_element (EtsRef , active_workers_streams , {? DATA_IDX , ListOfActiveWorkersSources ++ [{WorkerName , TargetPair }]}),
279
+
280
+ perf_stats_memory_usage_update_train (),
278
281
{keep_state , State };
279
282
280
283
% This action is used for start_stream triggered from a source per worker
@@ -287,6 +290,8 @@ training(cast, In = {start_stream , Data}, State = #client_statem_state{etsRef =
287
290
stats :increment_bytes_received (ClientStatsEts , nerl_tools :calculate_size (In )),
288
291
WorkerPid = clientWorkersFunctions :get_worker_pid (EtsRef , WorkerName ),
289
292
gen_statem :cast (WorkerPid , {start_stream , SourceName }),
293
+
294
+ perf_stats_memory_usage_update_train (),
290
295
{keep_state , State };
291
296
292
297
@@ -296,7 +301,9 @@ training(cast, In = {end_stream , Data}, State = #client_statem_state{etsRef = E
296
301
stats :increment_messages_received (ClientStatsEts ),
297
302
stats :increment_bytes_received (ClientStatsEts , nerl_tools :calculate_size (In )),
298
303
WorkerPid = clientWorkersFunctions :get_worker_pid (EtsRef , WorkerName ),
299
- gen_statem :cast (WorkerPid , {end_stream , SourceName }),
304
+ gen_statem :cast (WorkerPid , {end_stream , SourceName }),
305
+
306
+ perf_stats_memory_usage_update_train (),
300
307
{keep_state , State };
301
308
302
309
training (cast , In = {stream_ended , Pair }, State = # client_statem_state {etsRef = EtsRef }) ->
@@ -371,7 +378,8 @@ predict(cast, In = {sample,Body}, State = #client_statem_state{etsRef = EtsRef})
371
378
WorkerPid = clientWorkersFunctions :get_worker_pid (EtsRef , WorkerName ),
372
379
gen_statem :cast (WorkerPid , {sample , SourceName ,BatchID ,BatchOfSamples }),
373
380
stats :increment_messages_sent (ClientStatsEts ),
374
- stats :increment_bytes_sent (ClientStatsEts , nerl_tools :calculate_size (BatchOfSamples ));
381
+ stats :increment_bytes_sent (ClientStatsEts , nerl_tools :calculate_size (BatchOfSamples )),
382
+ perf_stats_memory_usage_update_predict ();
375
383
true -> ? LOG_ERROR (" Given worker ~p isn't found in client ~p " ,[WorkerName , ClientName ])
376
384
end ,
377
385
{next_state , predict , State # client_statem_state {etsRef = EtsRef }};
@@ -380,6 +388,8 @@ predict(cast, In = {sample,Body}, State = #client_statem_state{etsRef = EtsRef})
380
388
predict (cast , {start_stream , {worker , WorkerName , TargetName }}, State = # client_statem_state {etsRef = EtsRef }) ->
381
389
ListOfActiveWorkersSources = ets :lookup_element (EtsRef , active_workers_streams , ? DATA_IDX ),
382
390
ets :update_element (EtsRef , active_workers_streams , {? DATA_IDX , ListOfActiveWorkersSources ++ [{WorkerName , TargetName }]}),
391
+
392
+ perf_stats_memory_usage_update_predict (),
383
393
{keep_state , State };
384
394
385
395
% This action is used for start_stream triggered from a source per worker
@@ -392,6 +402,8 @@ predict(cast, In = {start_stream , Data}, State = #client_statem_state{etsRef =
392
402
stats :increment_bytes_received (ClientStatsEts , nerl_tools :calculate_size (In )),
393
403
WorkerPid = clientWorkersFunctions :get_worker_pid (EtsRef , WorkerName ),
394
404
gen_statem :cast (WorkerPid , {start_stream , SourceName }),
405
+
406
+ perf_stats_memory_usage_update_predict (),
395
407
{keep_state , State };
396
408
397
409
predict (cast , In = {end_stream , Data }, State = # client_statem_state {etsRef = EtsRef }) ->
@@ -400,7 +412,9 @@ predict(cast, In = {end_stream , Data}, State = #client_statem_state{etsRef = Et
400
412
stats :increment_messages_received (ClientStatsEts ),
401
413
stats :increment_bytes_received (ClientStatsEts , nerl_tools :calculate_size (In )),
402
414
WorkerPid = clientWorkersFunctions :get_worker_pid (EtsRef , WorkerName ),
403
- gen_statem :cast (WorkerPid , {end_stream , SourceName }),
415
+ gen_statem :cast (WorkerPid , {end_stream , SourceName }),
416
+
417
+ perf_stats_memory_usage_update_predict (),
404
418
{keep_state , State };
405
419
406
420
predict (cast , In = {stream_ended , Pair }, State = # client_statem_state {etsRef = EtsRef }) ->
@@ -547,4 +561,18 @@ handle_w2w_msg(EtsRef, FromWorker, ToWorker, Data) ->
547
561
nerl_tools :http_router_request (RouterHost , RouterPort , [DestClient ], atom_to_list (worker_to_worker_msg ), MessageBody ),
548
562
stats :increment_messages_sent (ClientStatsEts ),
549
563
stats :increment_bytes_sent (ClientStatsEts , nerl_tools :calculate_size (MessageBody ))
550
- end .
564
+ end .
565
+
566
+ perf_stats_memory_usage_update_train () ->
567
+ % memory usage update
568
+ PerformanceStatsEts = get (performance_stats_ets ),
569
+ MemoryUsageValue = stats :query_memory_usage (),
570
+ stats :update_memory_peak_usage_train (PerformanceStatsEts , MemoryUsageValue ),
571
+ stats :update_memory_train_ema_usage (PerformanceStatsEts , MemoryUsageValue ).
572
+
573
+ perf_stats_memory_usage_update_predict () ->
574
+ % memory usage update
575
+ PerformanceStatsEts = get (performance_stats_ets ),
576
+ MemoryUsageValue = stats :query_memory_usage (),
577
+ stats :update_memory_peak_usage_predict (PerformanceStatsEts , MemoryUsageValue ),
578
+ stats :update_memory_predict_ema_usage (PerformanceStatsEts , MemoryUsageValue ).
0 commit comments