diff --git a/examples/advanced/job_api/pt/fedavg_model_learner_xsite_val_cifar10.py b/examples/advanced/job_api/pt/fedavg_model_learner_xsite_val_cifar10.py
index ac22692267..4a0efd24c1 100644
--- a/examples/advanced/job_api/pt/fedavg_model_learner_xsite_val_cifar10.py
+++ b/examples/advanced/job_api/pt/fedavg_model_learner_xsite_val_cifar10.py
@@ -22,11 +22,9 @@
 from pt.utils.cifar10_data_splitter import Cifar10DataSplitter
 from pt.utils.cifar10_data_utils import load_cifar10_data
 
-from nvflare import FedJob
 from nvflare.app_common.executors.model_learner_executor import ModelLearnerExecutor
 from nvflare.app_common.workflows.cross_site_model_eval import CrossSiteModelEval
-from nvflare.app_common.workflows.fedavg import FedAvg
-from nvflare.app_opt.pt.job_config.model import PTModel
+from nvflare.app_opt.pt.job_config.fed_avg import FedAvgJob
 
 if __name__ == "__main__":
     n_clients = 2
@@ -35,13 +33,9 @@
     alpha = 0.1
     train_split_root = f"/tmp/cifar10_splits/clients{n_clients}_alpha{alpha}"  # avoid overwriting results
 
-    job = FedJob(name="cifar10_fedavg")
+    job = FedAvgJob(name="cifar10_fedavg", n_clients=n_clients, num_rounds=num_rounds, initial_model=ModerateCNN())
 
-    ctrl1 = FedAvg(
-        num_clients=n_clients,
-        num_rounds=num_rounds,
-    )
-    ctrl2 = CrossSiteModelEval()
+    ctrl = CrossSiteModelEval()
 
     load_cifar10_data()  # preload CIFAR10 data
     data_splitter = Cifar10DataSplitter(
@@ -50,17 +44,18 @@
         alpha=alpha,
     )
 
-    job.to(ctrl1, "server")
-    job.to(ctrl2, "server")
+    job.to(ctrl, "server")
     job.to(data_splitter, "server")
 
-    # Define the initial global model and send to server
-    job.to(PTModel(ModerateCNN()), "server")
-
     for i in range(n_clients):
-        learner = CIFAR10ModelLearner(train_idx_root=train_split_root, aggregation_epochs=aggregation_epochs, lr=0.01)
-        executor = ModelLearnerExecutor(learner_id=job.as_id(learner))
-        job.to(executor, f"site-{i+1}")  # data splitter assumes client names start from 1
+        site_name = f"site-{i+1}"
+        learner_id = job.to(
+            CIFAR10ModelLearner(train_idx_root=train_split_root, aggregation_epochs=aggregation_epochs, lr=0.01),
+            site_name,
+            id="learner",
+        )
+        executor = ModelLearnerExecutor(learner_id=learner_id)
+        job.to(executor, site_name)  # data splitter assumes client names start from 1
 
     # job.export_job("/tmp/nvflare/jobs/job_config")
     job.simulator_run("/tmp/nvflare/jobs/workdir", gpu="0")
diff --git a/examples/advanced/job_api/pt/fedavg_script_runner_cifar10.py b/examples/advanced/job_api/pt/fedavg_script_runner_cifar10.py
index 4d3c726894..38b32747c6 100644
--- a/examples/advanced/job_api/pt/fedavg_script_runner_cifar10.py
+++ b/examples/advanced/job_api/pt/fedavg_script_runner_cifar10.py
@@ -14,6 +14,7 @@
 
 from src.net import Net
 
+from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector
 from nvflare.app_common.workflows.fedavg import FedAvg
 from nvflare.app_opt.pt.job_config.model import PTModel
 
@@ -39,6 +40,8 @@
 
     # Define the initial global model and send to server
     job.to(PTModel(Net()), "server")
+    job.to(IntimeModelSelector(key_metric="accuracy"), "server")
+
     # Note: We can optionally replace the above code with the FedAvgJob, which is a pattern to simplify FedAvg job creations
     # job = FedAvgJob(name="cifar10_fedavg", num_rounds=num_rounds, n_clients=n_clients, initial_model=Net())
 
diff --git a/examples/advanced/job_api/pt/fedavg_script_runner_dp_filter_cifar10.py b/examples/advanced/job_api/pt/fedavg_script_runner_dp_filter_cifar10.py
index 0f4463d362..a9789a80c7 100644
--- a/examples/advanced/job_api/pt/fedavg_script_runner_dp_filter_cifar10.py
+++ b/examples/advanced/job_api/pt/fedavg_script_runner_dp_filter_cifar10.py
@@ -27,12 +27,13 @@
     job = FedAvgJob(name="cifar10_fedavg_privacy", num_rounds=num_rounds, n_clients=n_clients, initial_model=Net())
 
     for i in range(n_clients):
+        site_name = f"site-{i}"
         executor = ScriptRunner(script=train_script, script_args="")
-        job.to(executor, f"site-{i}", tasks=["train"])
+        job.to(executor, site_name, tasks=["train"])
 
         # add privacy filter.
         pp_filter = PercentilePrivacy(percentile=10, gamma=0.01)
-        job.to(pp_filter, f"site-{i}", tasks=["train"], filter_type=FilterType.TASK_RESULT)
+        job.to(pp_filter, site_name, tasks=["train"], filter_type=FilterType.TASK_RESULT)
 
     # job.export_job("/tmp/nvflare/jobs/job_config")
     job.simulator_run("/tmp/nvflare/jobs/workdir", gpu="0")
diff --git a/examples/advanced/job_api/sklearn/kmeans_script_runner_higgs.py b/examples/advanced/job_api/sklearn/kmeans_script_runner_higgs.py
index 39663be315..592b934d8f 100644
--- a/examples/advanced/job_api/sklearn/kmeans_script_runner_higgs.py
+++ b/examples/advanced/job_api/sklearn/kmeans_script_runner_higgs.py
@@ -21,6 +21,7 @@
 from nvflare import FedJob
 from nvflare.app_common.aggregators.collect_and_assemble_aggregator import CollectAndAssembleAggregator
 from nvflare.app_common.shareablegenerators.full_model_shareable_generator import FullModelShareableGenerator
+from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector
 from nvflare.app_common.workflows.scatter_and_gather import ScatterAndGather
 from nvflare.app_opt.sklearn.joblib_model_param_persistor import JoblibModelParamPersistor
 from nvflare.client.config import ExchangeFormat
@@ -117,24 +118,29 @@ def split_higgs(input_data_path, input_header_path, output_dir, site_num, sample
     # ScatterAndGather also expects an "aggregator" which we define here.
     # The actual aggregation function is defined by an "assembler" to specify how to handle the collected updates.
     # We use KMeansAssembler which is the assembler designed for k-Means algorithm.
-    aggregator = CollectAndAssembleAggregator(assembler_id=job.as_id(KMeansAssembler()))
+    assembler_id = job.to_server(KMeansAssembler(), id="assembler")
+    aggregator_id = job.to_server(CollectAndAssembleAggregator(assembler_id=assembler_id), id="aggregator")
 
     # For kmeans with sklean, we need a custom persistor
     # JoblibModelParamPersistor is a persistor which save/read the model to/from file with JobLib format.
-    persistor = JoblibModelParamPersistor(initial_params={"n_clusters": 2})
+    persistor_id = job.to_server(JoblibModelParamPersistor(initial_params={"n_clusters": 2}), id="persistor")
+
+    shareable_generator_id = job.to_server(FullModelShareableGenerator(), id="shareable_generator")
 
     controller = ScatterAndGather(
         min_clients=n_clients,
         num_rounds=num_rounds,
         wait_time_after_min_received=0,
-        aggregator_id=job.as_id(aggregator),
-        persistor_id=job.as_id(persistor),
-        shareable_generator_id=job.as_id(FullModelShareableGenerator()),
+        aggregator_id=aggregator_id,
+        persistor_id=persistor_id,
+        shareable_generator_id=shareable_generator_id,
         train_task_name="train",  # Client will start training once received such task.
         train_timeout=0,
     )
     job.to(controller, "server")
 
+    job.to(IntimeModelSelector(key_metric="accuracy"), "server")
+
     # Add clients
     for i in range(n_clients):
         executor = ScriptRunner(
diff --git a/examples/advanced/job_api/tf/tf_fl_script_runner_cifar10.py b/examples/advanced/job_api/tf/tf_fl_script_runner_cifar10.py
index 683878ca62..b2d809a2d4 100644
--- a/examples/advanced/job_api/tf/tf_fl_script_runner_cifar10.py
+++ b/examples/advanced/job_api/tf/tf_fl_script_runner_cifar10.py
@@ -21,6 +21,7 @@
 from src.tf_net import ModerateTFNet
 
 from nvflare import FedJob
+from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector
 from nvflare.app_opt.tf.job_config.model import TFModel
 from nvflare.job_config.script_runner import ScriptRunner
 
@@ -153,6 +154,8 @@
 
     # Define the initial global model and send to server
     job.to(TFModel(ModerateTFNet(input_shape=(None, 32, 32, 3))), "server")
+    job.to(IntimeModelSelector(key_metric="accuracy"), "server")
+
     # Add clients
     for i, train_idx_path in enumerate(train_idx_paths):
         curr_task_script_args = task_script_args + f" --train_idx_path {train_idx_path}"
diff --git a/examples/getting_started/pt/nvflare_lightning_getting_started.ipynb b/examples/getting_started/pt/nvflare_lightning_getting_started.ipynb
index 11c43227d5..3868b0199b 100644
--- a/examples/getting_started/pt/nvflare_lightning_getting_started.ipynb
+++ b/examples/getting_started/pt/nvflare_lightning_getting_started.ipynb
@@ -383,6 +383,27 @@
    "job.to(PTModel(LitNet()), \"server\")"
   ]
  },
+  {
+   "cell_type": "markdown",
+   "id": "72eefb39",
+   "metadata": {},
+   "source": [
+    "#### 5. Add ModelSelector\n",
+    "Add IntimeModelSelector for global best model selection."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "091beb78",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector\n",
+    "\n",
+    "job.to(IntimeModelSelector(key_metric=\"accuracy\"), \"server\")"
+   ]
+  },
  {
   "cell_type": "markdown",
   "id": "77f5bc7f-4fb4-46e9-8f02-5e7245d95070",
@@ -397,7 +418,7 @@
   "metadata": {},
   "source": [
    "#### 5. Add clients\n",
-    "Next, we can use the `ScriptExecutor` and send it to each of the clients to run our training script.\n",
+    "Next, we can use the `ScriptRunner` and send it to each of the clients to run our training script.\n",
    "\n",
    "Note that our script could have additional input arguments, such as batch size or data path, but we don't use them here for simplicity.\n",
    "We can also specify, which GPU should be used to run this client, which is helpful for simulated environments."
diff --git a/examples/getting_started/pt/nvflare_pt_getting_started.ipynb b/examples/getting_started/pt/nvflare_pt_getting_started.ipynb
index 68ecd364a5..59ff3e9492 100644
--- a/examples/getting_started/pt/nvflare_pt_getting_started.ipynb
+++ b/examples/getting_started/pt/nvflare_pt_getting_started.ipynb
@@ -325,6 +325,27 @@
    "job.to(PTModel(Net()), \"server\")"
   ]
  },
+  {
+   "cell_type": "markdown",
+   "id": "eccae908",
+   "metadata": {},
+   "source": [
+    "#### 5. Add ModelSelector\n",
+    "Add IntimeModelSelector for global best model selection."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "d52dd194",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector\n",
+    "\n",
+    "job.to(IntimeModelSelector(key_metric=\"accuracy\"), \"server\")"
+   ]
+  },
  {
   "cell_type": "markdown",
   "id": "77f5bc7f-4fb4-46e9-8f02-5e7245d95070",
diff --git a/examples/getting_started/sklearn/kmeans_script_runner_higgs.py b/examples/getting_started/sklearn/kmeans_script_runner_higgs.py
index 39663be315..592b934d8f 100644
--- a/examples/getting_started/sklearn/kmeans_script_runner_higgs.py
+++ b/examples/getting_started/sklearn/kmeans_script_runner_higgs.py
@@ -21,6 +21,7 @@
 from nvflare import FedJob
 from nvflare.app_common.aggregators.collect_and_assemble_aggregator import CollectAndAssembleAggregator
 from nvflare.app_common.shareablegenerators.full_model_shareable_generator import FullModelShareableGenerator
+from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector
 from nvflare.app_common.workflows.scatter_and_gather import ScatterAndGather
 from nvflare.app_opt.sklearn.joblib_model_param_persistor import JoblibModelParamPersistor
 from nvflare.client.config import ExchangeFormat
@@ -117,24 +118,29 @@ def split_higgs(input_data_path, input_header_path, output_dir, site_num, sample
     # ScatterAndGather also expects an "aggregator" which we define here.
     # The actual aggregation function is defined by an "assembler" to specify how to handle the collected updates.
     # We use KMeansAssembler which is the assembler designed for k-Means algorithm.
-    aggregator = CollectAndAssembleAggregator(assembler_id=job.as_id(KMeansAssembler()))
+    assembler_id = job.to_server(KMeansAssembler(), id="assembler")
+    aggregator_id = job.to_server(CollectAndAssembleAggregator(assembler_id=assembler_id), id="aggregator")
 
     # For kmeans with sklean, we need a custom persistor
     # JoblibModelParamPersistor is a persistor which save/read the model to/from file with JobLib format.
-    persistor = JoblibModelParamPersistor(initial_params={"n_clusters": 2})
+    persistor_id = job.to_server(JoblibModelParamPersistor(initial_params={"n_clusters": 2}), id="persistor")
+
+    shareable_generator_id = job.to_server(FullModelShareableGenerator(), id="shareable_generator")
 
     controller = ScatterAndGather(
         min_clients=n_clients,
         num_rounds=num_rounds,
         wait_time_after_min_received=0,
-        aggregator_id=job.as_id(aggregator),
-        persistor_id=job.as_id(persistor),
-        shareable_generator_id=job.as_id(FullModelShareableGenerator()),
+        aggregator_id=aggregator_id,
+        persistor_id=persistor_id,
+        shareable_generator_id=shareable_generator_id,
         train_task_name="train",  # Client will start training once received such task.
         train_timeout=0,
     )
     job.to(controller, "server")
 
+    job.to(IntimeModelSelector(key_metric="accuracy"), "server")
+
     # Add clients
     for i in range(n_clients):
         executor = ScriptRunner(
diff --git a/examples/getting_started/tf/nvflare_tf_getting_started.ipynb b/examples/getting_started/tf/nvflare_tf_getting_started.ipynb
index af40feca93..f5e06707c3 100644
--- a/examples/getting_started/tf/nvflare_tf_getting_started.ipynb
+++ b/examples/getting_started/tf/nvflare_tf_getting_started.ipynb
@@ -315,6 +315,27 @@
    "job.to(TFModel(TFNet()), \"server\")"
   ]
  },
+  {
+   "cell_type": "markdown",
+   "id": "25c6eada",
+   "metadata": {},
+   "source": [
+    "#### 5. Add ModelSelector\n",
+    "Add IntimeModelSelector for global best model selection."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0ae73e50",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector\n",
+    "\n",
+    "job.to(IntimeModelSelector(key_metric=\"accuracy\"), \"server\")"
+   ]
+  },
  {
   "cell_type": "markdown",
   "id": "77f5bc7f-4fb4-46e9-8f02-5e7245d95070",
@@ -391,7 +412,7 @@
   },
   "outputs": [],
   "source": [
-    "job.simulator_run(\"/tmp/nvflare/jobs/workdir\", , gpu=\"0\")"
+    "job.simulator_run(\"/tmp/nvflare/jobs/workdir\", gpu=\"0\")"
   ]
  },
 {
diff --git a/examples/getting_started/tf/tf_fl_script_runner_cifar10.py b/examples/getting_started/tf/tf_fl_script_runner_cifar10.py
index 683878ca62..b2d809a2d4 100644
--- a/examples/getting_started/tf/tf_fl_script_runner_cifar10.py
+++ b/examples/getting_started/tf/tf_fl_script_runner_cifar10.py
@@ -21,6 +21,7 @@
 from src.tf_net import ModerateTFNet
 
 from nvflare import FedJob
+from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector
 from nvflare.app_opt.tf.job_config.model import TFModel
 from nvflare.job_config.script_runner import ScriptRunner
 
@@ -153,6 +154,8 @@
 
     # Define the initial global model and send to server
     job.to(TFModel(ModerateTFNet(input_shape=(None, 32, 32, 3))), "server")
+    job.to(IntimeModelSelector(key_metric="accuracy"), "server")
+
     # Add clients
     for i, train_idx_path in enumerate(train_idx_paths):
         curr_task_script_args = task_script_args + f" --train_idx_path {train_idx_path}"
diff --git a/examples/hello-world/hello-fedavg-numpy/fedavg_script_runner_hello-numpy.py b/examples/hello-world/hello-fedavg-numpy/fedavg_script_runner_hello-numpy.py
index 48f34a0180..6fb676f452 100644
--- a/examples/hello-world/hello-fedavg-numpy/fedavg_script_runner_hello-numpy.py
+++ b/examples/hello-world/hello-fedavg-numpy/fedavg_script_runner_hello-numpy.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 from nvflare import FedJob
+from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector
 from nvflare.app_common.workflows.fedavg import FedAvg
 from nvflare.client.config import ExchangeFormat
 from nvflare.job_config.script_runner import ScriptRunner
@@ -31,6 +32,8 @@
     )
     job.to(controller, "server")
 
+    job.to(IntimeModelSelector(key_metric="accuracy"), "server")
+
     # Add clients
     for i in range(n_clients):
         executor = ScriptRunner(script=train_script, script_args="", params_exchange_format=ExchangeFormat.NUMPY)
diff --git a/examples/hello-world/hello-fedavg-numpy/hello-fedavg-numpy_getting_started.ipynb b/examples/hello-world/hello-fedavg-numpy/hello-fedavg-numpy_getting_started.ipynb
index 6d3b0305cb..4fc9b9f4d7 100644
--- a/examples/hello-world/hello-fedavg-numpy/hello-fedavg-numpy_getting_started.ipynb
+++ b/examples/hello-world/hello-fedavg-numpy/hello-fedavg-numpy_getting_started.ipynb
@@ -196,6 +196,7 @@
    "from nvflare.app_common.workflows.fedavg import FedAvg\n",
    "from nvflare.job_config.script_runner import ScriptRunner\n",
    "\n",
+    "\n",
    "job = FedJob(name=\"hello-fedavg-numpy\")"
   ]
  },
@@ -225,13 +226,34 @@
    "job.to(controller, \"server\")"
   ]
  },
+  {
+   "cell_type": "markdown",
+   "id": "4171fe9e",
+   "metadata": {},
+   "source": [
+    "#### 3. Add ModelSelector\n",
+    "Add IntimeModelSelector for global best model selection."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "fb7b93a6",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector\n",
+    "\n",
+    "job.to(IntimeModelSelector(key_metric=\"accuracy\"), \"server\")"
+   ]
+  },
  {
   "cell_type": "markdown",
   "id": "548966c2-90bf-47ad-91d2-5c6c22c3c4f0",
   "metadata": {},
   "source": [
    "#### 3. Add Clients\n",
-    "Next, we can use the `ScriptExecutor` and send it to each of the clients to run our training script.\n",
+    "Next, we can use the `ScriptRunner` and send it to each of the clients to run our training script.\n",
    "\n",
    "Note that our script could have additional input arguments, such as batch size or data path, but we don't use them here for simplicity.\n",
    "We can also specify, which GPU should be used to run this client, which is helpful for simulated environments."
diff --git a/examples/hello-world/hello-pt/fedavg_script_runner_hello-pt.py b/examples/hello-world/hello-pt/fedavg_script_runner_hello-pt.py
index c8bdbe6e4f..ab5851a015 100644
--- a/examples/hello-world/hello-pt/fedavg_script_runner_hello-pt.py
+++ b/examples/hello-world/hello-pt/fedavg_script_runner_hello-pt.py
@@ -14,9 +14,7 @@
 
 from src.simple_network import SimpleNetwork
 
-from nvflare import FedJob
-from nvflare.app_common.workflows.fedavg import FedAvg
-from nvflare.app_opt.pt.job_config.model import PTModel
+from nvflare.app_opt.pt.job_config.fed_avg import FedAvgJob
 from nvflare.job_config.script_runner import ScriptRunner
 
 if __name__ == "__main__":
@@ -24,17 +22,9 @@
     num_rounds = 2
     train_script = "src/hello-pt_cifar10_fl.py"
 
-    job = FedJob(name="hello-pt_cifar10_fedavg")
-
-    # Define the controller workflow and send to server
-    controller = FedAvg(
-        num_clients=n_clients,
-        num_rounds=num_rounds,
+    job = FedAvgJob(
+        name="hello-pt_cifar10_fedavg", n_clients=n_clients, num_rounds=num_rounds, initial_model=SimpleNetwork()
     )
-    job.to(controller, "server")
-
-    # Define the initial global model and send to server
-    job.to(PTModel(SimpleNetwork()), "server")
 
     # Add clients
     for i in range(n_clients):
diff --git a/examples/hello-world/hello-tf/fedavg_script_runner_hello-tf.py b/examples/hello-world/hello-tf/fedavg_script_runner_hello-tf.py
index fa518293d7..a146e77de1 100644
--- a/examples/hello-world/hello-tf/fedavg_script_runner_hello-tf.py
+++ b/examples/hello-world/hello-tf/fedavg_script_runner_hello-tf.py
@@ -14,9 +14,7 @@
 
 from src.tf_net import TFNet
 
-from nvflare import FedJob
-from nvflare.app_common.workflows.fedavg import FedAvg
-from nvflare.app_opt.tf.job_config.model import TFModel
+from nvflare.app_opt.tf.job_config.fed_avg import FedAvgJob
 from nvflare.job_config.script_runner import ScriptRunner
 
 if __name__ == "__main__":
@@ -24,17 +22,7 @@
     num_rounds = 3
     train_script = "src/hello-tf_fl.py"
 
-    job = FedJob(name="hello-tf_fedavg")
-
-    # Define the controller workflow and send to server
-    controller = FedAvg(
-        num_clients=n_clients,
-        num_rounds=num_rounds,
-    )
-    job.to(controller, "server")
-
-    # Define the initial global model and send to server
-    job.to(TFModel(TFNet()), "server")
+    job = FedAvgJob(name="hello-tf_fedavg", n_clients=n_clients, num_rounds=num_rounds, initial_model=TFNet())
 
     # Add clients
     for i in range(n_clients):
diff --git a/nvflare/app_opt/tf/job_config/fed_avg.py b/nvflare/app_opt/tf/job_config/fed_avg.py
index 99be5530a1..2c2dadf015 100644
--- a/nvflare/app_opt/tf/job_config/fed_avg.py
+++ b/nvflare/app_opt/tf/job_config/fed_avg.py
@@ -69,12 +69,12 @@ def __init__(
         component = TBAnalyticsReceiver(events=["fed.analytix_log_stats"])
         self.to_server(id="receiver", obj=component)
 
-        comp_ids = self.to_server(TFModel(initial_model))
+        persistor_id = self.to_server(TFModel(initial_model))
 
         controller = FedAvg(
             num_clients=n_clients,
             num_rounds=num_rounds,
-            persistor_id=persistor_id,
+            persistor_id=persistor_id,
         )
         self.to_server(controller)
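
---

For reviewers, here is a minimal sketch of the job-construction pattern these changes converge on (PyTorch variant). It only uses APIs that appear in the diff above; the `src.net.Net` model and the `src/cifar10_fl.py` training script are placeholders for whatever model and client script a given example ships with.

```python
# Sketch of the pattern used across the updated examples:
# FedAvgJob bundles the FedAvg controller and the initial global model,
# IntimeModelSelector tracks the best global model by a key metric,
# and ScriptRunner wraps the client-side training script.
from src.net import Net  # placeholder model; each example ships its own

from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector
from nvflare.app_opt.pt.job_config.fed_avg import FedAvgJob
from nvflare.job_config.script_runner import ScriptRunner

if __name__ == "__main__":
    n_clients = 2
    num_rounds = 2
    train_script = "src/cifar10_fl.py"  # placeholder client training script

    # FedAvgJob replaces the manual FedJob + FedAvg + PTModel wiring.
    job = FedAvgJob(name="cifar10_fedavg", n_clients=n_clients, num_rounds=num_rounds, initial_model=Net())

    # Select the best global model based on the accuracy reported by clients.
    job.to(IntimeModelSelector(key_metric="accuracy"), "server")

    # One ScriptRunner per simulated client site.
    for i in range(n_clients):
        executor = ScriptRunner(script=train_script, script_args="")
        job.to(executor, f"site-{i + 1}")

    # job.export_job("/tmp/nvflare/jobs/job_config")
    job.simulator_run("/tmp/nvflare/jobs/workdir", gpu="0")
```

For workflows that still wire ScatterAndGather manually (the k-means example), the same idea applies: `job.to_server(obj, id=...)` returns the component id, which is then passed to the controller in place of the old `job.as_id(...)` calls.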