From 16e0c27f648e8c1afaf851a0cfa7f5bde8cbf3a0 Mon Sep 17 00:00:00 2001 From: Sean Yang Date: Wed, 21 Aug 2024 21:33:43 -0700 Subject: [PATCH] ScriptExecutor improvements (#2820) * script executor improvements * move ScriptExecutor to job_config * rename ScriptExecutor to ScriptRunner, add TF versions of in process and ex process executors * fix dead links --------- Co-authored-by: Chester Chen <512707+chesterxgchen@users.noreply.github.com> --- examples/advanced/job_api/pt/README.md | 16 +- ....py => cyclic_cc_script_runner_cifar10.py} | 4 +- ...r10.py => fedavg_script_runner_cifar10.py} | 6 +- ...fedavg_script_runner_dp_filter_cifar10.py} | 4 +- ...fedavg_script_runner_lightning_cifar10.py} | 6 +- ...ar10.py => swarm_script_runner_cifar10.py} | 4 +- examples/advanced/job_api/sklearn/README.md | 4 +- ...higgs.py => kmeans_script_runner_higgs.py} | 8 +- examples/advanced/job_api/tf/README.md | 18 +- ...ar10.py => tf_fl_script_runner_cifar10.py} | 4 +- examples/getting_started/pt/README.md | 62 ------- .../nvflare_lightning_getting_started.ipynb | 6 +- .../pt/nvflare_pt_getting_started.ipynb | 48 ++--- examples/getting_started/sklearn/README.md | 4 +- ...higgs.py => kmeans_script_runner_higgs.py} | 8 +- examples/getting_started/tf/README.md | 12 +- .../tf/nvflare_tf_getting_started.ipynb | 8 +- ...ar10.py => tf_fl_script_runner_cifar10.py} | 4 +- examples/hello-world/hello-cyclic/README.md | 2 +- ...y => cyclic_script_runner_hello-cyclic.py} | 8 +- .../hello-world/hello-fedavg-numpy/README.md | 2 +- ...py => fedavg_script_runner_hello-numpy.py} | 6 +- .../hello-fedavg-numpy_flare_api.ipynb | 2 +- .../hello-fedavg-numpy_getting_started.ipynb | 8 +- .../hello-fedavg/hello-fedavg.ipynb | 6 +- .../pt_fedavg_early_stopping_script.py | 4 +- examples/hello-world/hello-pt/README.md | 4 +- ...pt.py => fedavg_script_runner_hello-pt.py} | 6 +- examples/hello-world/hello-tf/README.md | 4 +- ...tf.py => fedavg_script_runner_hello-tf.py} | 6 +- examples/hello-world/hello_world.ipynb | 2 +- .../app_common/executors/script_executor.py | 109 ----------- .../tf/client_api_launcher_executor.py | 33 ++++ .../tf/in_process_client_api_executor.py | 62 +++++++ nvflare/fuel/utils/pipe/cell_pipe.py | 19 +- nvflare/job_config/base_app_config.py | 3 - nvflare/job_config/fed_job_config.py | 4 +- nvflare/job_config/script_runner.py | 172 ++++++++++++++++++ 38 files changed, 394 insertions(+), 294 deletions(-) rename examples/advanced/job_api/pt/{cyclic_cc_script_executor_cifar10.py => cyclic_cc_script_runner_cifar10.py} (91%) rename examples/advanced/job_api/pt/{fedavg_script_executor_cifar10.py => fedavg_script_runner_cifar10.py} (88%) rename examples/advanced/job_api/pt/{fedavg_script_executor_dp_filter_cifar10.py => fedavg_script_runner_dp_filter_cifar10.py} (89%) rename examples/advanced/job_api/pt/{fedavg_script_executor_lightning_cifar10.py => fedavg_script_runner_lightning_cifar10.py} (83%) rename examples/advanced/job_api/pt/{swarm_script_executor_cifar10.py => swarm_script_runner_cifar10.py} (91%) rename examples/advanced/job_api/sklearn/{kmeans_script_executor_higgs.py => kmeans_script_runner_higgs.py} (96%) rename examples/advanced/job_api/tf/{tf_fl_script_executor_cifar10.py => tf_fl_script_runner_cifar10.py} (96%) rename examples/getting_started/sklearn/{kmeans_script_executor_higgs.py => kmeans_script_runner_higgs.py} (96%) rename examples/getting_started/tf/{tf_fl_script_executor_cifar10.py => tf_fl_script_runner_cifar10.py} (96%) rename examples/hello-world/hello-cyclic/{cyclic_script_executor_hello-cyclic.py => cyclic_script_runner_hello-cyclic.py} (86%) rename examples/hello-world/hello-fedavg-numpy/{fedavg_script_executor_hello-numpy.py => fedavg_script_runner_hello-numpy.py} (85%) rename examples/hello-world/hello-pt/{fedavg_script_executor_hello-pt.py => fedavg_script_runner_hello-pt.py} (86%) rename examples/hello-world/hello-tf/{fedavg_script_executor_hello-tf.py => fedavg_script_runner_hello-tf.py} (86%) delete mode 100644 nvflare/app_common/executors/script_executor.py create mode 100644 nvflare/app_opt/tf/client_api_launcher_executor.py create mode 100644 nvflare/app_opt/tf/in_process_client_api_executor.py create mode 100644 nvflare/job_config/script_runner.py diff --git a/examples/advanced/job_api/pt/README.md b/examples/advanced/job_api/pt/README.md index 180e22d5c4..18830da6a7 100644 --- a/examples/advanced/job_api/pt/README.md +++ b/examples/advanced/job_api/pt/README.md @@ -19,27 +19,27 @@ python "script_name.py" ``` ```commandline -python fedavg_script_executor_lightning_cifar10.py +python fedavg_script_runner_lightning_cifar10.py ``` -### 1. [Federated averaging using the script executor](./fedavg_script_executor_cifar10.py) +### 1. [Federated averaging using the script executor](./fedavg_script_runner_cifar10.py) Implementation of [FedAvg](https://arxiv.org/abs/1602.05629) using the [Client API](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/client_api.html). -### 2. [Federated averaging using script executor and differential privacy filter](./fedavg_script_executor_dp_filter_cifar10.py) +### 2. [Federated averaging using script executor and differential privacy filter](./fedavg_script_runner_dp_filter_cifar10.py) Implementation of [FedAvg](https://arxiv.org/abs/1602.05629) using the [Client API](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/client_api.html) with additional [differential privacy filters](https://arxiv.org/abs/1910.00962) on the client side. ```commandline -python fedavg_script_executor_dp_filter_cifar10.py +python fedavg_script_runner_dp_filter_cifar10.py ``` -### 3. [Swarm learning using script executor](./swarm_script_executor_cifar10.py) +### 3. [Swarm learning using script executor](./swarm_script_runner_cifar10.py) Implementation of [swarm learning](https://www.nature.com/articles/s41586-021-03583-3) using the [Client API](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/client_api.html) ```commandline -python swarm_script_executor_cifar10.py +python swarm_script_runner_cifar10.py ``` -### 4. [Cyclic weight transfer using script executor](./cyclic_cc_script_executor_cifar10.py) +### 4. [Cyclic weight transfer using script executor](./cyclic_cc_script_runner_cifar10.py) Implementation of [cyclic weight transfer](https://arxiv.org/abs/1709.05929) using the [Client API](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/client_api.html) ```commandline -python cyclic_cc_script_executor_cifar10.py +python cyclic_cc_script_runner_cifar10.py ``` ### 5. [Federated averaging using model learning](./fedavg_model_learner_xsite_val_cifar10.py)) Implementation of [FedAvg](https://arxiv.org/abs/1602.05629) using the [model learner class](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/model_learner.html), diff --git a/examples/advanced/job_api/pt/cyclic_cc_script_executor_cifar10.py b/examples/advanced/job_api/pt/cyclic_cc_script_runner_cifar10.py similarity index 91% rename from examples/advanced/job_api/pt/cyclic_cc_script_executor_cifar10.py rename to examples/advanced/job_api/pt/cyclic_cc_script_runner_cifar10.py index 95a21e4a1c..51f6ac9d8d 100644 --- a/examples/advanced/job_api/pt/cyclic_cc_script_executor_cifar10.py +++ b/examples/advanced/job_api/pt/cyclic_cc_script_runner_cifar10.py @@ -16,8 +16,8 @@ from nvflare.app_common.ccwf.ccwf_job import CCWFJob, CyclicClientConfig, CyclicServerConfig from nvflare.app_common.ccwf.comps.simple_model_shareable_generator import SimpleModelShareableGenerator -from nvflare.app_common.executors.script_executor import ScriptExecutor from nvflare.app_opt.pt.file_model_persistor import PTFileModelPersistor +from nvflare.job_config.script_runner import ScriptRunner if __name__ == "__main__": n_clients = 2 @@ -29,7 +29,7 @@ job.add_cyclic( server_config=CyclicServerConfig(num_rounds=num_rounds, max_status_report_interval=300), client_config=CyclicClientConfig( - executor=ScriptExecutor(task_script_path=train_script), + executor=ScriptRunner(script=train_script), persistor=PTFileModelPersistor(model=Net()), shareable_generator=SimpleModelShareableGenerator(), ), diff --git a/examples/advanced/job_api/pt/fedavg_script_executor_cifar10.py b/examples/advanced/job_api/pt/fedavg_script_runner_cifar10.py similarity index 88% rename from examples/advanced/job_api/pt/fedavg_script_executor_cifar10.py rename to examples/advanced/job_api/pt/fedavg_script_runner_cifar10.py index 14e97d3f54..4d3c726894 100644 --- a/examples/advanced/job_api/pt/fedavg_script_executor_cifar10.py +++ b/examples/advanced/job_api/pt/fedavg_script_runner_cifar10.py @@ -14,12 +14,12 @@ from src.net import Net -from nvflare.app_common.executors.script_executor import ScriptExecutor from nvflare.app_common.workflows.fedavg import FedAvg from nvflare.app_opt.pt.job_config.model import PTModel # from nvflare.app_opt.pt.job_config.fed_avg import FedAvgJob from nvflare.job_config.api import FedJob +from nvflare.job_config.script_runner import ScriptRunner if __name__ == "__main__": n_clients = 2 @@ -44,8 +44,8 @@ # Add clients for i in range(n_clients): - executor = ScriptExecutor( - task_script_path=train_script, task_script_args="" # f"--batch_size 32 --data_path /tmp/data/site-{i}" + executor = ScriptRunner( + script=train_script, script_args="" # f"--batch_size 32 --data_path /tmp/data/site-{i}" ) job.to(executor, target=f"site-{i}") # job.to_clients(executor) diff --git a/examples/advanced/job_api/pt/fedavg_script_executor_dp_filter_cifar10.py b/examples/advanced/job_api/pt/fedavg_script_runner_dp_filter_cifar10.py similarity index 89% rename from examples/advanced/job_api/pt/fedavg_script_executor_dp_filter_cifar10.py rename to examples/advanced/job_api/pt/fedavg_script_runner_dp_filter_cifar10.py index 11e4b863b3..0f4463d362 100644 --- a/examples/advanced/job_api/pt/fedavg_script_executor_dp_filter_cifar10.py +++ b/examples/advanced/job_api/pt/fedavg_script_runner_dp_filter_cifar10.py @@ -15,9 +15,9 @@ from src.net import Net from nvflare import FilterType -from nvflare.app_common.executors.script_executor import ScriptExecutor from nvflare.app_common.filters.percentile_privacy import PercentilePrivacy from nvflare.app_opt.pt.job_config.fed_avg import FedAvgJob +from nvflare.job_config.script_runner import ScriptRunner if __name__ == "__main__": n_clients = 2 @@ -27,7 +27,7 @@ job = FedAvgJob(name="cifar10_fedavg_privacy", num_rounds=num_rounds, n_clients=n_clients, initial_model=Net()) for i in range(n_clients): - executor = ScriptExecutor(task_script_path=train_script, task_script_args="") + executor = ScriptRunner(script=train_script, script_args="") job.to(executor, f"site-{i}", tasks=["train"]) # add privacy filter. diff --git a/examples/advanced/job_api/pt/fedavg_script_executor_lightning_cifar10.py b/examples/advanced/job_api/pt/fedavg_script_runner_lightning_cifar10.py similarity index 83% rename from examples/advanced/job_api/pt/fedavg_script_executor_lightning_cifar10.py rename to examples/advanced/job_api/pt/fedavg_script_runner_lightning_cifar10.py index a388c1bf6a..dfc5b5d20a 100644 --- a/examples/advanced/job_api/pt/fedavg_script_executor_lightning_cifar10.py +++ b/examples/advanced/job_api/pt/fedavg_script_runner_lightning_cifar10.py @@ -14,8 +14,8 @@ from src.lit_net import LitNet -from nvflare.app_common.executors.script_executor import ScriptExecutor from nvflare.app_opt.pt.job_config.fed_avg import FedAvgJob +from nvflare.job_config.script_runner import ScriptRunner if __name__ == "__main__": n_clients = 2 @@ -26,8 +26,8 @@ # Add clients for i in range(n_clients): - executor = ScriptExecutor( - task_script_path=train_script, task_script_args="" # f"--batch_size 32 --data_path /tmp/data/site-{i}" + executor = ScriptRunner( + script=train_script, script_args="" # f"--batch_size 32 --data_path /tmp/data/site-{i}" ) job.to(executor, f"site-{i}") diff --git a/examples/advanced/job_api/pt/swarm_script_executor_cifar10.py b/examples/advanced/job_api/pt/swarm_script_runner_cifar10.py similarity index 91% rename from examples/advanced/job_api/pt/swarm_script_executor_cifar10.py rename to examples/advanced/job_api/pt/swarm_script_runner_cifar10.py index ef20a10bab..3730ac72ba 100644 --- a/examples/advanced/job_api/pt/swarm_script_executor_cifar10.py +++ b/examples/advanced/job_api/pt/swarm_script_runner_cifar10.py @@ -18,8 +18,8 @@ from nvflare.app_common.aggregators.intime_accumulate_model_aggregator import InTimeAccumulateWeightedAggregator from nvflare.app_common.ccwf.ccwf_job import CCWFJob, CrossSiteEvalConfig, SwarmClientConfig, SwarmServerConfig from nvflare.app_common.ccwf.comps.simple_model_shareable_generator import SimpleModelShareableGenerator -from nvflare.app_common.executors.script_executor import ScriptExecutor from nvflare.app_opt.pt.file_model_persistor import PTFileModelPersistor +from nvflare.job_config.script_runner import ScriptRunner if __name__ == "__main__": n_clients = 2 @@ -31,7 +31,7 @@ job.add_swarm( server_config=SwarmServerConfig(num_rounds=num_rounds), client_config=SwarmClientConfig( - executor=ScriptExecutor(task_script_path=train_script, evaluate_task_name="validate"), + executor=ScriptRunner(script=train_script, evaluate_task_name="validate"), aggregator=aggregator, persistor=PTFileModelPersistor(model=Net()), shareable_generator=SimpleModelShareableGenerator(), diff --git a/examples/advanced/job_api/sklearn/README.md b/examples/advanced/job_api/sklearn/README.md index e08d217b7c..4c2098d616 100644 --- a/examples/advanced/job_api/sklearn/README.md +++ b/examples/advanced/job_api/sklearn/README.md @@ -15,10 +15,10 @@ You can also run any of the below scripts directly using ```commandline python "script_name.py" ``` -### 1. [Federated K-Means Clustering](./kmeans_script_executor_higgs.py) +### 1. [Federated K-Means Clustering](./kmeans_script_runner_higgs.py) Implementation of [K-Means](https://arxiv.org/abs/1602.05629). For more details see this [example](../../../advanced/sklearn-kmeans/README.md) ```commandline -python kmeans_script_executor_higgs.py +python kmeans_script_runner_higgs.py ``` > [!NOTE] diff --git a/examples/advanced/job_api/sklearn/kmeans_script_executor_higgs.py b/examples/advanced/job_api/sklearn/kmeans_script_runner_higgs.py similarity index 96% rename from examples/advanced/job_api/sklearn/kmeans_script_executor_higgs.py rename to examples/advanced/job_api/sklearn/kmeans_script_runner_higgs.py index 972cf8761c..39663be315 100644 --- a/examples/advanced/job_api/sklearn/kmeans_script_executor_higgs.py +++ b/examples/advanced/job_api/sklearn/kmeans_script_runner_higgs.py @@ -20,11 +20,11 @@ from nvflare import FedJob from nvflare.app_common.aggregators.collect_and_assemble_aggregator import CollectAndAssembleAggregator -from nvflare.app_common.executors.script_executor import ScriptExecutor from nvflare.app_common.shareablegenerators.full_model_shareable_generator import FullModelShareableGenerator from nvflare.app_common.workflows.scatter_and_gather import ScatterAndGather from nvflare.app_opt.sklearn.joblib_model_param_persistor import JoblibModelParamPersistor from nvflare.client.config import ExchangeFormat +from nvflare.job_config.script_runner import ScriptRunner preprocess = True # if False, assume data is already preprocessed and split @@ -137,9 +137,9 @@ def split_higgs(input_data_path, input_header_path, output_dir, site_num, sample # Add clients for i in range(n_clients): - executor = ScriptExecutor( - task_script_path=train_script, - task_script_args=f"--data_root_dir {data_output_dir}", + executor = ScriptRunner( + script=train_script, + script_args=f"--data_root_dir {data_output_dir}", params_exchange_format=ExchangeFormat.RAW, # kmeans requires raw values only rather than PyTorch Tensors (the default) ) job.to(executor, f"site-{i+1}") # HIGGs data splitter assumes site names start from 1 diff --git a/examples/advanced/job_api/tf/README.md b/examples/advanced/job_api/tf/README.md index 4cc57b4f79..24561a6289 100644 --- a/examples/advanced/job_api/tf/README.md +++ b/examples/advanced/job_api/tf/README.md @@ -19,10 +19,10 @@ In this example, the latest Client APIs were used to implement client-side training logics (details in file [`cifar10_tf_fl_alpha_split.py`](src/cifar10_tf_fl_alpha_split.py)), and the new -[`FedJob`](https://github.com/NVIDIA/NVFlare/blob/main/nvflare/job_config/fed_job.py#L106) +[`FedJob`](https://github.com/NVIDIA/NVFlare/blob/main/nvflare/job_config/api.py) APIs were used to programmatically set up an `nvflare` job to be exported or ran by simulator (details in file -[`tf_fl_script_executor_cifar10.py`](tf_fl_script_executor_cifar10.py)), +[`tf_fl_script_runner_cifar10.py`](tf_fl_script_runner_cifar10.py)), alleviating the need of writing job config files, simplifying development process. @@ -41,7 +41,7 @@ pip install -r ./requirements.txt ## 2. Run experiments This example uses simulator to run all experiments. The script -[`tf_fl_script_executor_cifar10.py`](tf_fl_script_executor_cifar10.py) +[`tf_fl_script_runner_cifar10.py`](tf_fl_script_runner_cifar10.py) is the main script to be used to launch different experiments with different arguments (see sections below for details). A script [`run_jobs.sh`](run_jobs.sh) is also provided to run all experiments @@ -55,7 +55,7 @@ any experiment, and you can use `Tensorboard` to visualize the training and validation process as the experiment runs. Data split files, summary logs and results will be saved in a workspace directory, which defaults to `/tmp` and can be configured by setting -`--workspace` argument of the `tf_fl_script_executor_cifar10.py` +`--workspace` argument of the `tf_fl_script_runner_cifar10.py` script. > [!WARNING] @@ -82,7 +82,7 @@ To simulate a centralized training baseline, we run FedAvg algorithm with 1 client for 25 rounds, where each round consists of one single epoch. ``` -python ./tf_fl_script_executor_cifar10.py \ +python ./tf_fl_script_runner_cifar10.py \ --algo centralized \ --n_clients 1 \ --num_rounds 25 \ @@ -101,7 +101,7 @@ in the centralized baseline above (50*4 divided by 8 clients is 25): ``` for alpha in 1.0 0.5 0.3 0.1; do - python ./tf_fl_script_executor_cifar10.py \ + python ./tf_fl_script_runner_cifar10.py \ --algo fedavg \ --n_clients 8 \ --num_rounds 50 \ @@ -120,7 +120,7 @@ Next, let's try some different FL algorithms on a more heterogeneous split: side to update the global model from client-side gradients. Here we use SGD with momentum and cosine learning rate decay: ``` -python ./tf_fl_script_executor_cifar10.py \ +python ./tf_fl_script_runner_cifar10.py \ --algo fedopt \ --n_clients 8 \ --num_rounds 50 \ @@ -130,7 +130,7 @@ python ./tf_fl_script_executor_cifar10.py \ ``` [FedProx](https://arxiv.org/abs/1812.06127) adds a regularizer to the loss: ``` -python ./tf_fl_script_executor_cifar10.py \ +python ./tf_fl_script_runner_cifar10.py \ --algo fedprox \ --n_clients 8 \ --num_rounds 50 \ @@ -145,7 +145,7 @@ during local training following the described in [Li et al.](https://arxiv.org/abs/2102.02079) ``` -python ./tf_fl_script_executor_cifar10.py \ +python ./tf_fl_script_runner_cifar10.py \ --algo scaffold \ --n_clients 8 \ --num_rounds 50 \ diff --git a/examples/advanced/job_api/tf/tf_fl_script_executor_cifar10.py b/examples/advanced/job_api/tf/tf_fl_script_runner_cifar10.py similarity index 96% rename from examples/advanced/job_api/tf/tf_fl_script_executor_cifar10.py rename to examples/advanced/job_api/tf/tf_fl_script_runner_cifar10.py index 709c3a9237..683878ca62 100644 --- a/examples/advanced/job_api/tf/tf_fl_script_executor_cifar10.py +++ b/examples/advanced/job_api/tf/tf_fl_script_runner_cifar10.py @@ -21,8 +21,8 @@ from src.tf_net import ModerateTFNet from nvflare import FedJob -from nvflare.app_common.executors.script_executor import ScriptExecutor from nvflare.app_opt.tf.job_config.model import TFModel +from nvflare.job_config.script_runner import ScriptRunner gpu_devices = tf.config.experimental.list_physical_devices("GPU") for device in gpu_devices: @@ -156,7 +156,7 @@ # Add clients for i, train_idx_path in enumerate(train_idx_paths): curr_task_script_args = task_script_args + f" --train_idx_path {train_idx_path}" - executor = ScriptExecutor(task_script_path=train_script, task_script_args=curr_task_script_args) + executor = ScriptRunner(script=train_script, script_args=curr_task_script_args) job.to(executor, f"site-{i+1}") # Can export current job to folder. diff --git a/examples/getting_started/pt/README.md b/examples/getting_started/pt/README.md index 4509bb7635..25c573b25a 100644 --- a/examples/getting_started/pt/README.md +++ b/examples/getting_started/pt/README.md @@ -4,65 +4,3 @@ We provide several examples to quickly get you started using NVFlare's Job API. All examples in this folder are based on using [PyTorch](https://pytorch.org/) as the model training framework. Furthermore, we support [PyTorch Lightning](https://lightning.ai). - -## Setup environment -First, install nvflare and dependencies: -```commandline -pip install -r requirements.txt -``` - -## Tutorials -A good starting point for understanding the Job API scripts and NVFlare components are the following tutorials. -### 1. [Federated averaging using script executor](./nvflare_pt_getting_started.ipynb) -Tutorial on [FedAvg](https://arxiv.org/abs/1602.05629) using the [Client API](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/client_api.html). - -### 2. [Federated averaging using script executor with Lightning API](./nvflare_lightning_getting_started.ipynb) -Tutorial on [FedAvg](https://arxiv.org/abs/1602.05629) using the [Lightning Client API](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/client_api.html#id4) - -## Examples -You can also run any of the below scripts directly using -```commandline -python "script_name.py" -``` -### 1. [Federated averaging using script executor](./fedavg_script_executor_cifar10.py) -Implementation of [FedAvg](https://arxiv.org/abs/1602.05629) using the [Client API](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/client_api.html). -```commandline -python fedavg_script_executor_cifar10.py -``` -### 2. [Federated averaging using script executor with Lightning API](./fedavg_script_executor_lightning_cifar10.py) -Implementation of [FedAvg](https://arxiv.org/abs/1602.05629) using the [Lightning Client API](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/client_api.html#id4) -```commandline -python fedavg_script_executor_lightning_cifar10.py -``` -### 3. [Federated averaging using the script executor for all clients](./fedavg_script_executor_cifar10_all.py) -Implementation of [FedAvg](https://arxiv.org/abs/1602.05629) using the [Client API](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/client_api.html). -Here, we deploy the same configuration to all clients. -```commandline -python fedavg_script_executor_cifar10_all.py -``` -### 4. [Federated averaging using script executor and differential privacy filter](./fedavg_script_executor_dp_filter_cifar10.py) -Implementation of [FedAvg](https://arxiv.org/abs/1602.05629) using the [Client API](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/client_api.html) -with additional [differential privacy filters](https://arxiv.org/abs/1910.00962) on the client side. -```commandline -python fedavg_script_executor_dp_filter_cifar10.py -``` -### 5. [Swarm learning using script executor](./swarm_script_executor_cifar10.py) -Implementation of [swarm learning](https://www.nature.com/articles/s41586-021-03583-3) using the [Client API](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/client_api.html) -```commandline -python swarm_script_executor_cifar10.py -``` -### 6. [Cyclic weight transfer using script executor](./cyclic_cc_script_executor_cifar10.py) -Implementation of [cyclic weight transfer](https://arxiv.org/abs/1709.05929) using the [Client API](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/client_api.html) -```commandline -python cyclic_cc_script_executor_cifar10.py -``` -### 7. [Federated averaging using model learning](./fedavg_model_learner_xsite_val_cifar10.py)) -Implementation of [FedAvg](https://arxiv.org/abs/1602.05629) using the [model learner class](https://nvflare.readthedocs.io/en/main/programming_guide/execution_api_type/model_learner.html), -followed by [cross site validation](https://nvflare.readthedocs.io/en/main/programming_guide/controllers/cross_site_model_evaluation.html) -for federated model evaluation. -```commandline -python fedavg_model_learner_xsite_val_cifar10.py -``` - -> [!NOTE] -> More examples can be found at https://nvidia.github.io/NVFlare. diff --git a/examples/getting_started/pt/nvflare_lightning_getting_started.ipynb b/examples/getting_started/pt/nvflare_lightning_getting_started.ipynb index 8eda30bd51..11c43227d5 100644 --- a/examples/getting_started/pt/nvflare_lightning_getting_started.ipynb +++ b/examples/getting_started/pt/nvflare_lightning_getting_started.ipynb @@ -330,7 +330,7 @@ "outputs": [], "source": [ "from nvflare import FedJob\n", - "from nvflare.app_common.executors.script_executor import ScriptExecutor\n", + "from nvflare.job_config.script_runner import ScriptRunner\n", "from nvflare.app_common.workflows.fedavg import FedAvg\n", "\n", "job = FedJob(name=\"cifar10_fedavg_lightning\")" @@ -411,8 +411,8 @@ "outputs": [], "source": [ "for i in range(n_clients):\n", - " executor = ScriptExecutor(\n", - " task_script_path=\"src/cifar10_lightning_fl.py\", task_script_args=\"\" # f\"--batch_size 32 --data_path /tmp/data/site-{i}\"\n", + " executor = ScriptRunner(\n", + " script=\"src/cifar10_lightning_fl.py\", script_args=\"\" # f\"--batch_size 32 --data_path /tmp/data/site-{i}\"\n", " )\n", " job.to(executor, f\"site-{i+1}\")" ] diff --git a/examples/getting_started/pt/nvflare_pt_getting_started.ipynb b/examples/getting_started/pt/nvflare_pt_getting_started.ipynb index 8eae8229f2..68ecd364a5 100644 --- a/examples/getting_started/pt/nvflare_pt_getting_started.ipynb +++ b/examples/getting_started/pt/nvflare_pt_getting_started.ipynb @@ -272,7 +272,7 @@ "outputs": [], "source": [ "from nvflare import FedJob\n", - "from nvflare.app_common.executors.script_executor import ScriptExecutor\n", + "from nvflare.job_config.script_runner import ScriptRunner\n", "from nvflare.app_common.workflows.fedavg import FedAvg\n", "\n", "job = FedJob(name=\"cifar10_fedavg\")" @@ -339,7 +339,7 @@ "metadata": {}, "source": [ "#### 5. Add clients\n", - "Next, we can use the `ScriptExecutor` and send it to each of the clients to run our training script.\n", + "Next, we can use the `ScriptRunner` and send it to each of the clients to run our training script.\n", "\n", "Note that our script could have additional input arguments, such as batch size or data path, but we don't use them here for simplicity.\n", "We can also specify, which GPU should be used to run this client, which is helpful for simulated environments." @@ -353,8 +353,8 @@ "outputs": [], "source": [ "for i in range(n_clients):\n", - " executor = ScriptExecutor(\n", - " task_script_path=\"src/cifar10_fl.py\", task_script_args=\"\" # f\"--batch_size 32 --data_path /tmp/data/site-{i}\"\n", + " executor = ScriptRunner(\n", + " script=\"src/cifar10_fl.py\", script_args=\"\" # f\"--batch_size 32 --data_path /tmp/data/site-{i}\"\n", " )\n", " job.to(executor, f\"site-{i+1}\")" ] @@ -502,19 +502,19 @@ "2024-08-16 12:29:30,393 - Communicator - INFO - Received from simulator_server server. getTask: train size: 251.5KB (251471 Bytes) time: 0.049074 seconds\n", "2024-08-16 12:29:30,394 - FederatedClient - INFO - pull_task completed. Task name:train Status:True \n", "2024-08-16 12:29:30,394 - ClientRunner - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job]: got task assignment: name=train, id=c2ea9c72-12a7-4c56-bdac-7dfc83524f20\n", - "2024-08-16 12:29:30,394 - ClientRunner - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=c2ea9c72-12a7-4c56-bdac-7dfc83524f20]: invoking task executor ScriptExecutor\n", - "2024-08-16 12:29:30,394 - ScriptExecutor - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=c2ea9c72-12a7-4c56-bdac-7dfc83524f20]: execute for task (train)\n", - "2024-08-16 12:29:30,394 - ScriptExecutor - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=c2ea9c72-12a7-4c56-bdac-7dfc83524f20]: send data to peer\n", - "2024-08-16 12:29:30,394 - ScriptExecutor - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=c2ea9c72-12a7-4c56-bdac-7dfc83524f20]: sending payload to peer\n", - "2024-08-16 12:29:30,394 - ScriptExecutor - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=c2ea9c72-12a7-4c56-bdac-7dfc83524f20]: Waiting for result from peer\n", + "2024-08-16 12:29:30,394 - ClientRunner - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=c2ea9c72-12a7-4c56-bdac-7dfc83524f20]: invoking task executor ScriptRunner\n", + "2024-08-16 12:29:30,394 - ScriptRunner - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=c2ea9c72-12a7-4c56-bdac-7dfc83524f20]: execute for task (train)\n", + "2024-08-16 12:29:30,394 - ScriptRunner - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=c2ea9c72-12a7-4c56-bdac-7dfc83524f20]: send data to peer\n", + "2024-08-16 12:29:30,394 - ScriptRunner - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=c2ea9c72-12a7-4c56-bdac-7dfc83524f20]: sending payload to peer\n", + "2024-08-16 12:29:30,394 - ScriptRunner - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=c2ea9c72-12a7-4c56-bdac-7dfc83524f20]: Waiting for result from peer\n", "2024-08-16 12:29:30,412 - Communicator - INFO - Received from simulator_server server. getTask: train size: 251.5KB (251471 Bytes) time: 0.049131 seconds\n", "2024-08-16 12:29:30,412 - FederatedClient - INFO - pull_task completed. Task name:train Status:True \n", "2024-08-16 12:29:30,412 - ClientRunner - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job]: got task assignment: name=train, id=8e246740-698d-406e-8cab-9f91a762db52\n", - "2024-08-16 12:29:30,412 - ClientRunner - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=8e246740-698d-406e-8cab-9f91a762db52]: invoking task executor ScriptExecutor\n", - "2024-08-16 12:29:30,413 - ScriptExecutor - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=8e246740-698d-406e-8cab-9f91a762db52]: execute for task (train)\n", - "2024-08-16 12:29:30,413 - ScriptExecutor - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=8e246740-698d-406e-8cab-9f91a762db52]: send data to peer\n", - "2024-08-16 12:29:30,413 - ScriptExecutor - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=8e246740-698d-406e-8cab-9f91a762db52]: sending payload to peer\n", - "2024-08-16 12:29:30,413 - ScriptExecutor - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=8e246740-698d-406e-8cab-9f91a762db52]: Waiting for result from peer\n", + "2024-08-16 12:29:30,412 - ClientRunner - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=8e246740-698d-406e-8cab-9f91a762db52]: invoking task executor ScriptRunner\n", + "2024-08-16 12:29:30,413 - ScriptRunner - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=8e246740-698d-406e-8cab-9f91a762db52]: execute for task (train)\n", + "2024-08-16 12:29:30,413 - ScriptRunner - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=8e246740-698d-406e-8cab-9f91a762db52]: send data to peer\n", + "2024-08-16 12:29:30,413 - ScriptRunner - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=8e246740-698d-406e-8cab-9f91a762db52]: sending payload to peer\n", + "2024-08-16 12:29:30,413 - ScriptRunner - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=8e246740-698d-406e-8cab-9f91a762db52]: Waiting for result from peer\n", "2024-08-16 12:29:30,570 - nvflare.app_common.executors.task_script_runner - INFO - Files already downloaded and verified\n", "2024-08-16 12:29:30,591 - nvflare.app_common.executors.task_script_runner - INFO - Files already downloaded and verified\n", "2024-08-16 12:29:31,221 - nvflare.app_common.executors.task_script_runner - INFO - Files already downloaded and verified\n", @@ -589,11 +589,11 @@ "2024-08-16 12:30:22,244 - Communicator - INFO - Received from simulator_server server. getTask: train size: 251.5KB (251536 Bytes) time: 0.009714 seconds\n", "2024-08-16 12:30:22,245 - FederatedClient - INFO - pull_task completed. Task name:train Status:True \n", "2024-08-16 12:30:22,245 - ClientRunner - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job]: got task assignment: name=train, id=913b8035-b820-4e38-9efb-ba761ae5b0d8\n", - "2024-08-16 12:30:22,245 - ClientRunner - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=913b8035-b820-4e38-9efb-ba761ae5b0d8]: invoking task executor ScriptExecutor\n", - "2024-08-16 12:30:22,245 - ScriptExecutor - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=913b8035-b820-4e38-9efb-ba761ae5b0d8]: execute for task (train)\n", - "2024-08-16 12:30:22,245 - ScriptExecutor - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=913b8035-b820-4e38-9efb-ba761ae5b0d8]: send data to peer\n", - "2024-08-16 12:30:22,245 - ScriptExecutor - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=913b8035-b820-4e38-9efb-ba761ae5b0d8]: sending payload to peer\n", - "2024-08-16 12:30:22,246 - ScriptExecutor - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=913b8035-b820-4e38-9efb-ba761ae5b0d8]: Waiting for result from peer\n", + "2024-08-16 12:30:22,245 - ClientRunner - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=913b8035-b820-4e38-9efb-ba761ae5b0d8]: invoking task executor ScriptRunner\n", + "2024-08-16 12:30:22,245 - ScriptRunner - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=913b8035-b820-4e38-9efb-ba761ae5b0d8]: execute for task (train)\n", + "2024-08-16 12:30:22,245 - ScriptRunner - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=913b8035-b820-4e38-9efb-ba761ae5b0d8]: send data to peer\n", + "2024-08-16 12:30:22,245 - ScriptRunner - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=913b8035-b820-4e38-9efb-ba761ae5b0d8]: sending payload to peer\n", + "2024-08-16 12:30:22,246 - ScriptRunner - INFO - [identity=site-2, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=913b8035-b820-4e38-9efb-ba761ae5b0d8]: Waiting for result from peer\n", "2024-08-16 12:30:22,419 - nvflare.app_common.executors.task_script_runner - INFO - current_round=1\n", "2024-08-16 12:30:23,179 - ServerRunner - INFO - [identity=simulator_server, run=simulate_job, wf=controller, peer=site-1, peer_run=simulate_job, task_name=train, task_id=410b5bfc-db8e-4149-b890-64ee730c4fa4]: assigned task to client site-1: name=train, id=410b5bfc-db8e-4149-b890-64ee730c4fa4\n", "2024-08-16 12:30:23,180 - ServerRunner - INFO - [identity=simulator_server, run=simulate_job, wf=controller, peer=site-1, peer_run=simulate_job, task_name=train, task_id=410b5bfc-db8e-4149-b890-64ee730c4fa4]: sent task assignment to client. client_name:site-1 task_id:410b5bfc-db8e-4149-b890-64ee730c4fa4\n", @@ -601,11 +601,11 @@ "2024-08-16 12:30:23,185 - Communicator - INFO - Received from simulator_server server. getTask: train size: 251.5KB (251536 Bytes) time: 0.006905 seconds\n", "2024-08-16 12:30:23,185 - FederatedClient - INFO - pull_task completed. Task name:train Status:True \n", "2024-08-16 12:30:23,185 - ClientRunner - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job]: got task assignment: name=train, id=410b5bfc-db8e-4149-b890-64ee730c4fa4\n", - "2024-08-16 12:30:23,185 - ClientRunner - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=410b5bfc-db8e-4149-b890-64ee730c4fa4]: invoking task executor ScriptExecutor\n", - "2024-08-16 12:30:23,185 - ScriptExecutor - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=410b5bfc-db8e-4149-b890-64ee730c4fa4]: execute for task (train)\n", - "2024-08-16 12:30:23,185 - ScriptExecutor - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=410b5bfc-db8e-4149-b890-64ee730c4fa4]: send data to peer\n", - "2024-08-16 12:30:23,185 - ScriptExecutor - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=410b5bfc-db8e-4149-b890-64ee730c4fa4]: sending payload to peer\n", - "2024-08-16 12:30:23,186 - ScriptExecutor - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=410b5bfc-db8e-4149-b890-64ee730c4fa4]: Waiting for result from peer\n", + "2024-08-16 12:30:23,185 - ClientRunner - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=410b5bfc-db8e-4149-b890-64ee730c4fa4]: invoking task executor ScriptRunner\n", + "2024-08-16 12:30:23,185 - ScriptRunner - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=410b5bfc-db8e-4149-b890-64ee730c4fa4]: execute for task (train)\n", + "2024-08-16 12:30:23,185 - ScriptRunner - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=410b5bfc-db8e-4149-b890-64ee730c4fa4]: send data to peer\n", + "2024-08-16 12:30:23,185 - ScriptRunner - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=410b5bfc-db8e-4149-b890-64ee730c4fa4]: sending payload to peer\n", + "2024-08-16 12:30:23,186 - ScriptRunner - INFO - [identity=site-1, run=simulate_job, peer=simulator_server, peer_run=simulate_job, task_name=train, task_id=410b5bfc-db8e-4149-b890-64ee730c4fa4]: Waiting for result from peer\n", "2024-08-16 12:30:23,668 - nvflare.app_common.executors.task_script_runner - INFO - current_round=1\n", "2024-08-16 12:30:25,721 - nvflare.app_common.executors.task_script_runner - INFO - [1, 2000] loss: 1.231\n", "2024-08-16 12:30:27,161 - nvflare.app_common.executors.task_script_runner - INFO - [1, 2000] loss: 1.226\n", diff --git a/examples/getting_started/sklearn/README.md b/examples/getting_started/sklearn/README.md index 9c73e9fe34..741dde91f4 100644 --- a/examples/getting_started/sklearn/README.md +++ b/examples/getting_started/sklearn/README.md @@ -15,10 +15,10 @@ You can also run any of the below scripts directly using ```commandline python "script_name.py" ``` -### 1. [Federated K-Means Clustering](./kmeans_script_executor_higgs.py) +### 1. [Federated K-Means Clustering](./kmeans_script_runner_higgs.py) Implementation of [K-Means](https://arxiv.org/abs/1602.05629). For more details see this [example](../../advanced/sklearn-kmeans/README.md). ```commandline -python kmeans_script_executor_higgs.py +python kmeans_script_runner_higgs.py ``` > [!NOTE] diff --git a/examples/getting_started/sklearn/kmeans_script_executor_higgs.py b/examples/getting_started/sklearn/kmeans_script_runner_higgs.py similarity index 96% rename from examples/getting_started/sklearn/kmeans_script_executor_higgs.py rename to examples/getting_started/sklearn/kmeans_script_runner_higgs.py index 972cf8761c..39663be315 100644 --- a/examples/getting_started/sklearn/kmeans_script_executor_higgs.py +++ b/examples/getting_started/sklearn/kmeans_script_runner_higgs.py @@ -20,11 +20,11 @@ from nvflare import FedJob from nvflare.app_common.aggregators.collect_and_assemble_aggregator import CollectAndAssembleAggregator -from nvflare.app_common.executors.script_executor import ScriptExecutor from nvflare.app_common.shareablegenerators.full_model_shareable_generator import FullModelShareableGenerator from nvflare.app_common.workflows.scatter_and_gather import ScatterAndGather from nvflare.app_opt.sklearn.joblib_model_param_persistor import JoblibModelParamPersistor from nvflare.client.config import ExchangeFormat +from nvflare.job_config.script_runner import ScriptRunner preprocess = True # if False, assume data is already preprocessed and split @@ -137,9 +137,9 @@ def split_higgs(input_data_path, input_header_path, output_dir, site_num, sample # Add clients for i in range(n_clients): - executor = ScriptExecutor( - task_script_path=train_script, - task_script_args=f"--data_root_dir {data_output_dir}", + executor = ScriptRunner( + script=train_script, + script_args=f"--data_root_dir {data_output_dir}", params_exchange_format=ExchangeFormat.RAW, # kmeans requires raw values only rather than PyTorch Tensors (the default) ) job.to(executor, f"site-{i+1}") # HIGGs data splitter assumes site names start from 1 diff --git a/examples/getting_started/tf/README.md b/examples/getting_started/tf/README.md index 0d6c8be4dc..5d4e7de334 100644 --- a/examples/getting_started/tf/README.md +++ b/examples/getting_started/tf/README.md @@ -18,10 +18,10 @@ In this example, the latest Client APIs were used to implement client-side training logics (details in file [`cifar10_tf_fl_alpha_split.py`](src/cifar10_tf_fl_alpha_split.py)), and the new -[`FedJob`](https://github.com/NVIDIA/NVFlare/blob/main/nvflare/job_config/fed_job.py#L106) +[`FedJob`](https://github.com/NVIDIA/NVFlare/blob/main/nvflare/job_config/api.py) APIs were used to programmatically set up an `nvflare` job to be exported or ran by simulator (details in file -[`tf_fl_script_executor_cifar10.py`](tf_fl_script_executor_cifar10.py)), +[`tf_fl_script_runner_cifar10.py`](tf_fl_script_runner_cifar10.py)), alleviating the need of writing job config files, simplifying development process. @@ -40,7 +40,7 @@ pip install -r ./requirements.txt ## 2. Run experiments This example uses simulator to run all experiments. The script -[`tf_fl_script_executor_cifar10.py`](tf_fl_script_executor_cifar10.py) +[`tf_fl_script_runner_cifar10.py`](tf_fl_script_runner_cifar10.py) is the main script to be used to launch different experiments with different arguments (see sections below for details). A script [`run_jobs.sh`](run_jobs.sh) is also provided to run all experiments @@ -54,7 +54,7 @@ any experiment, and you can use `Tensorboard` to visualize the training and validation process as the experiment runs. Data split files, summary logs and results will be saved in a workspace directory, which defaults to `/tmp` and can be configured by setting -`--workspace` argument of the `tf_fl_script_executor_cifar10.py` +`--workspace` argument of the `tf_fl_script_runner_cifar10.py` script. > [!WARNING] @@ -81,7 +81,7 @@ To simulate a centralized training baseline, we run FedAvg algorithm with 1 client for 25 rounds, where each round consists of one single epoch. ``` -python ./tf_fl_script_executor_cifar10.py \ +python ./tf_fl_script_runner_cifar10.py \ --algo centralized \ --n_clients 1 \ --num_rounds 25 \ @@ -100,7 +100,7 @@ in the centralized baseline above (50*4 divided by 8 clients is 25): ``` for alpha in 1.0 0.5 0.3 0.1; do - python ./tf_fl_script_executor_cifar10.py \ + python ./tf_fl_script_runner_cifar10.py \ --algo fedavg \ --n_clients 8 \ --num_rounds 50 \ diff --git a/examples/getting_started/tf/nvflare_tf_getting_started.ipynb b/examples/getting_started/tf/nvflare_tf_getting_started.ipynb index a185627365..af40feca93 100644 --- a/examples/getting_started/tf/nvflare_tf_getting_started.ipynb +++ b/examples/getting_started/tf/nvflare_tf_getting_started.ipynb @@ -262,7 +262,7 @@ "outputs": [], "source": [ "from nvflare import FedJob\n", - "from nvflare.app_common.executors.script_executor import ScriptExecutor\n", + "from nvflare.job_config.script_runner import ScriptRunner\n", "from nvflare.app_common.workflows.fedavg import FedAvg\n", "\n", "job = FedJob(name=\"cifar10_tf_fedavg\")" @@ -329,7 +329,7 @@ "metadata": {}, "source": [ "#### 5. Add clients\n", - "Next, we can use the `ScriptExecutor` and send it to each of the clients to run our training script.\n", + "Next, we can use the `ScriptRunner` and send it to each of the clients to run our training script.\n", "\n", "Note that our script could have additional input arguments, such as batch size or data path, but we don't use them here for simplicity.\n", "We can also specify, which GPU should be used to run this client, which is helpful for simulated environments." @@ -345,8 +345,8 @@ "from nvflare.client.config import ExchangeFormat\n", "\n", "for i in range(n_clients):\n", - " executor = ScriptExecutor(\n", - " task_script_path=\"src/cifar10_tf_fl.py\", task_script_args=\"\", # f\"--batch_size 32 --data_path /tmp/data/site-{i}\"\n", + " executor = ScriptRunner(\n", + " script=\"src/cifar10_tf_fl.py\", script_args=\"\", # f\"--batch_size 32 --data_path /tmp/data/site-{i}\"\n", " params_exchange_format=ExchangeFormat.NUMPY,\n", " )\n", " job.to(executor, f\"site-{i+1}\")" diff --git a/examples/getting_started/tf/tf_fl_script_executor_cifar10.py b/examples/getting_started/tf/tf_fl_script_runner_cifar10.py similarity index 96% rename from examples/getting_started/tf/tf_fl_script_executor_cifar10.py rename to examples/getting_started/tf/tf_fl_script_runner_cifar10.py index 709c3a9237..683878ca62 100644 --- a/examples/getting_started/tf/tf_fl_script_executor_cifar10.py +++ b/examples/getting_started/tf/tf_fl_script_runner_cifar10.py @@ -21,8 +21,8 @@ from src.tf_net import ModerateTFNet from nvflare import FedJob -from nvflare.app_common.executors.script_executor import ScriptExecutor from nvflare.app_opt.tf.job_config.model import TFModel +from nvflare.job_config.script_runner import ScriptRunner gpu_devices = tf.config.experimental.list_physical_devices("GPU") for device in gpu_devices: @@ -156,7 +156,7 @@ # Add clients for i, train_idx_path in enumerate(train_idx_paths): curr_task_script_args = task_script_args + f" --train_idx_path {train_idx_path}" - executor = ScriptExecutor(task_script_path=train_script, task_script_args=curr_task_script_args) + executor = ScriptRunner(script=train_script, script_args=curr_task_script_args) job.to(executor, f"site-{i+1}") # Can export current job to folder. diff --git a/examples/hello-world/hello-cyclic/README.md b/examples/hello-world/hello-cyclic/README.md index a5e3f748db..bec40d40b2 100644 --- a/examples/hello-world/hello-cyclic/README.md +++ b/examples/hello-world/hello-cyclic/README.md @@ -29,7 +29,7 @@ bash ./prepare_data.sh Run the script using the job API to create the job and run it with the simulator: ``` -python3 cyclic_script_executor_hello-cyclic.py +python3 cyclic_script_runner_hello-cyclic.py ``` ### 3. Access the logs and results diff --git a/examples/hello-world/hello-cyclic/cyclic_script_executor_hello-cyclic.py b/examples/hello-world/hello-cyclic/cyclic_script_runner_hello-cyclic.py similarity index 86% rename from examples/hello-world/hello-cyclic/cyclic_script_executor_hello-cyclic.py rename to examples/hello-world/hello-cyclic/cyclic_script_runner_hello-cyclic.py index 01150af23f..da06776c6e 100644 --- a/examples/hello-world/hello-cyclic/cyclic_script_executor_hello-cyclic.py +++ b/examples/hello-world/hello-cyclic/cyclic_script_runner_hello-cyclic.py @@ -15,10 +15,10 @@ from src.tf_net import Net from nvflare import FedJob -from nvflare.app_common.executors.script_executor import ScriptExecutor from nvflare.app_common.workflows.cyclic import Cyclic from nvflare.app_opt.pt.job_config.model import PTModel from nvflare.client.config import ExchangeFormat +from nvflare.job_config.script_runner import ScriptRunner if __name__ == "__main__": n_clients = 2 @@ -39,9 +39,9 @@ # Add clients for i in range(n_clients): - executor = ScriptExecutor( - task_script_path=train_script, - task_script_args="", # f"--batch_size 32 --data_path /tmp/data/site-{i}" + executor = ScriptRunner( + script=train_script, + script_args="", # f"--batch_size 32 --data_path /tmp/data/site-{i}" params_exchange_format=ExchangeFormat.NUMPY, ) job.to(executor, f"site-{i+1}") diff --git a/examples/hello-world/hello-fedavg-numpy/README.md b/examples/hello-world/hello-fedavg-numpy/README.md index 380efeb094..9cce626e67 100644 --- a/examples/hello-world/hello-fedavg-numpy/README.md +++ b/examples/hello-world/hello-fedavg-numpy/README.md @@ -22,7 +22,7 @@ Follow the [Installation](https://nvflare.readthedocs.io/en/main/quickstart.html Run the script using the job API to create the job and run it with the simulator: ``` -python3 fedavg_script_executor_hello-numpy.py +python3 fedavg_script_runner_hello-numpy.py ``` ### 3. Access the logs and results diff --git a/examples/hello-world/hello-fedavg-numpy/fedavg_script_executor_hello-numpy.py b/examples/hello-world/hello-fedavg-numpy/fedavg_script_runner_hello-numpy.py similarity index 85% rename from examples/hello-world/hello-fedavg-numpy/fedavg_script_executor_hello-numpy.py rename to examples/hello-world/hello-fedavg-numpy/fedavg_script_runner_hello-numpy.py index 90084031b5..48f34a0180 100644 --- a/examples/hello-world/hello-fedavg-numpy/fedavg_script_executor_hello-numpy.py +++ b/examples/hello-world/hello-fedavg-numpy/fedavg_script_runner_hello-numpy.py @@ -13,9 +13,9 @@ # limitations under the License. from nvflare import FedJob -from nvflare.app_common.executors.script_executor import ScriptExecutor from nvflare.app_common.workflows.fedavg import FedAvg from nvflare.client.config import ExchangeFormat +from nvflare.job_config.script_runner import ScriptRunner if __name__ == "__main__": n_clients = 2 @@ -33,9 +33,7 @@ # Add clients for i in range(n_clients): - executor = ScriptExecutor( - task_script_path=train_script, task_script_args="", params_exchange_format=ExchangeFormat.NUMPY - ) + executor = ScriptRunner(script=train_script, script_args="", params_exchange_format=ExchangeFormat.NUMPY) job.to(executor, f"site-{i+1}") # job.export_job("/tmp/nvflare/jobs/job_config") diff --git a/examples/hello-world/hello-fedavg-numpy/hello-fedavg-numpy_flare_api.ipynb b/examples/hello-world/hello-fedavg-numpy/hello-fedavg-numpy_flare_api.ipynb index 06749fad21..e8b09ed8eb 100644 --- a/examples/hello-world/hello-fedavg-numpy/hello-fedavg-numpy_flare_api.ipynb +++ b/examples/hello-world/hello-fedavg-numpy/hello-fedavg-numpy_flare_api.ipynb @@ -84,7 +84,7 @@ "source": [ "### 4. Submit the Job with the FLARE API\n", "\n", - "With a session successfully connected, you can use `submit_job()` to submit your job. You can change `path_to_example_job` to the location of the job you are submitting (make sure you have exported the job with the line containing `job.export_job()` uncommented in [fedavg_script_executor_hello-numpy.py](fedavg_script_executor_hello-numpy.py)). If your session is not active, go back to the previous step and connect with a session.\n", + "With a session successfully connected, you can use `submit_job()` to submit your job. You can change `path_to_example_job` to the location of the job you are submitting (make sure you have exported the job with the line containing `job.export_job()` uncommented in [fedavg_script_runner_hello-numpy.py](fedavg_script_runner_hello-numpy.py)). If your session is not active, go back to the previous step and connect with a session.\n", "\n", "With POC command, we link the examples to the following directory ``` /tmp/nvflare/poc/example_project/prod_00/admin@nvidia.com/transfer```" ] diff --git a/examples/hello-world/hello-fedavg-numpy/hello-fedavg-numpy_getting_started.ipynb b/examples/hello-world/hello-fedavg-numpy/hello-fedavg-numpy_getting_started.ipynb index 78f6413ba4..6d3b0305cb 100644 --- a/examples/hello-world/hello-fedavg-numpy/hello-fedavg-numpy_getting_started.ipynb +++ b/examples/hello-world/hello-fedavg-numpy/hello-fedavg-numpy_getting_started.ipynb @@ -192,7 +192,9 @@ "metadata": {}, "outputs": [], "source": [ - "from nvflare import FedAvg, FedJob, ScriptExecutor\n", + "from nvflare import FedJob\n", + "from nvflare.app_common.workflows.fedavg import FedAvg\n", + "from nvflare.job_config.script_runner import ScriptRunner\n", "\n", "job = FedJob(name=\"hello-fedavg-numpy\")" ] @@ -247,8 +249,8 @@ "train_script = \"src/hello-numpy_fl.py\"\n", "\n", "for i in range(n_clients):\n", - " executor = ScriptExecutor(\n", - " task_script_path=train_script, task_script_args=\"\", params_exchange_format=ExchangeFormat.NUMPY\n", + " executor = ScriptRunner(\n", + " script=train_script, script_args=\"\", params_exchange_format=ExchangeFormat.NUMPY\n", " )\n", " job.to(executor, f\"site-{i}\")" ] diff --git a/examples/hello-world/hello-fedavg/hello-fedavg.ipynb b/examples/hello-world/hello-fedavg/hello-fedavg.ipynb index 1a17c47209..249e350343 100644 --- a/examples/hello-world/hello-fedavg/hello-fedavg.ipynb +++ b/examples/hello-world/hello-fedavg/hello-fedavg.ipynb @@ -240,7 +240,7 @@ "outputs": [], "source": [ "from nvflare import FedJob\n", - "from nvflare.app_common.executors.script_executor import ScriptExecutor\n", + "from nvflare.job_config.script_runner import ScriptRunner\n", "from nvflare.app_opt.pt.fedavg_early_stopping import PTFedAvgEarlyStopping\n", "\n", "job = FedJob(name=\"cifar10_fedavg_early_stopping\")" @@ -280,7 +280,7 @@ "id": "d5050f5d", "metadata": {}, "source": [ - "Use the `ScriptExecutor` and send to each of the clients to run the train script." + "Use the `ScriptRunner` and send to each of the clients to run the train script." ] }, { @@ -294,7 +294,7 @@ "\n", "# Add clients\n", "for i in range(n_clients):\n", - " executor = ScriptExecutor(task_script_path=train_script, task_script_args=\"\")\n", + " executor = ScriptRunner(script=train_script, script_args=\"\")\n", " job.to(executor, f\"site-{i}\")" ] }, diff --git a/examples/hello-world/hello-fedavg/pt_fedavg_early_stopping_script.py b/examples/hello-world/hello-fedavg/pt_fedavg_early_stopping_script.py index c7932b9445..7ec16e8c7b 100644 --- a/examples/hello-world/hello-fedavg/pt_fedavg_early_stopping_script.py +++ b/examples/hello-world/hello-fedavg/pt_fedavg_early_stopping_script.py @@ -15,8 +15,8 @@ from src.net import Net from nvflare import FedJob -from nvflare.app_common.executors.script_executor import ScriptExecutor from nvflare.app_opt.pt.fedavg_early_stopping import PTFedAvgEarlyStopping +from nvflare.job_config.script_runner import ScriptRunner if __name__ == "__main__": n_clients = 2 @@ -36,7 +36,7 @@ # Add clients for i in range(n_clients): - executor = ScriptExecutor(task_script_path=train_script, task_script_args="") + executor = ScriptRunner(script=train_script, script_args="") job.to(executor, f"site-{i}") # job.export_job("/tmp/nvflare/jobs/job_config") diff --git a/examples/hello-world/hello-pt/README.md b/examples/hello-world/hello-pt/README.md index 6795ce2543..92f3cabc12 100644 --- a/examples/hello-world/hello-pt/README.md +++ b/examples/hello-world/hello-pt/README.md @@ -9,7 +9,7 @@ and [PyTorch](https://pytorch.org/) as the deep learning training framework. You can follow the [Getting Started with NVFlare (PyTorch) notebook](../../getting_started/pt/nvflare_pt_getting_started.ipynb) for a detailed walkthrough of the basic concepts. -See the [Hello PyTorch](https://nvflare.readthedocs.io/en/main/examples/hello_pt.html) example documentation page for details on this +See the [Hello PyTorch](https://nvflare.readthedocs.io/en/main/examples/hello_pt_job_api.html#hello-pt-job-api) example documentation page for details on this example. To run this example with the FLARE API, you can follow the [hello_world notebook](../hello_world.ipynb), or you can quickly get @@ -31,7 +31,7 @@ pip3 install -r requirements.txt Run the script using the job API to create the job and run it with the simulator: ``` -python3 fedavg_script_executor_hello-pt.py +python3 fedavg_script_runner_hello-pt.py ``` ### 3. Access the logs and results diff --git a/examples/hello-world/hello-pt/fedavg_script_executor_hello-pt.py b/examples/hello-world/hello-pt/fedavg_script_runner_hello-pt.py similarity index 86% rename from examples/hello-world/hello-pt/fedavg_script_executor_hello-pt.py rename to examples/hello-world/hello-pt/fedavg_script_runner_hello-pt.py index 69798fcc5e..c8bdbe6e4f 100644 --- a/examples/hello-world/hello-pt/fedavg_script_executor_hello-pt.py +++ b/examples/hello-world/hello-pt/fedavg_script_runner_hello-pt.py @@ -15,9 +15,9 @@ from src.simple_network import SimpleNetwork from nvflare import FedJob -from nvflare.app_common.executors.script_executor import ScriptExecutor from nvflare.app_common.workflows.fedavg import FedAvg from nvflare.app_opt.pt.job_config.model import PTModel +from nvflare.job_config.script_runner import ScriptRunner if __name__ == "__main__": n_clients = 2 @@ -38,8 +38,8 @@ # Add clients for i in range(n_clients): - executor = ScriptExecutor( - task_script_path=train_script, task_script_args="" # f"--batch_size 32 --data_path /tmp/data/site-{i}" + executor = ScriptRunner( + script=train_script, script_args="" # f"--batch_size 32 --data_path /tmp/data/site-{i}" ) job.to(executor, f"site-{i+1}") diff --git a/examples/hello-world/hello-tf/README.md b/examples/hello-world/hello-tf/README.md index f909715378..34c2e5b230 100644 --- a/examples/hello-world/hello-tf/README.md +++ b/examples/hello-world/hello-tf/README.md @@ -6,7 +6,7 @@ and [TensorFlow](https://tensorflow.org/) as the deep learning training framewor > **_NOTE:_** This example uses the [MNIST](http://yann.lecun.com/exdb/mnist/) handwritten digits dataset and will load its data within the trainer code. -See the [Hello TensorFlow](https://nvflare.readthedocs.io/en/main/examples/hello_tf.html) example documentation page for details on this +See the [Hello TensorFlow](https://nvflare.readthedocs.io/en/main/examples/hello_tf_job_api.html#hello-tf-job-api) example documentation page for details on this example. To run this example with the FLARE API, you can follow the [hello_world notebook](../hello_world.ipynb), or you can quickly get @@ -27,7 +27,7 @@ pip3 install tensorflow Run the script using the job API to create the job and run it with the simulator: ``` -python3 fedavg_script_executor_hello-tf.py +python3 fedavg_script_runner_hello-tf.py ``` ### 3. Access the logs and results diff --git a/examples/hello-world/hello-tf/fedavg_script_executor_hello-tf.py b/examples/hello-world/hello-tf/fedavg_script_runner_hello-tf.py similarity index 86% rename from examples/hello-world/hello-tf/fedavg_script_executor_hello-tf.py rename to examples/hello-world/hello-tf/fedavg_script_runner_hello-tf.py index 050c87b07d..fa518293d7 100644 --- a/examples/hello-world/hello-tf/fedavg_script_executor_hello-tf.py +++ b/examples/hello-world/hello-tf/fedavg_script_runner_hello-tf.py @@ -15,9 +15,9 @@ from src.tf_net import TFNet from nvflare import FedJob -from nvflare.app_common.executors.script_executor import ScriptExecutor from nvflare.app_common.workflows.fedavg import FedAvg from nvflare.app_opt.tf.job_config.model import TFModel +from nvflare.job_config.script_runner import ScriptRunner if __name__ == "__main__": n_clients = 2 @@ -38,8 +38,8 @@ # Add clients for i in range(n_clients): - executor = ScriptExecutor( - task_script_path=train_script, task_script_args="" # f"--batch_size 32 --data_path /tmp/data/site-{i}" + executor = ScriptRunner( + script=train_script, script_args="" # f"--batch_size 32 --data_path /tmp/data/site-{i}" ) job.to(executor, f"site-{i+1}") diff --git a/examples/hello-world/hello_world.ipynb b/examples/hello-world/hello_world.ipynb index f5b9cca8fc..f470f40fef 100644 --- a/examples/hello-world/hello_world.ipynb +++ b/examples/hello-world/hello_world.ipynb @@ -19,7 +19,7 @@ "job.simulator_run(\"/tmp/nvflare/jobs/workdir\")\n", "```\n", "\n", - "Simply executing the script (for example `python fedavg_script_executor_hello-numpy.py`) will create and run the job with the FLARE Simulator.\n", + "Simply executing the script (for example `python fedavg_script_runner_hello-numpy.py`) will create and run the job with the FLARE Simulator.\n", "\n", "## Running with POC mode and FLARE API\n", "\n", diff --git a/nvflare/app_common/executors/script_executor.py b/nvflare/app_common/executors/script_executor.py deleted file mode 100644 index 591dfd4104..0000000000 --- a/nvflare/app_common/executors/script_executor.py +++ /dev/null @@ -1,109 +0,0 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import Optional - -from nvflare.app_common.app_constant import AppConstants -from nvflare.app_common.executors.in_process_client_api_executor import InProcessClientAPIExecutor -from nvflare.client.config import ExchangeFormat, TransferType -from nvflare.fuel.utils import fobs -from nvflare.fuel.utils.import_utils import optional_import - -torch, torch_ok = optional_import(module="torch") -if torch_ok: - from nvflare.app_opt.pt.decomposers import TensorDecomposer - from nvflare.app_opt.pt.params_converter import NumpyToPTParamsConverter, PTToNumpyParamsConverter - - DEFAULT_PARAMS_EXCHANGE_FORMAT = ExchangeFormat.PYTORCH -else: - DEFAULT_PARAMS_EXCHANGE_FORMAT = ExchangeFormat.NUMPY - -tensorflow, tf_ok = optional_import(module="tensorflow") -if tf_ok: - from nvflare.app_opt.tf.params_converter import KerasModelToNumpyParamsConverter, NumpyToKerasModelParamsConverter - - -class ScriptExecutor(InProcessClientAPIExecutor): - def __init__( - self, - task_script_path: str, - task_script_args: str = "", - task_wait_time: Optional[float] = None, - result_pull_interval: float = 0.5, - log_pull_interval: Optional[float] = None, - params_transfer_type: TransferType = TransferType.FULL, - from_nvflare_converter_id: Optional[str] = None, - to_nvflare_converter_id: Optional[str] = None, - train_with_evaluation: bool = True, - train_task_name: str = "train", - evaluate_task_name: str = "evaluate", - submit_model_task_name: str = "submit_model", - params_exchange_format=DEFAULT_PARAMS_EXCHANGE_FORMAT, - ): - """Wrapper around InProcessClientAPIExecutor for different params_exchange_format. Currently defaulting to `params_exchange_format=ExchangeFormat.PYTORCH`. - - Args: - """ - super(ScriptExecutor, self).__init__( - task_script_path=task_script_path, - task_script_args=task_script_args, - task_wait_time=task_wait_time, - result_pull_interval=result_pull_interval, - train_with_evaluation=train_with_evaluation, - train_task_name=train_task_name, - evaluate_task_name=evaluate_task_name, - submit_model_task_name=submit_model_task_name, - from_nvflare_converter_id=from_nvflare_converter_id, - to_nvflare_converter_id=to_nvflare_converter_id, - params_exchange_format=params_exchange_format, - params_transfer_type=params_transfer_type, - log_pull_interval=log_pull_interval, - ) - self.task_script_path = task_script_path - if torch_ok: - if params_exchange_format == ExchangeFormat.PYTORCH: - fobs.register(TensorDecomposer) - - if self._from_nvflare_converter is None: - self._from_nvflare_converter = NumpyToPTParamsConverter( - [AppConstants.TASK_TRAIN, AppConstants.TASK_VALIDATION] - ) - if self._to_nvflare_converter is None: - self._to_nvflare_converter = PTToNumpyParamsConverter( - [AppConstants.TASK_TRAIN, AppConstants.TASK_SUBMIT_MODEL] - ) - if tf_ok: - if params_exchange_format == ExchangeFormat.NUMPY: - if self._from_nvflare_converter is None: - self._from_nvflare_converter = NumpyToKerasModelParamsConverter( - [AppConstants.TASK_TRAIN, AppConstants.TASK_VALIDATION] - ) - if self._to_nvflare_converter is None: - self._to_nvflare_converter = KerasModelToNumpyParamsConverter( - [AppConstants.TASK_TRAIN, AppConstants.TASK_SUBMIT_MODEL] - ) - # TODO: support other params_exchange_format - - def add_to_fed_job(self, job, ctx, **kwargs): - """This method is used by Job API. - - Args: - job: the Job object to add to - ctx: Job Context - - Returns: - - """ - super().add_to_fed_job(job, ctx, **kwargs) - job.add_resources(resources=[self.task_script_path], ctx=ctx) diff --git a/nvflare/app_opt/tf/client_api_launcher_executor.py b/nvflare/app_opt/tf/client_api_launcher_executor.py new file mode 100644 index 0000000000..59960c2fd2 --- /dev/null +++ b/nvflare/app_opt/tf/client_api_launcher_executor.py @@ -0,0 +1,33 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nvflare.apis.fl_context import FLContext +from nvflare.app_common.app_constant import AppConstants +from nvflare.app_common.executors.client_api_launcher_executor import ClientAPILauncherExecutor +from nvflare.app_opt.tf.params_converter import KerasModelToNumpyParamsConverter, NumpyToKerasModelParamsConverter +from nvflare.client.config import ExchangeFormat + + +class TFClientAPILauncherExecutor(ClientAPILauncherExecutor): + def initialize(self, fl_ctx: FLContext) -> None: + self._params_exchange_format = ExchangeFormat.NUMPY + super().initialize(fl_ctx) + if self._from_nvflare_converter is None: + self._from_nvflare_converter = NumpyToKerasModelParamsConverter( + [AppConstants.TASK_TRAIN, AppConstants.TASK_VALIDATION] + ) + if self._to_nvflare_converter is None: + self._to_nvflare_converter = KerasModelToNumpyParamsConverter( + [AppConstants.TASK_TRAIN, AppConstants.TASK_SUBMIT_MODEL] + ) diff --git a/nvflare/app_opt/tf/in_process_client_api_executor.py b/nvflare/app_opt/tf/in_process_client_api_executor.py new file mode 100644 index 0000000000..5526a61c71 --- /dev/null +++ b/nvflare/app_opt/tf/in_process_client_api_executor.py @@ -0,0 +1,62 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Optional + +from nvflare.app_common.app_constant import AppConstants +from nvflare.app_common.executors.in_process_client_api_executor import InProcessClientAPIExecutor +from nvflare.app_opt.tf.params_converter import KerasModelToNumpyParamsConverter, NumpyToKerasModelParamsConverter +from nvflare.client.config import ExchangeFormat, TransferType + + +class TFInProcessClientAPIExecutor(InProcessClientAPIExecutor): + def __init__( + self, + task_script_path: str, + task_script_args: str = "", + task_wait_time: Optional[float] = None, + result_pull_interval: float = 0.5, + log_pull_interval: Optional[float] = None, + params_transfer_type: TransferType = TransferType.FULL, + from_nvflare_converter_id: Optional[str] = None, + to_nvflare_converter_id: Optional[str] = None, + train_with_evaluation: bool = True, + train_task_name: str = "train", + evaluate_task_name: str = "evaluate", + submit_model_task_name: str = "submit_model", + params_exchange_format=ExchangeFormat.NUMPY, + ): + super(TFInProcessClientAPIExecutor, self).__init__( + task_script_path=task_script_path, + task_script_args=task_script_args, + task_wait_time=task_wait_time, + result_pull_interval=result_pull_interval, + train_with_evaluation=train_with_evaluation, + train_task_name=train_task_name, + evaluate_task_name=evaluate_task_name, + submit_model_task_name=submit_model_task_name, + from_nvflare_converter_id=from_nvflare_converter_id, + to_nvflare_converter_id=to_nvflare_converter_id, + params_exchange_format=params_exchange_format, + params_transfer_type=params_transfer_type, + log_pull_interval=log_pull_interval, + ) + + if self._from_nvflare_converter is None: + self._from_nvflare_converter = NumpyToKerasModelParamsConverter( + [AppConstants.TASK_TRAIN, AppConstants.TASK_VALIDATION] + ) + if self._to_nvflare_converter is None: + self._to_nvflare_converter = KerasModelToNumpyParamsConverter( + [AppConstants.TASK_TRAIN, AppConstants.TASK_SUBMIT_MODEL] + ) diff --git a/nvflare/fuel/utils/pipe/cell_pipe.py b/nvflare/fuel/utils/pipe/cell_pipe.py index 56f6716454..82dbb31651 100644 --- a/nvflare/fuel/utils/pipe/cell_pipe.py +++ b/nvflare/fuel/utils/pipe/cell_pipe.py @@ -18,6 +18,7 @@ import time from typing import Tuple, Union +from nvflare.apis.fl_constant import SystemVarName from nvflare.fuel.f3.cellnet.cell import Cell from nvflare.fuel.f3.cellnet.cell import Message as CellMessage from nvflare.fuel.f3.cellnet.defs import MessageHeaderKey, ReturnCode @@ -178,18 +179,24 @@ def __init__( super().__init__(mode) self.logger = logging.getLogger(self.__class__.__name__) + self.site_name = site_name + self.token = token + self.root_url = root_url + self.secure_mode = secure_mode + self.workspace_dir = workspace_dir + + # this section is needed by job config to prevent building cell when using SystemVarName arguments + # TODO: enhance this part + sysvarname_placeholders = ["{" + varname + "}" for varname in dir(SystemVarName)] + if any([arg in sysvarname_placeholders for arg in [site_name, token, root_url, secure_mode, workspace_dir]]): + return + check_str("root_url", root_url) check_object_type("secure_mode", secure_mode, bool) check_str("token", token) check_str("site_name", site_name) check_str("workspace_dir", workspace_dir) - self.root_url = root_url - self.secure_mode = secure_mode - self.workspace_dir = workspace_dir - self.site_name = site_name - self.token = token - mode = f"{mode}".strip().lower() # convert to lower case string self.ci = self._build_cell(mode, root_url, site_name, token, secure_mode, workspace_dir) self.cell = self.ci.cell diff --git a/nvflare/job_config/base_app_config.py b/nvflare/job_config/base_app_config.py index 3318d788a2..344c61e658 100644 --- a/nvflare/job_config/base_app_config.py +++ b/nvflare/job_config/base_app_config.py @@ -58,9 +58,6 @@ def add_ext_script(self, ext_script: str): if not (os.path.isabs(ext_script) or os.path.exists(ext_script)): raise RuntimeError(f"Could not locate external script: {ext_script}") - if not ext_script.endswith(".py"): - raise RuntimeError(f"External script: {ext_script} must be a '.py' file.") - self.ext_scripts.append(ext_script) def add_ext_dir(self, ext_dir: str): diff --git a/nvflare/job_config/fed_job_config.py b/nvflare/job_config/fed_job_config.py index 2e98752274..9ceb604151 100644 --- a/nvflare/job_config/fed_job_config.py +++ b/nvflare/job_config/fed_job_config.py @@ -172,7 +172,7 @@ def _copy_ext_scripts(self, custom_dir, ext_scripts): relative_script = self._get_relative_script(script) else: relative_script = script - dest_file = os.path.join(custom_dir, relative_script) + dest_file = os.path.join(custom_dir, os.path.basename(relative_script)) module = "".join(relative_script.rsplit(".py", 1)).replace(os.sep, ".") self._copy_source_file(custom_dir, module, script, dest_file) @@ -209,7 +209,7 @@ def _get_custom_file(self, custom_dir, module, source_file): self.custom_modules.append(module) os.makedirs(custom_dir, exist_ok=True) # dest_file = os.path.join(custom_dir, module.replace(".", os.sep) + ".py") - dest_file = os.path.join(custom_dir, dest) + dest_file = os.path.join(custom_dir, os.path.basename(dest)) self._copy_source_file(custom_dir, module, source_file, dest_file) diff --git a/nvflare/job_config/script_runner.py b/nvflare/job_config/script_runner.py new file mode 100644 index 0000000000..b28d71e43b --- /dev/null +++ b/nvflare/job_config/script_runner.py @@ -0,0 +1,172 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from typing import Type + +from nvflare.app_common.executors.client_api_launcher_executor import ClientAPILauncherExecutor +from nvflare.app_common.executors.in_process_client_api_executor import InProcessClientAPIExecutor +from nvflare.client.config import ExchangeFormat +from nvflare.fuel.utils.import_utils import optional_import + + +class FrameworkType: + RAW = "raw" + NUMPY = "numpy" + PYTORCH = "pytorch" + TENSORFLOW = "tensorflow" + + +class ScriptRunner: + def __init__( + self, + script: str, + script_args: str = "", + launch_external_process: bool = False, + command: str = "python3", + framework: FrameworkType = FrameworkType.PYTORCH, + ): + """ScriptRunner is used with FedJob API to run or launch a script. + + in-process `launch_external_process=False` uses InProcessClientAPIExecutor (default). + ex-process `launch_external_process=True` uses ClientAPILauncherExecutor. + + Args: + script (str): Script to run. For in-process must be a python script path. For ex-process can be any script support by `command`. + script_args (str): Optional arguments for script (appended to script). + launch_external_process (bool): Whether to launch the script in external process. Defaults to False. + command (str): If launch_external_process=True, command to run script (preprended to script). Defaults to "python3". + framework (str): Framework type to connfigure converter and params exchange formats. Defaults to FrameworkType.PYTORCH. + """ + self._script = script + self._script_args = script_args + self._command = command + self._launch_external_process = launch_external_process + self._framework = framework + + self._params_exchange_format = None + + if self._framework == FrameworkType.PYTORCH: + _, torch_ok = optional_import(module="torch") + if torch_ok: + self._params_exchange_format = ExchangeFormat.PYTORCH + else: + raise ValueError("Using FrameworkType.PYTORCH, but unable to import torch") + elif self._framework == FrameworkType.TENSORFLOW: + _, tf_ok = optional_import(module="tensorflow") + if tf_ok: + self._params_exchange_format = ExchangeFormat.NUMPY + else: + raise ValueError("Using FrameworkType.TENSORFLOW, but unable to import tensorflow") + elif self._framework == FrameworkType.NUMPY: + self._params_exchange_format = ExchangeFormat.NUMPY + elif self._framework == FrameworkType.RAW: + self._params_exchange_format = ExchangeFormat.RAW + else: + raise ValueError(f"Framework {self._framework} unsupported") + + def add_to_fed_job(self, job, ctx, **kwargs): + """This method is used by Job API. + + Args: + job: the Job object to add to + ctx: Job Context + + Returns: + + """ + job.check_kwargs(args_to_check=kwargs, args_expected={"tasks": False}) + tasks = kwargs.get("tasks", ["*"]) + + if self._launch_external_process: + from nvflare.app_common.launchers.subprocess_launcher import SubprocessLauncher + from nvflare.app_common.widgets.external_configurator import ExternalConfigurator + from nvflare.app_common.widgets.metric_relay import MetricRelay + from nvflare.fuel.utils.pipe.cell_pipe import CellPipe + + component = CellPipe( + mode="PASSIVE", + site_name="{SITE_NAME}", + token="{JOB_ID}", + root_url="{ROOT_URL}", + secure_mode="{SECURE_MODE}", + workspace_dir="{WORKSPACE}", + ) + pipe_id = job.add_component("pipe", component, ctx) + + component = SubprocessLauncher( + script=self._command + " custom/" + os.path.basename(self._script) + " " + self._script_args, + ) + launcher_id = job.add_component("launcher", component, ctx) + + executor = self._get_ex_process_executor_cls(self._framework)( + pipe_id=pipe_id, + launcher_id=launcher_id, + params_exchange_format=self._params_exchange_format, + ) + job.add_executor(executor, tasks=tasks, ctx=ctx) + + component = CellPipe( + mode="PASSIVE", + site_name="{SITE_NAME}", + token="{JOB_ID}", + root_url="{ROOT_URL}", + secure_mode="{SECURE_MODE}", + workspace_dir="{WORKSPACE}", + ) + metric_pipe_id = job.add_component("metrics_pipe", component, ctx) + + component = MetricRelay( + pipe_id=metric_pipe_id, + event_type="fed.analytix_log_stats", + ) + metric_relay_id = job.add_component("metric_relay", component, ctx) + + component = ExternalConfigurator( + component_ids=[metric_relay_id], + ) + job.add_component("config_preparer", component, ctx) + else: + executor = self._get_in_process_executor_cls(self._framework)( + task_script_path=os.path.basename(self._script), + task_script_args=self._script_args, + params_exchange_format=self._params_exchange_format, + ) + job.add_executor(executor, tasks=tasks, ctx=ctx) + + job.add_resources(resources=[self._script], ctx=ctx) + + def _get_ex_process_executor_cls(self, framework: FrameworkType) -> Type[ClientAPILauncherExecutor]: + if framework == FrameworkType.PYTORCH: + from nvflare.app_opt.pt.client_api_launcher_executor import PTClientAPILauncherExecutor + + return PTClientAPILauncherExecutor + elif framework == FrameworkType.TENSORFLOW: + from nvflare.app_opt.tf.client_api_launcher_executor import TFClientAPILauncherExecutor + + return TFClientAPILauncherExecutor + else: + return ClientAPILauncherExecutor + + def _get_in_process_executor_cls(self, framework: FrameworkType) -> Type[InProcessClientAPIExecutor]: + if framework == FrameworkType.PYTORCH: + from nvflare.app_opt.pt.in_process_client_api_executor import PTInProcessClientAPIExecutor + + return PTInProcessClientAPIExecutor + elif framework == FrameworkType.TENSORFLOW: + from nvflare.app_opt.tf.in_process_client_api_executor import TFInProcessClientAPIExecutor + + return TFInProcessClientAPIExecutor + else: + return InProcessClientAPIExecutor