9 changes: 9 additions & 0 deletions inputJsonsFiles/vision/conn_paper_mnist_3d_1s_4r_1c_1w.json
@@ -0,0 +1,9 @@
{
"connectionsMap":
{
"r1":["mainServer", "c1", "s1", "r2"],
"r2":["r3"],
"r3":["r4"],
"r4":["r1"]
}
}
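The connections map above forms a ring: r1 links to the mainServer, client c1, source s1, and router r2, and r2 → r3 → r4 → r1 closes the loop. As a sanity check, here is a minimal sketch that loads such a file and verifies every entity can reach the mainServer, under the assumption that listed links are bidirectional (a hypothetical helper, not part of this PR or of the Nerlnet API):

```python
import json
from collections import deque

def check_connectivity(conn_json_path: str) -> bool:
    """Hypothetical helper: BFS over the connections map, treating links as bidirectional."""
    with open(conn_json_path) as f:
        conn_map = json.load(f)["connectionsMap"]
    graph = {}
    for node, peers in conn_map.items():
        for peer in peers:
            graph.setdefault(node, set()).add(peer)
            graph.setdefault(peer, set()).add(node)  # assumption: links work both ways
    seen, queue = {"mainServer"}, deque(["mainServer"])
    while queue:
        for nxt in graph.get(queue.popleft(), ()):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen.issuperset(graph)  # True if every listed entity reaches the mainServer

# e.g. check_connectivity("inputJsonsFiles/vision/conn_paper_mnist_3d_1s_4r_1c_1w.json")
```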
112 changes: 112 additions & 0 deletions inputJsonsFiles/vision/dc_paper_mnist_3d_1s_4r_1c_1w.json
@@ -0,0 +1,112 @@
{
"nerlnetSettings": {
"frequency": "100",
"batchSize": "25"
},
"mainServer": {
"port": "8080",
"args": ""
},
"apiServer": {
"port": "8081",
"args": ""
},
"devices": [
{
"name": "NerlControllerG",
"ipv4": "172.31.91.176",
"entities": "mainServer,apiServer,r1"
},
{
"name": "NerlControllerD",
"ipv4": "172.31.93.88",
"entities": "s1,r2,r3"
},
{
"name": "Nerl-Powerful",
"ipv4": "172.31.44.250",
"entities": "c1,r4"
}
],
"routers": [
{
"name": "r1",
"port": "8090",
"policy": "0"
},
{
"name": "r2",
"port": "8091",
"policy": "0"
},
{
"name": "r3",
"port": "8092",
"policy": "0"
},
{
"name": "r4",
"port": "8093",
"policy": "0"
}
],
"sources": [
{
"name": "s1",
"port": "8086",
"frequency": "25",
"policy": "0",
"epochs": "4",
"type": "0"
}
],
"clients": [
{
"name": "c1",
"port": "8082",
"workers": "w1"
}
],
"workers": [
{
"name": "w1",
"model_sha": "9c5f1261068be7be96487a2cae282aa22e8c1cb482a5bf8d557bc8e1e2b6fef0"
}
],
"model_sha": {
"9c5f1261068be7be96487a2cae282aa22e8c1cb482a5bf8d557bc8e1e2b6fef0": {
"modelType": "0",
"_doc_modelType": " nn:0 | approximation:1 | classification:2 | forecasting:3 | image_classification:4 | text_classification:5 | text_generation:6 | auto_association:7 | autoencoder:8 | ae_classifier:9 |",
"modelArgs": "",
"_doc_modelArgs": "Extra arguments to model",
"layersSizes": "28x28x1k5x5x1x6p0s1t1,28x28x6k2x2p0s2,14x14x6k4x4x6x12p0s1t0,1,32,10",
"_doc_layersSizes": "List of postive integers [L0, L1, ..., LN]",
"layerTypesList": "2,4,2,9,3,5",
"_doc_LayerTypes": " Default:0 | Scaling:1 | Conv:2 | Perceptron:3 | Pooling:4 | Probabilistic:5 | LSTM:6 | Reccurrent:7 | Unscaling:8 | Flatten:9 | Bounding:10 |",
"layers_functions": "6,2,6,1,6,4",
"_doc_layers_functions_activation": " Threshold:1 | Sign:2 | Logistic:3 | Tanh:4 | Linear:5 | ReLU:6 | eLU:7 | SeLU:8 | Soft-plus:9 | Soft-sign:10 | Hard-sigmoid:11 |",
"_doc_layer_functions_pooling": " none:1 | Max:2 | Avg:3 |",
"_doc_layer_functions_probabilistic": " Binary:1 | Logistic:2 | Competitive:3 | Softmax:4 |",
"_doc_layer_functions_scaler": " none:1 | MinMax:2 | MeanStd:3 | STD:4 | Log:5 |",
"lossMethod": "2",
"lossArgs": "",
"_doc_lossMethod": " SSE:1 | MSE:2 | NSE:3 | MinkowskiE:4 | WSE:5 | CEE:6 |",
"lr": "0.01",
"_doc_lr": "Positve float",
"epochs": "1",
"_doc_epochs": "Positve Integer",
"optimizer": "5",
"_doc_optimizer": " GD:0 | CGD:1 | SGD:2 | QuasiNeuton:3 | LVM:4 | ADAM:5 |",
"optimizerArgs": "none",
"_doc_optimizerArgs": "String",
"infraType": "0",
"_doc_infraType": " opennn:0 | wolfengine:1 |",
"distributedSystemType": "0",
"_doc_distributedSystemType": " none:0 | FedClientAvg:1 | FedServerAvg:2 | FedClientWeightedAvgClassification:3 | FedServerWeightedAvgClassification:4 | FedClientAE:5 | FedServerAE:6 | tiles:7 |",
"distributedSystemArgs": "none",
"_doc_distributedSystemArgs": "String",
"distributedSystemToken": "none",
"_doc_distributedSystemToken": "Token that associates distributed group of workers and parameter-server"
}
}
}
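Decoded against the _doc_* tables in the same file, layerTypesList "2,4,2,9,3,5" reads Conv, Pooling, Conv, Flatten, Perceptron, Probabilistic, and layers_functions "6,2,6,1,6,4" reads ReLU, Max, ReLU, code 1 (presumably unused for the Flatten layer), ReLU, Softmax: a small LeNet-style CNN ending in a 32-unit perceptron and a 10-way softmax head. A minimal decoder sketch for those two comma-separated fields (illustrative only, not a Nerlnet API; the tables below simply mirror the _doc_* strings above):

```python
# Code tables copied from the _doc_* strings in the model definition above.
LAYER_TYPES = {"0": "Default", "1": "Scaling", "2": "Conv", "3": "Perceptron",
               "4": "Pooling", "5": "Probabilistic", "6": "LSTM", "7": "Recurrent",
               "8": "Unscaling", "9": "Flatten", "10": "Bounding"}
ACTIVATIONS = {"1": "Threshold", "2": "Sign", "3": "Logistic", "4": "Tanh", "5": "Linear",
               "6": "ReLU", "7": "eLU", "8": "SeLU", "9": "Soft-plus", "10": "Soft-sign",
               "11": "Hard-sigmoid"}
POOLING = {"1": "none", "2": "Max", "3": "Avg"}
PROBABILISTIC = {"1": "Binary", "2": "Logistic", "3": "Competitive", "4": "Softmax"}

def decode_layers(layer_types_csv: str, functions_csv: str):
    """Pair each layer type with its function name, per the tables above."""
    rows = []
    for t, f in zip(layer_types_csv.split(","), functions_csv.split(",")):
        layer = LAYER_TYPES[t]
        if layer == "Pooling":
            func = POOLING[f]
        elif layer == "Probabilistic":
            func = PROBABILISTIC[f]
        else:
            func = ACTIVATIONS[f]  # for Flatten the code is likely ignored by the engine
        rows.append((layer, func))
    return rows

# decode_layers("2,4,2,9,3,5", "6,2,6,1,6,4")
# -> [('Conv', 'ReLU'), ('Pooling', 'Max'), ('Conv', 'ReLU'),
#     ('Flatten', 'Threshold'), ('Perceptron', 'ReLU'), ('Probabilistic', 'Softmax')]
```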
56 changes: 56 additions & 0 deletions inputJsonsFiles/vision/exp_paper_mnist_3d_1s_4r_1c_1w.json
@@ -0,0 +1,56 @@
{
"experimentName": "mnist_rr",
"experimentType": "classification",
"batchSize": 25,
"csvFilePath": "/tmp/nerlnet/data/NerlnetData-master/nerlnet/mnist_norm/mnist_train_255_norm.csv",
"numOfFeatures": "784",
"numOfLabels": "10",
"headersNames": "0,1,2,3,4,5,6,7,8,9",
"Phases":
[
{
"phaseName": "training_phase1",
"phaseType": "training",
"sourcePieces":
[
{
"sourceName": "s1",
"startingSample": "0",
"numOfBatches": "400",
"workers": "w1",
"nerltensorType": "float"
}
]
},
{
"phaseName": "training_phase2",
"phaseType": "training",
"sourcePieces":
[
{
"sourceName": "s1",
"startingSample": "20000",
"numOfBatches": "400",
"workers": "w1",
"nerltensorType": "float"
}
]
},
{
"phaseName": "prediction_phase",
"phaseType": "prediction",
"sourcePieces":
[
{
"sourceName": "s1",
"startingSample": "50000",
"numOfBatches": "400",
"workers": "w1",
"nerltensorType": "float"
}
]
}
]
}
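With batchSize 25, each source piece of 400 batches streams 25 × 400 = 10,000 samples, so the three phases cover samples 0–9,999, 20,000–29,999, and 50,000–59,999 of the normalized MNIST training CSV (the standard MNIST training split has 60,000 rows, so the pieces do not overlap and stay in range). A standalone sketch of that arithmetic (not a Nerlnet API; it only reads the fields shown in the file above):

```python
import json

def phase_sample_ranges(exp_json_path: str):
    """Derive the inclusive sample range each source piece covers."""
    with open(exp_json_path) as f:
        exp = json.load(f)
    batch_size = int(exp["batchSize"])
    ranges = []
    for phase in exp["Phases"]:
        for piece in phase["sourcePieces"]:
            start = int(piece["startingSample"])
            count = batch_size * int(piece["numOfBatches"])
            ranges.append((phase["phaseName"], start, start + count - 1))
    return ranges

# For the file above:
# [('training_phase1', 0, 9999), ('training_phase2', 20000, 29999),
#  ('prediction_phase', 50000, 59999)]
```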


2 changes: 2 additions & 0 deletions src_erl/NerlnetApp/src/Client/clientStatem.erl
@@ -196,6 +196,7 @@ idle(cast, In = {training}, State = #client_statem_state{myName = _MyName, etsRe
cast_message_to_workers(EtsRef, MessageToCast),
ets:update_element(EtsRef, all_workers_done, {?DATA_IDX, false}),
stats:performance_stats_reset(PerformanceStatsEts),
stats:communication_stats_reset(ClientStatsEts),
stats:tic(ClientStatsEts, time_train_total),
stats:reset_query_cpu_util_cores(),
{next_state, waitforWorkers, State#client_statem_state{waitforWorkers = clientWorkersFunctions:get_workers_names(EtsRef), nextState = training}};
@@ -208,6 +209,7 @@ idle(cast, In = {predict}, State = #client_statem_state{etsRef = EtsRef}) ->
MessageToCast = {predict},
cast_message_to_workers(EtsRef, MessageToCast),
stats:performance_stats_reset(PerformanceStatsEts),
stats:communication_stats_reset(ClientStatsEts),
stats:tic(ClientStatsEts, time_predict_total),
stats:reset_query_cpu_util_cores(),
{next_state, waitforWorkers, State#client_statem_state{waitforWorkers = clientWorkersFunctions:get_workers_names(EtsRef),nextState = predict}};
15 changes: 14 additions & 1 deletion src_erl/NerlnetApp/src/Stats/stats.erl
@@ -17,7 +17,7 @@
% performance stats
-export([generate_performance_stats_ets/0]).
-export([start_os_mon/0]).
-export([performance_stats_reset/1]).
-export([performance_stats_reset/1, communication_stats_reset/1]).
% performance stats getters/setters
-export([get_time_train_active/1, increment_time_train_active/2]).
-export([get_time_train_total/1, increment_time_train_total/2]).
@@ -237,6 +238,19 @@ get_bad_messages(StatsEts) ->
increment_bad_messages(StatsEts) ->
ets:update_counter(StatsEts, ?STATS_ATOM_BAD_MSG, 1).

communication_stats_reset(ComStatsEts) ->
% Reset all communication stats to zero
ets:update_element(ComStatsEts, ?STATS_ATOM_MSG_RECV, {?STATS_KEYVAL_VAL_IDX, 0}),
ets:update_element(ComStatsEts, ?STATS_ATOM_MSG_SENT, {?STATS_KEYVAL_VAL_IDX, 0}),
ets:update_element(ComStatsEts, ?STATS_ATOM_MSG_DROP, {?STATS_KEYVAL_VAL_IDX, 0}),
ets:update_element(ComStatsEts, ?STATS_ATOM_BYTES_RECV, {?STATS_KEYVAL_VAL_IDX, 0}),
ets:update_element(ComStatsEts, ?STATS_ATOM_BYTES_SENT, {?STATS_KEYVAL_VAL_IDX, 0}),
ets:update_element(ComStatsEts, ?STATS_ATOM_BAD_MSG, {?STATS_KEYVAL_VAL_IDX, 0}),
ets:update_element(ComStatsEts, ?STATS_ATOM_BATCHES_RECEIVED, {?STATS_KEYVAL_VAL_IDX, 0}), % related to client only
ets:update_element(ComStatsEts, ?STATS_ATOM_BATCHES_DROPPED, {?STATS_KEYVAL_VAL_IDX, 0}), % related to client only
ets:update_element(ComStatsEts, ?STATS_ATOM_BATCHES_SENT, {?STATS_KEYVAL_VAL_IDX, 0}), % related to source only
ets:update_element(ComStatsEts, ?STATS_ATOM_ACTUAL_FREQUENCY, {?STATS_KEYVAL_VAL_IDX, 0}), % related to source only
ok.

performance_stats_reset(PerfStatsEts) ->
% Reset all performance stats to zero
1 change: 1 addition & 0 deletions src_erl/NerlnetApp/src/Stats/stats.hrl
@@ -7,6 +7,7 @@
-define(STATS_ATOM_BATCHES_RECEIVED, batches_received).
-define(STATS_ATOM_BATCHES_DROPPED, batches_dropped).
-define(STATS_ATOM_BATCHES_SENT, batches_sent).
-define(STATS_ATOM_ACTUAL_FREQUENCY, actual_frequency).

-define(STATS_KEYVAL_KEY_IDX, 1).
-define(STATS_KEYVAL_VAL_IDX, 2).
101 changes: 101 additions & 0 deletions src_py/apiServer/apiServer.py
@@ -215,6 +215,107 @@ def experiment_phase_is_valid(self):
current_exp_flow = globe.experiment_focused_on
return current_exp_flow.current_exp_phase_index < len(current_exp_flow.exp_phase_list)

def run_all_experiment_phases(self):
"""
Runs all experiment phases sequentially from the experiment JSON file.

This function iterates through all phases defined in the experiment flow JSON,
executing each phase in order (training phases followed by prediction phases).
It collects statistics for each completed phase and returns them as a list.

The function will:
1. Print information about the current phase being executed (name and type)
2. Execute each phase using run_current_experiment_phase()
3. Generate and collect Stats objects for each completed phase
4. Move to the next phase using next_experiment_phase()
5. Continue until all phases are completed

Returns:
list: A list of Stats objects, one for each successfully completed phase.
Each Stats object contains performance metrics, communication statistics,
and other phase-specific data.

Raises:
AssertionError: If no valid experiment is currently focused or if required
setup (initialization, send_jsons_to_devices) is not completed.

Example:
# After initialization and sending JSONs to devices
api_server.send_jsons_to_devices()
all_stats = api_server.run_all_experiment_phases()

# Process results
for i, stats in enumerate(all_stats):
print(f"Phase {i+1}: {stats.get_name()} ({stats.get_phase()})")
if stats.get_phase() == "training":
loss_data = stats.get_loss_ts()
elif stats.get_phase() == "prediction":
confusion_matrices = stats.get_confusion_matrices()

Note:
- Requires that initialization() and send_jsons_to_devices() have been called first
- All NerlNet devices must be running and accessible
- The experiment JSON must contain valid phase definitions
"""
# Ensure we have a valid experiment focused
if self.current_exp is None:
raise AssertionError("No experiment is currently focused. Call initialization() and experiment_focused_on() first.")

# Ensure JSONs have been sent to devices
send_jsons_event = self.apiserver_event_sync.get_event_status(EventSync.SEND_JSONS)
if send_jsons_event != EventSync.DONE:
raise AssertionError("JSONs must be sent to devices first. Call send_jsons_to_devices() before running phases.")

all_phases_stats = []

# Get the initial phase information
current_exp_flow = self.current_exp
total_phases = len(current_exp_flow.exp_phase_list)

if total_phases == 0:
LOG_WARNING("No experiment phases found in the experiment flow")
return all_phases_stats

LOG_INFO(f"Starting to run all {total_phases} experiment phases for experiment: {current_exp_flow.get_exp_name()}")

# Run phases sequentially
phase_count = 1
while True:
if not self.experiment_phase_is_valid():
LOG_WARNING("No valid experiment phase available to run")
break

current_phase = current_exp_flow.get_current_experiment_phase()
phase_name = current_phase.get_name()
phase_type = current_phase.get_phase_type()

LOG_INFO(f"Running phase {phase_count}/{total_phases}: '{phase_name}' (Type: {phase_type})")

try:
# Run the current phase
self.run_current_experiment_phase()

# Generate stats for the completed phase
phase_stats = current_exp_flow.generate_stats(current_phase)
all_phases_stats.append(phase_stats)

LOG_INFO(f"Completed phase {phase_count}/{total_phases}: '{phase_name}' ({phase_type})")

except Exception as e:
LOG_ERROR(f"Error running phase {phase_count}/{total_phases} '{phase_name}': {str(e)}")
# Continue with next phase instead of stopping completely

# Move to next phase
next_phase_type = self.next_experiment_phase()
if next_phase_type is None:
LOG_INFO("All experiment phases completed successfully")
break

phase_count += 1

LOG_INFO(f"Finished running all experiment phases. Total phases executed: {len(all_phases_stats)}")
return all_phases_stats

def list_datasets(self):
with open(HF_DATA_REPO_PATHS_JSON) as file:
repo_ids = json.load(file)
1 change: 1 addition & 0 deletions src_py/apiServer/apiServerHelp.py
@@ -41,6 +41,7 @@
-experiment_phase_is_valid() returns True if there are more experiment phases to run
-run_current_experiment_phase() runs the current experiment phase
-next_experiment_phase() moves to the next experiment phase and returns the phase type
-run_all_experiment_phases() runs all experiment phases sequentially and returns a list of Stats objects for each phase

======== Retrieving statistics ======
-get_experiment_flow(experiment_name).generate_stats() returns statistics object (E.g., assigned to StatsInst) class for the current experiment phase