Skip to content

Commit

Permalink
fix job api examples (#2823)
Browse files Browse the repository at this point in the history
  • Loading branch information
SYangster authored Aug 22, 2024
1 parent 16e0c27 commit f766f90
Show file tree
Hide file tree
Showing 15 changed files with 144 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
from pt.utils.cifar10_data_splitter import Cifar10DataSplitter
from pt.utils.cifar10_data_utils import load_cifar10_data

from nvflare import FedJob
from nvflare.app_common.executors.model_learner_executor import ModelLearnerExecutor
from nvflare.app_common.workflows.cross_site_model_eval import CrossSiteModelEval
from nvflare.app_common.workflows.fedavg import FedAvg
from nvflare.app_opt.pt.job_config.model import PTModel
from nvflare.app_opt.pt.job_config.fed_avg import FedAvgJob

if __name__ == "__main__":
n_clients = 2
Expand All @@ -35,13 +33,9 @@
alpha = 0.1
train_split_root = f"/tmp/cifar10_splits/clients{n_clients}_alpha{alpha}" # avoid overwriting results

job = FedJob(name="cifar10_fedavg")
job = FedAvgJob(name="cifar10_fedavg", n_clients=n_clients, num_rounds=num_rounds, initial_model=ModerateCNN())

ctrl1 = FedAvg(
num_clients=n_clients,
num_rounds=num_rounds,
)
ctrl2 = CrossSiteModelEval()
ctrl = CrossSiteModelEval()

load_cifar10_data() # preload CIFAR10 data
data_splitter = Cifar10DataSplitter(
Expand All @@ -50,17 +44,18 @@
alpha=alpha,
)

job.to(ctrl1, "server")
job.to(ctrl2, "server")
job.to(ctrl, "server")
job.to(data_splitter, "server")

# Define the initial global model and send to server
job.to(PTModel(ModerateCNN()), "server")

for i in range(n_clients):
learner = CIFAR10ModelLearner(train_idx_root=train_split_root, aggregation_epochs=aggregation_epochs, lr=0.01)
executor = ModelLearnerExecutor(learner_id=job.as_id(learner))
job.to(executor, f"site-{i+1}") # data splitter assumes client names start from 1
site_name = f"site-{i+1}"
learner_id = job.to(
CIFAR10ModelLearner(train_idx_root=train_split_root, aggregation_epochs=aggregation_epochs, lr=0.01),
site_name,
id="learner",
)
executor = ModelLearnerExecutor(learner_id=learner_id)
job.to(executor, site_name) # data splitter assumes client names start from 1

# job.export_job("/tmp/nvflare/jobs/job_config")
job.simulator_run("/tmp/nvflare/jobs/workdir", gpu="0")
3 changes: 3 additions & 0 deletions examples/advanced/job_api/pt/fedavg_script_runner_cifar10.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from src.net import Net

from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector
from nvflare.app_common.workflows.fedavg import FedAvg
from nvflare.app_opt.pt.job_config.model import PTModel

Expand All @@ -39,6 +40,8 @@
# Define the initial global model and send to server
job.to(PTModel(Net()), "server")

job.to(IntimeModelSelector(key_metric="accuracy"), "server")

# Note: We can optionally replace the above code with the FedAvgJob, which is a pattern to simplify FedAvg job creations
# job = FedAvgJob(name="cifar10_fedavg", num_rounds=num_rounds, n_clients=n_clients, initial_model=Net())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
job = FedAvgJob(name="cifar10_fedavg_privacy", num_rounds=num_rounds, n_clients=n_clients, initial_model=Net())

for i in range(n_clients):
site_name = f"site-{i}"
executor = ScriptRunner(script=train_script, script_args="")
job.to(executor, f"site-{i}", tasks=["train"])
job.to(executor, site_name, tasks=["train"])

# add privacy filter.
pp_filter = PercentilePrivacy(percentile=10, gamma=0.01)
job.to(pp_filter, f"site-{i}", tasks=["train"], filter_type=FilterType.TASK_RESULT)
job.to(pp_filter, site_name, tasks=["train"], filter_type=FilterType.TASK_RESULT)

# job.export_job("/tmp/nvflare/jobs/job_config")
job.simulator_run("/tmp/nvflare/jobs/workdir", gpu="0")
16 changes: 11 additions & 5 deletions examples/advanced/job_api/sklearn/kmeans_script_runner_higgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from nvflare import FedJob
from nvflare.app_common.aggregators.collect_and_assemble_aggregator import CollectAndAssembleAggregator
from nvflare.app_common.shareablegenerators.full_model_shareable_generator import FullModelShareableGenerator
from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector
from nvflare.app_common.workflows.scatter_and_gather import ScatterAndGather
from nvflare.app_opt.sklearn.joblib_model_param_persistor import JoblibModelParamPersistor
from nvflare.client.config import ExchangeFormat
Expand Down Expand Up @@ -117,24 +118,29 @@ def split_higgs(input_data_path, input_header_path, output_dir, site_num, sample
# ScatterAndGather also expects an "aggregator" which we define here.
# The actual aggregation function is defined by an "assembler" to specify how to handle the collected updates.
# We use KMeansAssembler which is the assembler designed for k-Means algorithm.
aggregator = CollectAndAssembleAggregator(assembler_id=job.as_id(KMeansAssembler()))
assembler_id = job.to_server(KMeansAssembler(), id="assembler")
aggregator_id = job.to_server(CollectAndAssembleAggregator(assembler_id=assembler_id), id="aggregator")

# For kmeans with sklearn, we need a custom persistor
# JoblibModelParamPersistor is a persistor which saves/reads the model to/from file in joblib format.
persistor = JoblibModelParamPersistor(initial_params={"n_clusters": 2})
persistor_id = job.to_server(JoblibModelParamPersistor(initial_params={"n_clusters": 2}), id="persistor")

shareable_generator_id = job.to_server(FullModelShareableGenerator(), id="shareable_generator")

controller = ScatterAndGather(
min_clients=n_clients,
num_rounds=num_rounds,
wait_time_after_min_received=0,
aggregator_id=job.as_id(aggregator),
persistor_id=job.as_id(persistor),
shareable_generator_id=job.as_id(FullModelShareableGenerator()),
aggregator_id=aggregator_id,
persistor_id=persistor_id,
shareable_generator_id=shareable_generator_id,
train_task_name="train", # Client will start training once received such task.
train_timeout=0,
)
job.to(controller, "server")

job.to(IntimeModelSelector(key_metric="accuracy"), "server")

# Add clients
for i in range(n_clients):
executor = ScriptRunner(
Expand Down
3 changes: 3 additions & 0 deletions examples/advanced/job_api/tf/tf_fl_script_runner_cifar10.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from src.tf_net import ModerateTFNet

from nvflare import FedJob
from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector
from nvflare.app_opt.tf.job_config.model import TFModel
from nvflare.job_config.script_runner import ScriptRunner

Expand Down Expand Up @@ -153,6 +154,8 @@
# Define the initial global model and send to server
job.to(TFModel(ModerateTFNet(input_shape=(None, 32, 32, 3))), "server")

job.to(IntimeModelSelector(key_metric="accuracy"), "server")

# Add clients
for i, train_idx_path in enumerate(train_idx_paths):
curr_task_script_args = task_script_args + f" --train_idx_path {train_idx_path}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,27 @@
"job.to(PTModel(LitNet()), \"server\")"
]
},
{
"cell_type": "markdown",
"id": "72eefb39",
"metadata": {},
"source": [
"#### 5. Add ModelSelector\n",
"Add IntimeModelSelector for global best model selection."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "091beb78",
"metadata": {},
"outputs": [],
"source": [
"from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector\n",
"\n",
"job.to(IntimeModelSelector(key_metric=\"accuracy\"), \"server\")"
]
},
{
"cell_type": "markdown",
"id": "77f5bc7f-4fb4-46e9-8f02-5e7245d95070",
Expand All @@ -397,7 +418,7 @@
"metadata": {},
"source": [
"#### 5. Add clients\n",
"Next, we can use the `ScriptExecutor` and send it to each of the clients to run our training script.\n",
"Next, we can use the `ScriptRunner` and send it to each of the clients to run our training script.\n",
"\n",
"Note that our script could have additional input arguments, such as batch size or data path, but we don't use them here for simplicity.\n",
"We can also specify, which GPU should be used to run this client, which is helpful for simulated environments."
Expand Down
21 changes: 21 additions & 0 deletions examples/getting_started/pt/nvflare_pt_getting_started.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,27 @@
"job.to(PTModel(Net()), \"server\")"
]
},
{
"cell_type": "markdown",
"id": "eccae908",
"metadata": {},
"source": [
"#### 5. Add ModelSelector\n",
"Add IntimeModelSelector for global best model selection."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d52dd194",
"metadata": {},
"outputs": [],
"source": [
"from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector\n",
"\n",
"job.to(IntimeModelSelector(key_metric=\"accuracy\"), \"server\")"
]
},
{
"cell_type": "markdown",
"id": "77f5bc7f-4fb4-46e9-8f02-5e7245d95070",
Expand Down
16 changes: 11 additions & 5 deletions examples/getting_started/sklearn/kmeans_script_runner_higgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from nvflare import FedJob
from nvflare.app_common.aggregators.collect_and_assemble_aggregator import CollectAndAssembleAggregator
from nvflare.app_common.shareablegenerators.full_model_shareable_generator import FullModelShareableGenerator
from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector
from nvflare.app_common.workflows.scatter_and_gather import ScatterAndGather
from nvflare.app_opt.sklearn.joblib_model_param_persistor import JoblibModelParamPersistor
from nvflare.client.config import ExchangeFormat
Expand Down Expand Up @@ -117,24 +118,29 @@ def split_higgs(input_data_path, input_header_path, output_dir, site_num, sample
# ScatterAndGather also expects an "aggregator" which we define here.
# The actual aggregation function is defined by an "assembler" to specify how to handle the collected updates.
# We use KMeansAssembler which is the assembler designed for k-Means algorithm.
aggregator = CollectAndAssembleAggregator(assembler_id=job.as_id(KMeansAssembler()))
assembler_id = job.to_server(KMeansAssembler(), id="assembler")
aggregator_id = job.to_server(CollectAndAssembleAggregator(assembler_id=assembler_id), id="aggregator")

# For kmeans with sklearn, we need a custom persistor
# JoblibModelParamPersistor is a persistor which saves/reads the model to/from file in joblib format.
persistor = JoblibModelParamPersistor(initial_params={"n_clusters": 2})
persistor_id = job.to_server(JoblibModelParamPersistor(initial_params={"n_clusters": 2}), id="persistor")

shareable_generator_id = job.to_server(FullModelShareableGenerator(), id="shareable_generator")

controller = ScatterAndGather(
min_clients=n_clients,
num_rounds=num_rounds,
wait_time_after_min_received=0,
aggregator_id=job.as_id(aggregator),
persistor_id=job.as_id(persistor),
shareable_generator_id=job.as_id(FullModelShareableGenerator()),
aggregator_id=aggregator_id,
persistor_id=persistor_id,
shareable_generator_id=shareable_generator_id,
train_task_name="train", # Client will start training once received such task.
train_timeout=0,
)
job.to(controller, "server")

job.to(IntimeModelSelector(key_metric="accuracy"), "server")

# Add clients
for i in range(n_clients):
executor = ScriptRunner(
Expand Down
23 changes: 22 additions & 1 deletion examples/getting_started/tf/nvflare_tf_getting_started.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,27 @@
"job.to(TFModel(TFNet()), \"server\")"
]
},
{
"cell_type": "markdown",
"id": "25c6eada",
"metadata": {},
"source": [
"#### 5. Add ModelSelector\n",
"Add IntimeModelSelector for global best model selection."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0ae73e50",
"metadata": {},
"outputs": [],
"source": [
"from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector\n",
"\n",
"job.to(IntimeModelSelector(key_metric=\"accuracy\"), \"server\")"
]
},
{
"cell_type": "markdown",
"id": "77f5bc7f-4fb4-46e9-8f02-5e7245d95070",
Expand Down Expand Up @@ -391,7 +412,7 @@
},
"outputs": [],
"source": [
"job.simulator_run(\"/tmp/nvflare/jobs/workdir\", , gpu=\"0\")"
"job.simulator_run(\"/tmp/nvflare/jobs/workdir\", gpu=\"0\")"
]
},
{
Expand Down
3 changes: 3 additions & 0 deletions examples/getting_started/tf/tf_fl_script_runner_cifar10.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from src.tf_net import ModerateTFNet

from nvflare import FedJob
from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector
from nvflare.app_opt.tf.job_config.model import TFModel
from nvflare.job_config.script_runner import ScriptRunner

Expand Down Expand Up @@ -153,6 +154,8 @@
# Define the initial global model and send to server
job.to(TFModel(ModerateTFNet(input_shape=(None, 32, 32, 3))), "server")

job.to(IntimeModelSelector(key_metric="accuracy"), "server")

# Add clients
for i, train_idx_path in enumerate(train_idx_paths):
curr_task_script_args = task_script_args + f" --train_idx_path {train_idx_path}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

from nvflare import FedJob
from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector
from nvflare.app_common.workflows.fedavg import FedAvg
from nvflare.client.config import ExchangeFormat
from nvflare.job_config.script_runner import ScriptRunner
Expand All @@ -31,6 +32,8 @@
)
job.to(controller, "server")

job.to(IntimeModelSelector(key_metric="accuracy"), "server")

# Add clients
for i in range(n_clients):
executor = ScriptRunner(script=train_script, script_args="", params_exchange_format=ExchangeFormat.NUMPY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@
"from nvflare.app_common.workflows.fedavg import FedAvg\n",
"from nvflare.job_config.script_runner import ScriptRunner\n",
"\n",
"\n",
"job = FedJob(name=\"hello-fedavg-numpy\")"
]
},
Expand Down Expand Up @@ -225,13 +226,34 @@
"job.to(controller, \"server\")"
]
},
{
"cell_type": "markdown",
"id": "4171fe9e",
"metadata": {},
"source": [
"#### 3. Add ModelSelector\n",
"Add IntimeModelSelector for global best model selection."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fb7b93a6",
"metadata": {},
"outputs": [],
"source": [
"from nvflare.app_common.widgets.intime_model_selector import IntimeModelSelector\n",
"\n",
"job.to(IntimeModelSelector(key_metric=\"accuracy\"), \"server\")"
]
},
{
"cell_type": "markdown",
"id": "548966c2-90bf-47ad-91d2-5c6c22c3c4f0",
"metadata": {},
"source": [
"#### 3. Add Clients\n",
"Next, we can use the `ScriptExecutor` and send it to each of the clients to run our training script.\n",
"Next, we can use the `ScriptRunner` and send it to each of the clients to run our training script.\n",
"\n",
"Note that our script could have additional input arguments, such as batch size or data path, but we don't use them here for simplicity.\n",
"We can also specify, which GPU should be used to run this client, which is helpful for simulated environments."
Expand Down
Loading

0 comments on commit f766f90

Please sign in to comment.