diff --git a/inputJsonsFiles/vision/conn_paper_mnist_3d_1s_4r_1c_1w.json b/inputJsonsFiles/vision/conn_paper_mnist_3d_1s_4r_1c_1w.json new file mode 100644 index 00000000..354f2e72 --- /dev/null +++ b/inputJsonsFiles/vision/conn_paper_mnist_3d_1s_4r_1c_1w.json @@ -0,0 +1,9 @@ +{ + "connectionsMap": + { + "r1":["mainServer", "c1", "s1", "r2"], + "r2":["r3"], + "r3":["r4"], + "r4":["r1"] + } +} diff --git a/inputJsonsFiles/vision/dc_paper_mnist_3d_1s_4r_1c_1w.json b/inputJsonsFiles/vision/dc_paper_mnist_3d_1s_4r_1c_1w.json new file mode 100644 index 00000000..ce0fd271 --- /dev/null +++ b/inputJsonsFiles/vision/dc_paper_mnist_3d_1s_4r_1c_1w.json @@ -0,0 +1,112 @@ +{ + "nerlnetSettings": { + "frequency": "100", + "batchSize": "25" + }, + "mainServer": { + "port": "8080", + "args": "" + }, + "apiServer": { + "port": "8081", + "args": "" + }, + "devices": [ + { + "name": "NerlControllerG", + "ipv4": "172.31.91.176", + "entities": "mainServer,apiServer,r1" + }, + { + "name": "NerlControllerD", + "ipv4": "172.31.93.88", + "entities": "s1,r2,r3" + }, + { + "name": "Nerl-Powerful", + "ipv4": "172.31.44.250", + "entities": "c1,r4" + } + ], + "routers": [ + { + "name": "r1", + "port": "8090", + "policy": "0" + }, + { + "name": "r2", + "port": "8091", + "policy": "0" + }, + { + "name": "r3", + "port": "8092", + "policy": "0" + }, + { + "name": "r4", + "port": "8093", + "policy": "0" + } + ], + "sources": [ + { + "name": "s1", + "port": "8086", + "frequency": "25", + "policy": "0", + "epochs": "4", + "type": "0" + } + ], + "clients": [ + { + "name": "c1", + "port": "8082", + "workers": "w1" + } + ], + "workers": [ + { + "name": "w1", + "model_sha": "9c5f1261068be7be96487a2cae282aa22e8c1cb482a5bf8d557bc8e1e2b6fef0" + } + ], + "model_sha": { + "9c5f1261068be7be96487a2cae282aa22e8c1cb482a5bf8d557bc8e1e2b6fef0": { + "modelType": "0", + "_doc_modelType": " nn:0 | approximation:1 | classification:2 | forecasting:3 | image_classification:4 | text_classification:5 | text_generation:6 | auto_association:7 | autoencoder:8 | ae_classifier:9 |", + "modelArgs": "", + "_doc_modelArgs": "Extra arguments to model", + "layersSizes": "28x28x1k5x5x1x6p0s1t1,28x28x6k2x2p0s2,14x14x6k4x4x6x12p0s1t0,1,32,10", + "_doc_layersSizes": "List of postive integers [L0, L1, ..., LN]", + "layerTypesList": "2,4,2,9,3,5", + "_doc_LayerTypes": " Default:0 | Scaling:1 | Conv:2 | Perceptron:3 | Pooling:4 | Probabilistic:5 | LSTM:6 | Reccurrent:7 | Unscaling:8 | Flatten:9 | Bounding:10 |", + "layers_functions": "6,2,6,1,6,4", + "_doc_layers_functions_activation": " Threshold:1 | Sign:2 | Logistic:3 | Tanh:4 | Linear:5 | ReLU:6 | eLU:7 | SeLU:8 | Soft-plus:9 | Soft-sign:10 | Hard-sigmoid:11 |", + "_doc_layer_functions_pooling": " none:1 | Max:2 | Avg:3 |", + "_doc_layer_functions_probabilistic": " Binary:1 | Logistic:2 | Competitive:3 | Softmax:4 |", + "_doc_layer_functions_scaler": " none:1 | MinMax:2 | MeanStd:3 | STD:4 | Log:5 |", + "lossMethod": "2", + "lossArgs": "", + "_doc_lossMethod": " SSE:1 | MSE:2 | NSE:3 | MinkowskiE:4 | WSE:5 | CEE:6 |", + "lr": "0.01", + "_doc_lr": "Positve float", + "epochs": "1", + "_doc_epochs": "Positve Integer", + "optimizer": "5", + "_doc_optimizer": " GD:0 | CGD:1 | SGD:2 | QuasiNeuton:3 | LVM:4 | ADAM:5 |", + "optimizerArgs": "none", + "_doc_optimizerArgs": "String", + "infraType": "0", + "_doc_infraType": " opennn:0 | wolfengine:1 |", + "distributedSystemType": "0", + "_doc_distributedSystemType": " none:0 | FedClientAvg:1 | FedServerAvg:2 | FedClientWeightedAvgClassification:3 | 
FedServerWeightedAvgClassification:4 | FedClientAE:5 | FedServerAE:6 | tiles:7 |", + "distributedSystemArgs": "none", + "_doc_distributedSystemArgs": "String", + "distributedSystemToken": "none", + "_doc_distributedSystemToken": "Token that associates distributed group of workers and parameter-server" + } + } +} \ No newline at end of file diff --git a/inputJsonsFiles/vision/exp_paper_mnist_3d_1s_4r_1c_1w.json b/inputJsonsFiles/vision/exp_paper_mnist_3d_1s_4r_1c_1w.json new file mode 100644 index 00000000..3f5aa5ae --- /dev/null +++ b/inputJsonsFiles/vision/exp_paper_mnist_3d_1s_4r_1c_1w.json @@ -0,0 +1,56 @@ +{ + "experimentName": "mnist_rr", + "experimentType": "classification", + "batchSize": 25, + "csvFilePath": "/tmp/nerlnet/data/NerlnetData-master/nerlnet/mnist_norm/mnist_train_255_norm.csv", + "numOfFeatures": "784", + "numOfLabels": "10", + "headersNames": "0,1,2,3,4,5,6,7,8,9", + "Phases": + [ + { + "phaseName": "training_phase1", + "phaseType": "training", + "sourcePieces": + [ + { + "sourceName": "s1", + "startingSample": "0", + "numOfBatches": "400", + "workers": "w1", + "nerltensorType": "float" + } + ] + }, + { + "phaseName": "training_phase2", + "phaseType": "training", + "sourcePieces": + [ + { + "sourceName": "s1", + "startingSample": "20000", + "numOfBatches": "400", + "workers": "w1", + "nerltensorType": "float" + } + ] + }, + { + "phaseName": "prediction_phase", + "phaseType": "prediction", + "sourcePieces": + [ + { + "sourceName": "s1", + "startingSample": "50000", + "numOfBatches": "400", + "workers": "w1", + "nerltensorType": "float" + } + ] + } + ] + } + + \ No newline at end of file diff --git a/src_erl/NerlnetApp/src/Client/clientStatem.erl b/src_erl/NerlnetApp/src/Client/clientStatem.erl index 4e69eb33..f20b8490 100644 --- a/src_erl/NerlnetApp/src/Client/clientStatem.erl +++ b/src_erl/NerlnetApp/src/Client/clientStatem.erl @@ -196,6 +196,7 @@ idle(cast, In = {training}, State = #client_statem_state{myName = _MyName, etsRe cast_message_to_workers(EtsRef, MessageToCast), ets:update_element(EtsRef, all_workers_done, {?DATA_IDX, false}), stats:performance_stats_reset(PerformanceStatsEts), + stats:communication_stats_reset(ClientStatsEts), stats:tic(ClientStatsEts, time_train_total), stats:reset_query_cpu_util_cores(), {next_state, waitforWorkers, State#client_statem_state{waitforWorkers = clientWorkersFunctions:get_workers_names(EtsRef), nextState = training}}; @@ -208,6 +209,7 @@ idle(cast, In = {predict}, State = #client_statem_state{etsRef = EtsRef}) -> MessageToCast = {predict}, cast_message_to_workers(EtsRef, MessageToCast), stats:performance_stats_reset(PerformanceStatsEts), + stats:communication_stats_reset(ClientStatsEts), stats:tic(ClientStatsEts, time_predict_total), stats:reset_query_cpu_util_cores(), {next_state, waitforWorkers, State#client_statem_state{waitforWorkers = clientWorkersFunctions:get_workers_names(EtsRef),nextState = predict}}; diff --git a/src_erl/NerlnetApp/src/Stats/stats.erl b/src_erl/NerlnetApp/src/Stats/stats.erl index 94f9118c..91652e41 100644 --- a/src_erl/NerlnetApp/src/Stats/stats.erl +++ b/src_erl/NerlnetApp/src/Stats/stats.erl @@ -17,7 +17,7 @@ % performance stats -export([generate_performance_stats_ets/0]). -export([start_os_mon/0]). --export([performance_stats_reset/1]). +-export([performance_stats_reset/1, communication_stats_reset/1]). % perofmance stats getters/setters -export([get_time_train_active/1, increment_time_train_active/2]). -export([get_time_train_total/1, increment_time_train_total/2]). 
@@ -237,6 +237,19 @@ get_bad_messages(StatsEts) -> increment_bad_messages(StatsEts) -> ets:update_counter(StatsEts, ?STATS_ATOM_BAD_MSG, 1). +communication_stats_reset(ComStatsEts) -> + % Reset all communication stats to zero + ets:update_element(ComStatsEts, ?STATS_ATOM_MSG_RECV, {?STATS_KEYVAL_VAL_IDX, 0}), + ets:update_element(ComStatsEts, ?STATS_ATOM_MSG_SENT, {?STATS_KEYVAL_VAL_IDX, 0}), + ets:update_element(ComStatsEts, ?STATS_ATOM_MSG_DROP, {?STATS_KEYVAL_VAL_IDX, 0}), + ets:update_element(ComStatsEts, ?STATS_ATOM_BYTES_RECV, {?STATS_KEYVAL_VAL_IDX, 0}), + ets:update_element(ComStatsEts, ?STATS_ATOM_BYTES_SENT, {?STATS_KEYVAL_VAL_IDX, 0}), + ets:update_element(ComStatsEts, ?STATS_ATOM_BAD_MSG, {?STATS_KEYVAL_VAL_IDX, 0}), + ets:update_element(ComStatsEts, ?STATS_ATOM_BATCHES_RECEIVED, {?STATS_KEYVAL_VAL_IDX, 0}), % related with client only + ets:update_element(ComStatsEts, ?STATS_ATOM_BATCHES_DROPPED, {?STATS_KEYVAL_VAL_IDX, 0}), % related with client only + ets:update_element(ComStatsEts, ?STATS_ATOM_BATCHES_SENT, {?STATS_KEYVAL_VAL_IDX, 0}), % related with source only + ets:update_element(ComStatsEts, ?STATS_ATOM_ACTUAL_FREQUENCY, {?STATS_KEYVAL_VAL_IDX, 0}), % related with source only + ok. performance_stats_reset(PerfStatsEts) -> % Reset all performance stats to zero diff --git a/src_erl/NerlnetApp/src/Stats/stats.hrl b/src_erl/NerlnetApp/src/Stats/stats.hrl index 0c572f51..928cb52b 100644 --- a/src_erl/NerlnetApp/src/Stats/stats.hrl +++ b/src_erl/NerlnetApp/src/Stats/stats.hrl @@ -7,6 +7,7 @@ -define(STATS_ATOM_BATCHES_RECEIVED, batches_received). -define(STATS_ATOM_BATCHES_DROPPED, batches_dropped). -define(STATS_ATOM_BATCHES_SENT, batches_sent). +-define(STATS_ATOM_ACTUAL_FREQUENCY, actual_frequency). -define(STATS_KEYVAL_KEY_IDX, 1). -define(STATS_KEYVAL_VAL_IDX, 2). diff --git a/src_py/apiServer/apiServer.py b/src_py/apiServer/apiServer.py index 9f87bd28..984a9395 100644 --- a/src_py/apiServer/apiServer.py +++ b/src_py/apiServer/apiServer.py @@ -215,6 +215,107 @@ def experiment_phase_is_valid(self): current_exp_flow = globe.experiment_focused_on return current_exp_flow.current_exp_phase_index < len(current_exp_flow.exp_phase_list) + def run_all_experiment_phases(self): + """ + Runs all experiment phases sequentially from the experiment JSON file. + + This function iterates through all phases defined in the experiment flow JSON, + executing each phase in order (training phases followed by prediction phases). + It collects statistics for each completed phase and returns them as a list. + + The function will: + 1. Print information about the current phase being executed (name and type) + 2. Execute each phase using run_current_experiment_phase() + 3. Generate and collect Stats objects for each completed phase + 4. Move to the next phase using next_experiment_phase() + 5. Continue until all phases are completed + + Returns: + list: A list of Stats objects, one for each successfully completed phase. + Each Stats object contains performance metrics, communication statistics, + and other phase-specific data. + + Raises: + AssertionError: If no valid experiment is currently focused or if required + setup (initialization, send_jsons_to_devices) is not completed. 
+ + Example: + # After initialization and sending JSONs to devices + api_server.send_jsons_to_devices() + all_stats = api_server.run_all_experiment_phases() + + # Process results + for i, stats in enumerate(all_stats): + print(f"Phase {i+1}: {stats.get_name()} ({stats.get_phase()})") + if stats.get_phase() == "training": + loss_data = stats.get_loss_ts() + elif stats.get_phase() == "prediction": + confusion_matrices = stats.get_confusion_matrices() + + Note: + - Requires that initialization() and send_jsons_to_devices() have been called first + - All NerlNet devices must be running and accessible + - The experiment JSON must contain valid phase definitions + """ + # Ensure we have a valid experiment focused + if self.current_exp is None: + raise AssertionError("No experiment is currently focused. Call initialization() and experiment_focused_on() first.") + + # Ensure JSONs have been sent to devices + send_jsons_event = self.apiserver_event_sync.get_event_status(EventSync.SEND_JSONS) + if send_jsons_event != EventSync.DONE: + raise AssertionError("JSONs must be sent to devices first. Call send_jsons_to_devices() before running phases.") + + all_phases_stats = [] + + # Get the initial phase information + current_exp_flow = self.current_exp + total_phases = len(current_exp_flow.exp_phase_list) + + if total_phases == 0: + LOG_WARNING("No experiment phases found in the experiment flow") + return all_phases_stats + + LOG_INFO(f"Starting to run all {total_phases} experiment phases for experiment: {current_exp_flow.get_exp_name()}") + + # Run phases sequentially + phase_count = 1 + while True: + if not self.experiment_phase_is_valid(): + LOG_WARNING("No valid experiment phase available to run") + break + + current_phase = current_exp_flow.get_current_experiment_phase() + phase_name = current_phase.get_name() + phase_type = current_phase.get_phase_type() + + LOG_INFO(f"Running phase {phase_count}/{total_phases}: '{phase_name}' (Type: {phase_type})") + + try: + # Run the current phase + self.run_current_experiment_phase() + + # Generate stats for the completed phase + phase_stats = current_exp_flow.generate_stats(current_phase) + all_phases_stats.append(phase_stats) + + LOG_INFO(f"Completed phase {phase_count}/{total_phases}: '{phase_name}' ({phase_type})") + + except Exception as e: + LOG_ERROR(f"Error running phase {phase_count}/{total_phases} '{phase_name}': {str(e)}") + # Continue with next phase instead of stopping completely + + # Move to next phase + next_phase_type = self.next_experiment_phase() + if next_phase_type is None: + LOG_INFO("All experiment phases completed successfully") + break + + phase_count += 1 + + LOG_INFO(f"Finished running all experiment phases. 
Total phases executed: {len(all_phases_stats)}") + return all_phases_stats + def list_datasets(self): with open(HF_DATA_REPO_PATHS_JSON) as file: repo_ids = json.load(file) diff --git a/src_py/apiServer/apiServerHelp.py b/src_py/apiServer/apiServerHelp.py index 156f4b2c..2398fbc9 100644 --- a/src_py/apiServer/apiServerHelp.py +++ b/src_py/apiServer/apiServerHelp.py @@ -41,6 +41,7 @@ -experiment_phase_is_valid() returns True if there are more experiment phases to run -run_current_experiment_phase() runs the current experiment phase -next_experiment_phase() moves to the next experiment phase and returns the phase type +-run_all_experiment_phases() runs all experiment phases sequentially and returns a list of Stats objects for each phase ======== Retrieving statistics ====== -get_experiment_flow(experiment_name).generate_stats() returns statistics object (E.g., assigned to StatsInst) class for the current experiment phase diff --git a/src_py/apiServer/exp_summary.py b/src_py/apiServer/exp_summary.py new file mode 100644 index 00000000..ac847d87 --- /dev/null +++ b/src_py/apiServer/exp_summary.py @@ -0,0 +1,545 @@ +from stats import Stats +from typing import List +import pandas as pd +import numpy as np +import os +from collections import OrderedDict +from definitions import PHASE_PREDICTION_STR, PHASE_TRAINING_STR + +class ExperimentSummary: + def __init__(self, stats_list: List[Stats]): + self.stats_list = stats_list + assert stats_list, "Stats list cannot be empty" + self.workers = self.stats_list[0].workers_list + + def summary_headers(self): + headers_ml_comm = ["Batch Size", "Frequency", "Num Of Sources", "Samples/Second", "Min. Accuracy", "Avg. Accuracy", "Min. Precision", "Avg. Precision", "Min. F1-Score", "Avg. F1-Score", "WX % Dropped Training", "WX % Dropped Prediction", "WX # Dropped Training", "WX # Dropped Prediction", "WX Total Batches Training", "WX Total Batches Prediction"] + headers_perf = ["WX Accumulated Time Train Active", "WX Accumulated Time Train Total", "WX Accumulated Time Predict Active", "WX Accumulated Time Predict Total", "WX Memory Train EMA Usage", "WX Memory Predict EMA Usage", "WX Memory Train Peak Usage", "WX Memory Predict Peak Usage", "WX Num Of Cores", "WX CPU Train Util Core X", "WX CPU Predict Util Core X"] + all_headers = headers_ml_comm + headers_perf + return all_headers + + def expand_headers_workers(self): + """ + Expand headers to replace WX with worker names and Core X with specific core numbers + """ + all_headers = self.summary_headers() + new_headers_list = [] + + # Get the number of cores from performance stats (assuming first stats object has client data) + if self.stats_list: + perf_stats = self.stats_list[0].get_performance_stats_clients() + num_cores = 0 + if perf_stats: + first_client = list(perf_stats.values())[0] + num_cores = first_client.get('num_of_cores', 0) + + for header in all_headers: + if "WX" in header: + for worker_name in self.workers: + if "Core X" in header: + # Handle CPU utilization headers with specific cores + for core_num in range(num_cores): + worker_core_header = header.replace("WX", worker_name).replace("Core X", f"Core {core_num}") + new_headers_list.append(worker_core_header) + else: + # Regular worker-specific headers + worker_header = header.replace("WX", worker_name) + new_headers_list.append(worker_header) + else: + new_headers_list.append(header) + return new_headers_list + + def calculate_samples_per_second(self, stats_obj): + """ + Calculate samples per second based on frequency and batch size + """ + 
freq = stats_obj.freq + batch_size = stats_obj.batch_size + return freq * batch_size if freq and batch_size else 0 + + def get_model_performance_aggregates(self, stats_obj): + """ + Get min/avg accuracy, precision, and F1-score from model performance stats + """ + try: + # First we need confusion matrices to get model performance + confusion_matrices_source, confusion_matrices_worker = stats_obj.get_confusion_matrices() + model_perf_df = stats_obj.get_model_performence_stats(confusion_matrices_worker) + + if not model_perf_df.empty: + accuracies = model_perf_df['Accuracy'].values + precisions = model_perf_df['Precision'].values + f1_scores = model_perf_df['F1'].values + + return { + 'min_accuracy': np.min(accuracies) if len(accuracies) > 0 else 0, + 'avg_accuracy': np.mean(accuracies) if len(accuracies) > 0 else 0, + 'min_precision': np.min(precisions) if len(precisions) > 0 else 0, + 'avg_precision': np.mean(precisions) if len(precisions) > 0 else 0, + 'min_f1': np.min(f1_scores) if len(f1_scores) > 0 else 0, + 'avg_f1': np.mean(f1_scores) if len(f1_scores) > 0 else 0, + } + else: + return { + 'min_accuracy': 0, 'avg_accuracy': 0, + 'min_precision': 0, 'avg_precision': 0, + 'min_f1': 0, 'avg_f1': 0, + } + except Exception as e: + # Return zeros if model performance can't be calculated (e.g., for training phase) + return { + 'min_accuracy': 0, 'avg_accuracy': 0, + 'min_precision': 0, 'avg_precision': 0, + 'min_f1': 0, 'avg_f1': 0, + } + + def get_training_aggregates(self, prediction_stats_obj, debug=False): + """ + Get aggregated training statistics from all training phase Stats objects that belong to the same experiment + """ + prediction_name = prediction_stats_obj.get_name() + training_stats_objects = [] + + if debug: + print(f"\n=== DEBUG: get_training_aggregates for experiment: {prediction_name} ===") + print(f"Total stats objects in list: {len(self.stats_list)}") + for i, obj in enumerate(self.stats_list): + print(f" Stats object {i}: name='{obj.get_name()}', phase='{obj.get_phase()}'") + + # Strategy 1: Try exact name match first + for stats_obj in self.stats_list: + if (stats_obj.get_phase() == PHASE_TRAINING_STR and + stats_obj.get_name() == prediction_name): + training_stats_objects.append(stats_obj) + if debug: + print(f" → Found exact name match: {stats_obj.get_name()} - {stats_obj.get_phase()}") + + # Strategy 2: If no exact match, try to find training phases with similar base names + if not training_stats_objects: + if debug: + print(f"No exact name matches found. 
Trying pattern matching...") + + # Extract base name patterns (remove common suffixes/prefixes) + prediction_base = prediction_name.replace('prediction_', '').replace('_prediction', '').replace('pred_', '').replace('_pred', '') + + for stats_obj in self.stats_list: + if stats_obj.get_phase() == PHASE_TRAINING_STR: + training_name = stats_obj.get_name() + training_base = training_name.replace('training_', '').replace('_training', '').replace('train_', '').replace('_train', '').replace('phase1', '').replace('phase2', '').replace('phase3', '').replace('_phase', '').replace('phase_', '') + + if debug: + print(f" Comparing: prediction_base='{prediction_base}' vs training_base='{training_base}' (from '{training_name}')") + + # Check if they share the same base name or if training name contains prediction base + if (prediction_base and training_base and + (prediction_base in training_base or training_base in prediction_base or + prediction_base == training_base)): + training_stats_objects.append(stats_obj) + if debug: + print(f" → Found pattern match: {stats_obj.get_name()} - {stats_obj.get_phase()}") + + # Strategy 3: If still no matches, include ALL training phases (fallback) + if not training_stats_objects: + if debug: + print(f"No pattern matches found. Using all training phases as fallback...") + + for stats_obj in self.stats_list: + if stats_obj.get_phase() == PHASE_TRAINING_STR: + training_stats_objects.append(stats_obj) + if debug: + print(f" → Using training phase: {stats_obj.get_name()} - {stats_obj.get_phase()}") + + if not training_stats_objects: + if debug: + print(f"No training phase Stats objects found for experiment: {prediction_name}") + return {} + + if debug: + print(f"Final result: Found {len(training_stats_objects)} training phase Stats objects for experiment: {prediction_name}") + for obj in training_stats_objects: + print(f" → Will use: {obj.get_name()}") + + # Rest of the aggregation logic remains the same... 
+ # Aggregate training statistics across all training phase Stats objects + aggregated = {} + + # Initialize aggregated data structure for each worker + for worker_name in self.workers: + aggregated[worker_name] = { + 'memory_train_ema_usage_list': [], + 'memory_train_peak_usage_list': [], + 'cpu_train_util_per_core_list': {}, + 'time_train_active_list': [], + 'time_train_total_list': [] + } + + # Collect data from each training phase Stats object + for idx, train_stats_obj in enumerate(training_stats_objects): + if debug: + print(f"\n--- Processing training Stats object {idx+1}/{len(training_stats_objects)} ---") + print(f"Name: {train_stats_obj.get_name()}, Phase: {train_stats_obj.get_phase()}") + + # Get performance stats from this training phase + try: + perf_stats = train_stats_obj.get_performance_stats_clients() + if debug: + print(f"Performance stats keys: {list(perf_stats.keys())}") + for key, value in perf_stats.items(): + print(f" Client '{key}': {list(value.keys()) if isinstance(value, dict) else 'Not a dict'}") + except Exception as e: + if debug: + print(f"Error getting performance stats: {e}") + perf_stats = {} + + for worker_name in self.workers: + if debug: + print(f"\n Processing worker: {worker_name}") + + # Map worker to client using the same logic as before + client_perf = {} + possible_client_names = [ + worker_name, + f"c{worker_name}", + f"client{worker_name}", + f"c{worker_name.replace('w', '')}", + f"client{worker_name.replace('w', '')}" + ] + + if debug: + print(f" Trying client mappings: {possible_client_names}") + + for possible_name in possible_client_names: + if possible_name in perf_stats: + client_perf = perf_stats[possible_name] + if debug: + print(f" ✓ Training mapping found: {worker_name} -> {possible_name}") + print(f" Client perf data keys: {list(client_perf.keys())}") + break + + if not client_perf and len(perf_stats) == 1: + client_perf = list(perf_stats.values())[0] + if debug: + print(f" ✓ Training: Using single available client data for worker {worker_name}") + print(f" Client perf data keys: {list(client_perf.keys())}") + + if not client_perf: + if debug: + print(f" ✗ No client performance data found for worker {worker_name}") + + # Collect performance data from this training phase + if client_perf: + memory_ema = client_perf.get('memory_train_ema_usage', 0) + memory_peak = client_perf.get('memory_train_peak_usage', 0) + time_active = client_perf.get('time_train_active', 0) + time_total = client_perf.get('time_train_total', 0) + + aggregated[worker_name]['memory_train_ema_usage_list'].append(memory_ema) + aggregated[worker_name]['memory_train_peak_usage_list'].append(memory_peak) + aggregated[worker_name]['time_train_active_list'].append(time_active) + aggregated[worker_name]['time_train_total_list'].append(time_total) + + if debug: + print(f" Performance data collected:") + print(f" memory_train_ema_usage: {memory_ema}") + print(f" memory_train_peak_usage: {memory_peak}") + print(f" time_train_active: {time_active}") + print(f" time_train_total: {time_total}") + + # CPU utilization per core from this training phase + cpu_train_util = client_perf.get('cpu_train_util_per_core', {}) + if debug: + print(f" cpu_train_util_per_core: {cpu_train_util}") + + for core_num, util in cpu_train_util.items(): + if core_num not in aggregated[worker_name]['cpu_train_util_per_core_list']: + aggregated[worker_name]['cpu_train_util_per_core_list'][core_num] = [] + aggregated[worker_name]['cpu_train_util_per_core_list'][core_num].append(util) + + # Debug: Print final 
aggregated data + if debug: + print(f"\n=== FINAL AGGREGATED DATA ===") + for worker_name in self.workers: + print(f"Worker {worker_name}:") + agg_data = aggregated[worker_name] + print(f" memory_train_ema_usage_list: {agg_data['memory_train_ema_usage_list']}") + print(f" memory_train_peak_usage_list: {agg_data['memory_train_peak_usage_list']}") + print(f" time_train_active_list: {agg_data['time_train_active_list']}") + print(f" time_train_total_list: {agg_data['time_train_total_list']}") + print(f" cpu_train_util_per_core_list: {agg_data['cpu_train_util_per_core_list']}") + + return aggregated + + def generate_summary_row(self, stats_obj, debug=False): + """ + Generate a single row of summary data for one stats object + """ + row_data = OrderedDict() + + # Basic experiment info + row_data["Batch Size"] = stats_obj.batch_size + row_data["Frequency"] = stats_obj.freq + row_data["Num Of Sources"] = stats_obj.num_of_sources + row_data["Samples/Second"] = self.calculate_samples_per_second(stats_obj) + + # Model performance aggregates + model_perf = self.get_model_performance_aggregates(stats_obj) + row_data["Min. Accuracy"] = model_perf['min_accuracy'] + row_data["Avg. Accuracy"] = model_perf['avg_accuracy'] + row_data["Min. Precision"] = model_perf['min_precision'] + row_data["Avg. Precision"] = model_perf['avg_precision'] + row_data["Min. F1-Score"] = model_perf['min_f1'] + row_data["Avg. F1-Score"] = model_perf['avg_f1'] + + # Get aggregated training statistics + training_aggregates = self.get_training_aggregates(stats_obj, debug=debug) + + # Communication stats per worker + comm_stats = stats_obj.get_communication_stats_workers() + for worker_name in self.workers: + worker_comm = comm_stats.get(worker_name, {}) + + # Current prediction phase stats + predict_received = worker_comm.get('batches_received_predict', 0) + predict_dropped = worker_comm.get('batches_dropped_predict', 0) + predict_total = predict_received + predict_dropped + predict_drop_pct = (predict_dropped / predict_total * 100) if predict_total > 0 else 0 + + # Training batch counts - get from current stats object (prediction phase) + # instead of aggregating from training phases to avoid double counting + current_worker_comm = comm_stats.get(worker_name, {}) + train_dropped_current = current_worker_comm.get('batches_dropped_train', 0) + train_received_current = current_worker_comm.get('batches_received_train', 0) + train_total_current = train_dropped_current + train_received_current + train_drop_pct_current = (train_dropped_current / train_total_current * 100) if train_total_current > 0 else 0 + + row_data[f"{worker_name} % Dropped Training"] = train_drop_pct_current + row_data[f"{worker_name} % Dropped Prediction"] = predict_drop_pct + row_data[f"{worker_name} # Dropped Training"] = train_dropped_current + row_data[f"{worker_name} # Dropped Prediction"] = predict_dropped + row_data[f"{worker_name} Total Batches Training"] = train_total_current + row_data[f"{worker_name} Total Batches Prediction"] = predict_total + + # Performance stats per worker (from clients) + perf_stats = stats_obj.get_performance_stats_clients() + + # Debug: print available clients to understand the mapping + if debug: + print(f"Available clients in performance stats: {list(perf_stats.keys())}") + print(f"Workers list: {self.workers}") + + for worker_name in self.workers: + # Try different possible mappings: worker_name, c+worker_name, client+worker_name + client_perf = {} + possible_client_names = [ + worker_name, + f"c{worker_name}", + 
f"client{worker_name}", + f"c{worker_name.replace('w', '')}", # if worker is 'w1', try 'c1' + f"client{worker_name.replace('w', '')}" # if worker is 'w1', try 'client1' + ] + + for possible_name in possible_client_names: + if possible_name in perf_stats: + client_perf = perf_stats[possible_name] + if debug: + print(f"Found mapping: {worker_name} -> {possible_name}") + break + + if not client_perf: + # If no direct mapping found, use the first available client if there's only one + if len(perf_stats) == 1: + client_perf = list(perf_stats.values())[0] + if debug: + print(f"Warning: Using single available client data for worker {worker_name}") + + # Aggregated training performance stats + if worker_name in training_aggregates: + train_agg = training_aggregates[worker_name] + + if debug: + print(f"\n=== APPLYING AGGREGATION RULES FOR {worker_name} ===") + print(f"Raw training data:") + print(f" time_train_active_list: {train_agg['time_train_active_list']}") + print(f" time_train_total_list: {train_agg['time_train_total_list']}") + print(f" memory_train_ema_usage_list: {train_agg['memory_train_ema_usage_list']}") + print(f" memory_train_peak_usage_list: {train_agg['memory_train_peak_usage_list']}") + + # Accumulated time (sum across all training phases) + accumulated_train_active = sum(train_agg['time_train_active_list']) + accumulated_train_total = sum(train_agg['time_train_total_list']) + + # Average EMA memory usage across training phases + avg_memory_train_ema = (np.mean(train_agg['memory_train_ema_usage_list']) + if train_agg['memory_train_ema_usage_list'] else 0) + + # Max peak memory usage across training phases + max_memory_train_peak = (max(train_agg['memory_train_peak_usage_list']) + if train_agg['memory_train_peak_usage_list'] else 0) + + if debug: + print(f"Applied aggregation rules:") + print(f" accumulated_train_active (sum): {accumulated_train_active}") + print(f" accumulated_train_total (sum): {accumulated_train_total}") + print(f" avg_memory_train_ema (avg): {avg_memory_train_ema}") + print(f" max_memory_train_peak (max): {max_memory_train_peak}") + else: + if debug: + print(f"\n=== NO TRAINING AGGREGATES FOUND FOR {worker_name} ===") + accumulated_train_active = 0 + accumulated_train_total = 0 + avg_memory_train_ema = 0 + max_memory_train_peak = 0 + + # Current prediction phase stats + row_data[f"{worker_name} Accumulated Time Train Active"] = accumulated_train_active + row_data[f"{worker_name} Accumulated Time Train Total"] = accumulated_train_total + row_data[f"{worker_name} Accumulated Time Predict Active"] = client_perf.get('time_predict_active', 0) + row_data[f"{worker_name} Accumulated Time Predict Total"] = client_perf.get('time_predict_total', 0) + row_data[f"{worker_name} Memory Train EMA Usage"] = avg_memory_train_ema + row_data[f"{worker_name} Memory Predict EMA Usage"] = client_perf.get('memory_predict_ema_usage', 0) + row_data[f"{worker_name} Memory Train Peak Usage"] = max_memory_train_peak + row_data[f"{worker_name} Memory Predict Peak Usage"] = client_perf.get('memory_predict_peak_usage', 0) + row_data[f"{worker_name} Num Of Cores"] = client_perf.get('num_of_cores', 0) + + if debug: + print(f"\n=== FINAL VALUES ASSIGNED TO CSV FOR {worker_name} ===") + print(f" {worker_name} Accumulated Time Train Active: {accumulated_train_active}") + print(f" {worker_name} Accumulated Time Train Total: {accumulated_train_total}") + print(f" {worker_name} Memory Train EMA Usage: {avg_memory_train_ema}") + print(f" {worker_name} Memory Train Peak Usage: 
{max_memory_train_peak}") + print(f" {worker_name} Accumulated Time Predict Active: {client_perf.get('time_predict_active', 0)}") + print(f" {worker_name} Memory Predict EMA Usage: {client_perf.get('memory_predict_ema_usage', 0)}") + + # CPU utilization per core + cpu_predict_util = client_perf.get('cpu_predict_util_per_core', {}) + num_cores = client_perf.get('num_of_cores', 0) + + for core_num in range(num_cores): + # Average CPU training utilization across all training phases + if (worker_name in training_aggregates and + core_num in training_aggregates[worker_name]['cpu_train_util_per_core_list']): + avg_cpu_train_util = np.mean(training_aggregates[worker_name]['cpu_train_util_per_core_list'][core_num]) + else: + avg_cpu_train_util = 0 + + row_data[f"{worker_name} CPU Train Util Core {core_num}"] = avg_cpu_train_util + row_data[f"{worker_name} CPU Predict Util Core {core_num}"] = cpu_predict_util.get(core_num, 0) + + if debug: + print(f" {worker_name} CPU Train Util Core {core_num}: {avg_cpu_train_util}") + print(f" {worker_name} CPU Predict Util Core {core_num}: {cpu_predict_util.get(core_num, 0)}") + + return row_data + + def generate_summary_csv(self, output_path=None, debug=False, force_append=False): + """ + Generate a comprehensive summary CSV from all stats objects + Only includes prediction phase experiments + + Args: + output_path: Path to save the CSV file + debug: Enable debug printing + force_append: If True, always append new rows without checking for duplicates. + If False (default), replace existing experiments with same name. + """ + import os + + summary_rows = [] + + for stats_obj in self.stats_list: + # Only include prediction phases + if stats_obj.get_phase() == PHASE_PREDICTION_STR: + row_data = self.generate_summary_row(stats_obj, debug=debug) + # Add identifiers for this experiment + row_data_with_id = OrderedDict() + row_data_with_id["Phase"] = stats_obj.get_phase() + row_data_with_id["Experiment"] = f"{stats_obj.get_name()}" + row_data_with_id.update(row_data) + summary_rows.append(row_data_with_id) + + # Create DataFrame for new data + new_summary_df = pd.DataFrame(summary_rows) + + # Save to CSV if path provided + if output_path: + if os.path.exists(output_path): + # File exists, append new rows + try: + existing_df = pd.read_csv(output_path) + + if debug: + print(f"Existing file has {len(existing_df)} rows") + print(f"Adding {len(new_summary_df)} new rows") + print(f"New experiments being added: {new_summary_df['Experiment'].tolist()}") + if not existing_df.empty: + print(f"Existing experiments: {existing_df['Experiment'].tolist()}") + print(f"Force append mode: {force_append}") + + if force_append: + # Always append without checking for duplicates + combined_df = pd.concat([existing_df, new_summary_df], ignore_index=True) + print(f"Force appended {len(new_summary_df)} experiment(s) to existing file") + else: + # Check for exact duplicate experiments and handle them + if not existing_df.empty and not new_summary_df.empty: + new_experiment_names = set(new_summary_df['Experiment'].tolist()) + existing_experiment_names = set(existing_df['Experiment'].tolist()) + + # Find which experiments are duplicates + duplicate_experiments = new_experiment_names.intersection(existing_experiment_names) + truly_new_experiments = new_experiment_names - existing_experiment_names + + if debug: + print(f"Duplicate experiments to replace: {list(duplicate_experiments)}") + print(f"Truly new experiments to add: {list(truly_new_experiments)}") + + if duplicate_experiments: + # Remove 
only the exact duplicates from existing data + existing_df_filtered = existing_df[~existing_df['Experiment'].isin(duplicate_experiments)] + print(f"Replacing {len(duplicate_experiments)} existing experiment(s) with updated data") + else: + existing_df_filtered = existing_df + + if truly_new_experiments: + print(f"Adding {len(truly_new_experiments)} new experiment(s)") + + # Combine filtered existing data with new data + combined_df = pd.concat([existing_df_filtered, new_summary_df], ignore_index=True) + else: + # If either DataFrame is empty, just concatenate + combined_df = pd.concat([existing_df, new_summary_df], ignore_index=True) + + # Save the combined DataFrame + combined_df.to_csv(output_path, index=False) + + if debug: + print(f"Final file has {len(combined_df)} rows") + print(f"Experiments in final file: {combined_df['Experiment'].tolist()}") + + print(f"Updated file: {output_path} (now contains {len(combined_df)} total experiments)") + + except Exception as e: + print(f"Warning: Could not read existing file {output_path}. Creating new file. Error: {e}") + new_summary_df.to_csv(output_path, index=False) + print(f"New summary file created: {output_path}") + else: + # File doesn't exist, create new file + new_summary_df.to_csv(output_path, index=False) + print(f"New summary file created with {len(new_summary_df)} experiment(s): {output_path}") + + return new_summary_df + + def print_summary_stats(self): + """ + Print basic summary statistics + """ + print("Experiment Summary Statistics:") + print(f"Number of experiments: {len(self.stats_list)}") + print(f"Workers: {self.workers}") + + for i, stats_obj in enumerate(self.stats_list): + print(f"\nExperiment {i+1}: {stats_obj.get_name()} ({stats_obj.get_phase()})") + print(f" Batch Size: {stats_obj.batch_size}") + print(f" Frequency: {stats_obj.freq}") + print(f" Number of Sources: {stats_obj.num_of_sources}") + print(f" Samples/Second: {self.calculate_samples_per_second(stats_obj)}") diff --git a/src_py/apiServer/experiment_phase.py b/src_py/apiServer/experiment_phase.py index 9ef438d6..24033443 100644 --- a/src_py/apiServer/experiment_phase.py +++ b/src_py/apiServer/experiment_phase.py @@ -6,19 +6,19 @@ from decoderHttpMainServer import * class ExperimentPhase(): - def __init__(self, experiment_flow_name : str, experiment_flow_type: str, name : str, phase_type: str, network_componenets: NetworkComponents, num_of_features: str): + def __init__(self, experiment_flow_name : str, experiment_flow_type: str, name : str, phase_type: str, network_components: NetworkComponents, num_of_features: str): self.experiment_flow_name = experiment_flow_name self.experiment_flow_type = experiment_flow_type self.name = name self.phase_type = phase_type # training/prediction assert self.phase_type in [PHASE_TRAINING_STR, PHASE_PREDICTION_STR] - self.nerl_comm_db = NerlComDB(network_componenets) - self.nerl_perf_db = NerlPerfDB(network_componenets) + self.nerl_comm_db = NerlComDB(network_components) + self.nerl_perf_db = NerlPerfDB(network_components) self.nerl_model_db = NerlModelDB(self.phase_type) self.source_pieces_dict = {} # Dict of SourcePieceDS self.num_of_features = num_of_features self.raw_data_buffer = [] - self.network_componenets = network_componenets + self.network_components = network_components def get_raw_data_buffer(self): return self.raw_data_buffer @@ -31,7 +31,7 @@ def process_experiment_phase_data(self): list_of_decoded_data = decode_phase_result_data_json_from_main_server(self.raw_data_buffer[0]) for decoded_data in 
list_of_decoded_data: worker_name, source_name, duration, batch_id, batch_ts, distributed_token, np_tensor = decoded_data - client_name = self.network_componenets.get_client_name_by_worker_name(worker_name) + client_name = self.network_components.get_client_name_by_worker_name(worker_name) self.nerl_model_db.get_client(client_name).get_worker(worker_name).create_batch(batch_id, source_name, np_tensor, duration, distributed_token, batch_ts) self.clean_raw_data_buffer() @@ -42,12 +42,18 @@ def get_phase_type(self): def get_name(self): return self.name + def get_experiment_name(self): + return self.experiment_flow_name + def get_experiment_flow_name(self): return self.experiment_flow_name def get_experiment_flow_type(self): return self.experiment_flow_type + def get_network_components(self): + return self.network_components + def get_sources_str_list(self): return ",".join(self.source_pieces_dict.keys()) diff --git a/src_py/apiServer/networkComponents.py b/src_py/apiServer/networkComponents.py index d0387794..bf15b2ed 100644 --- a/src_py/apiServer/networkComponents.py +++ b/src_py/apiServer/networkComponents.py @@ -100,6 +100,9 @@ def __init__(self, dc_json: dict): def get_map_worker_to_client(self): return self.map_worker_to_client + def get_source_epochs_dict(self): + return self.sourceEpochs + def get_client_name_by_worker_name(self, worker_name): return self.map_worker_to_client[worker_name] @@ -112,7 +115,18 @@ def get_api_server_ip_port(self): api_server_port = self.jsonData[API_SERVER_STR][GetFields.get_port_field_name()] api_server_ip = self.map_device_to_ip[self.map_entity_to_device[API_SERVER_STR]] return api_server_ip, api_server_port - + + def get_freq(self): + return self.frequency + + def get_batch_size(self): + return self.batchSize + + def get_num_of_sources(self): + return len(self.sources) + + def get_workers_list(self): + return self.workers def printComponents(self): LOG_INFO(f"\nNetwork components:\n \ diff --git a/src_py/apiServer/stats.py b/src_py/apiServer/stats.py index e8194bac..6f4dbdfb 100644 --- a/src_py/apiServer/stats.py +++ b/src_py/apiServer/stats.py @@ -24,7 +24,13 @@ def __init__(self, experiment_phase: ExperimentPhase): self.nerl_comm_db = self.experiment_phase.get_nerl_comm_db() self.nerl_perf_db = self.experiment_phase.get_nerl_perf_db() self.phase = self.experiment_phase.get_phase_type() - self.name = self.experiment_phase.get_name() + self.name = self.experiment_phase.get_experiment_name() + self.net_comps = self.experiment_phase.get_network_components() + self.workers_list = self.net_comps.get_workers_list() + self.freq = self.net_comps.get_freq() + self.batch_size = self.net_comps.get_batch_size() + self.num_of_sources = self.net_comps.get_num_of_sources() + self.sources_epochs_dict = self.net_comps.get_source_epochs_dict() self.loss_ts_pd = None self.missed_batches_warning_msg = False self.experiment_flow_type = self.experiment_phase.get_experiment_flow_type() @@ -299,42 +305,63 @@ def build_worker_label_df(original_df, batch_ids, batch_size): if plot: workers = sorted(list({tup[0] for tup in confusion_matrix_worker_dict.keys()})) classes = sorted(list({tup[1] for tup in confusion_matrix_worker_dict.keys()})) - if len(workers) > 1: - fig, ax = plt.subplots(nrows=len(workers), ncols=len(classes),figsize=(4*len(classes), 4*len(workers)), dpi=140) - if len(classes) > 1: - for i , worker in enumerate(workers): - for j , pred_class in enumerate(classes): - conf_mat = confusion_matrix_worker_dict[(worker , pred_class)] - heatmap = sns.heatmap(data=conf_mat 
,ax=ax[i,j], annot=True , fmt="d", cmap='Blues',annot_kws={"size": 8}, cbar_kws={'pad': 0.1}) - cbar = heatmap.collections[0].colorbar - cbar.ax.tick_params(labelsize = 8) - ax[i, j].set_title(f"{worker} , Class '{pred_class}'" , fontsize=12) - ax[i, j].tick_params(axis='both', which='major', labelsize=8) - ax[i, j].set_xlabel("Predicted Label" , fontsize=8) - ax[i, j].set_ylabel("True Label" , fontsize=8) - ax[i, j].set_aspect('equal') - else: - for i, worker in enumerate(workers): - conf_mat = confusion_matrix_worker_dict[(worker , classes[0])] - heatmap = sns.heatmap(data=conf_mat ,ax=ax[i], annot=True , fmt="d", cmap='Blues',annot_kws={"size": 8}, cbar_kws={'pad': 0.1}) - cbar = heatmap.collections[0].colorbar - cbar.ax.tick_params(labelsize = 8) - ax[i].set_title(f"{worker} , Class '{classes[0]}'" , fontsize=12) - ax[i].tick_params(axis='both', which='major', labelsize=8) - ax[i].set_xlabel("Predicted Label" , fontsize=8) - ax[i].set_ylabel("True Label" , fontsize=8) - ax[i].set_aspect('equal') - fig.subplots_adjust(wspace=0.4 , hspace=0.4) - else: - plt.figure(figsize=(4*len(classes), 3), dpi=140) - conf_mat = confusion_matrix_worker_dict[(workers[0] , classes[0])] - heatmap = sns.heatmap(data=conf_mat , annot=True , fmt="d", cmap='Blues',annot_kws={"size": 8}, cbar_kws={'pad': 0.1}) + fig, ax = plt.subplots(nrows=len(workers), ncols=len(classes),figsize=(4*len(classes), 4*len(workers)), dpi=140) + + # Handle different combinations of workers and classes + if len(workers) == 1 and len(classes) == 1: + # Single worker, single class + worker = workers[0] + pred_class = classes[0] + conf_mat = confusion_matrix_worker_dict[(worker, pred_class)] + heatmap = sns.heatmap(data=conf_mat, ax=ax, annot=True, fmt="d", cmap='Blues', annot_kws={"size": 8}, cbar_kws={'pad': 0.1}) cbar = heatmap.collections[0].colorbar - cbar.ax.tick_params(labelsize = 8) - plt.title(f"{workers[0]} , Class '{classes[0]}'" , fontsize=12) - plt.xlabel("Predicted Label" , fontsize=8) - plt.ylabel("True Label" , fontsize=8) - plt.tick_params(axis='both', which='major', labelsize=8) + cbar.ax.tick_params(labelsize=8) + ax.set_title(f"{worker} , Class '{pred_class}'", fontsize=12) + ax.tick_params(axis='both', which='major', labelsize=8) + ax.set_xlabel("Predicted Label", fontsize=8) + ax.set_ylabel("True Label", fontsize=8) + ax.set_aspect('equal') + elif len(workers) == 1 and len(classes) > 1: + # Single worker, multiple classes + worker = workers[0] + for j, pred_class in enumerate(classes): + conf_mat = confusion_matrix_worker_dict[(worker, pred_class)] + heatmap = sns.heatmap(data=conf_mat, ax=ax[j], annot=True, fmt="d", cmap='Blues', annot_kws={"size": 8}, cbar_kws={'pad': 0.1}) + cbar = heatmap.collections[0].colorbar + cbar.ax.tick_params(labelsize=8) + ax[j].set_title(f"{worker} , Class '{pred_class}'", fontsize=12) + ax[j].tick_params(axis='both', which='major', labelsize=8) + ax[j].set_xlabel("Predicted Label", fontsize=8) + ax[j].set_ylabel("True Label", fontsize=8) + ax[j].set_aspect('equal') + elif len(workers) > 1 and len(classes) == 1: + # Multiple workers, single class + pred_class = classes[0] + for i, worker in enumerate(workers): + conf_mat = confusion_matrix_worker_dict[(worker, pred_class)] + heatmap = sns.heatmap(data=conf_mat, ax=ax[i], annot=True, fmt="d", cmap='Blues', annot_kws={"size": 8}, cbar_kws={'pad': 0.1}) + cbar = heatmap.collections[0].colorbar + cbar.ax.tick_params(labelsize=8) + ax[i].set_title(f"{worker} , Class '{pred_class}'", fontsize=12) + ax[i].tick_params(axis='both', 
which='major', labelsize=8) + ax[i].set_xlabel("Predicted Label", fontsize=8) + ax[i].set_ylabel("True Label", fontsize=8) + ax[i].set_aspect('equal') + else: + # Multiple workers, multiple classes + for i, worker in enumerate(workers): + for j, pred_class in enumerate(classes): + conf_mat = confusion_matrix_worker_dict[(worker, pred_class)] + heatmap = sns.heatmap(data=conf_mat, ax=ax[i, j], annot=True, fmt="d", cmap='Blues', annot_kws={"size": 8}, cbar_kws={'pad': 0.1}) + cbar = heatmap.collections[0].colorbar + cbar.ax.tick_params(labelsize=8) + ax[i, j].set_title(f"{worker} , Class '{pred_class}'", fontsize=12) + ax[i, j].tick_params(axis='both', which='major', labelsize=8) + ax[i, j].set_xlabel("Predicted Label", fontsize=8) + ax[i, j].set_ylabel("True Label", fontsize=8) + ax[i, j].set_aspect('equal') + + fig.subplots_adjust(wspace=0.4, hspace=0.4) plt.show() return confusion_matrix_source_dict, confusion_matrix_worker_dict
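
Usage note (illustrative sketch, not part of the patch): the two additions above are designed to be chained, run_all_experiment_phases() drives every training/prediction phase and returns one Stats object per phase, and ExperimentSummary turns that list into the per-worker summary CSV. Module and class names (apiServer.ApiServer, exp_summary.ExperimentSummary) are assumed from the repository layout, the initialization() arguments are elided because their signature is not shown in this patch, and the output CSV path is a placeholder.

    # Sketch only: assumes the standard apiServer bootstrap.
    # initialization() arguments and the CSV path are placeholders.
    from apiServer import ApiServer
    from exp_summary import ExperimentSummary

    api_server = ApiServer()
    api_server.initialization(...)           # dc/conn/exp-flow JSONs for your deployment
    api_server.send_jsons_to_devices()

    all_stats = api_server.run_all_experiment_phases()   # list of Stats, one per completed phase

    summary = ExperimentSummary(all_stats)
    summary.print_summary_stats()
    summary_df = summary.generate_summary_csv("/tmp/nerlnet/results/exp_summary.csv")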