diff --git a/examples/advanced/random_forest/README.md b/examples/advanced/random_forest/README.md index ff059241f5..4abd6edc70 100644 --- a/examples/advanced/random_forest/README.md +++ b/examples/advanced/random_forest/README.md @@ -97,21 +97,10 @@ By default, CPU based training is used. If the CUDA is installed on the site, tree construction and prediction can be accelerated using GPUs. -GPUs are enabled by using :code:`gpu_hist` as :code:`tree_method` parameter. -For example, -:: - "xgboost_params": { - "max_depth": 8, - "eta": 0.1, - "objective": "binary:logistic", - "eval_metric": "auc", - "tree_method": "gpu_hist", - "gpu_id": 0, - "nthread": 16 - } - -For GPU based training, edit `job_config_gen.sh` to change `TREE_METHOD="hist"` to `TREE_METHOD="gpu_hist"`. -Then run the `job_config_gen.sh` again to generates new job configs for GPU-based training. +In order to enable GPU accelerated training, first ensure that your machine has CUDA installed and has at least one GPU. +In `config_fed_client.json` set `"use_gpus": true` and `"tree_method": "hist"`. +Then, in `FedXGBTreeExecutor` we use the `device` parameter to map each rank to a GPU device ordinal. +If using multiple GPUs, we can map each rank to a different GPU device; if using a single GPU, each rank can be mapped to the same device. ## Run experiments After you run the two scripts `data_split_gen.sh` and `jobs_gen.sh`, the experiments can be run with the NVFlare simulator. @@ -162,4 +151,4 @@ AUC over first 1000000 instances is: 0.7828698775310959 AUC over first 1000000 instances is: 0.779952094937354 20_clients_square_split_scaled_lr_split_0.02_subsample AUC over first 1000000 instances is: 0.7825360505137948 -``` \ No newline at end of file +``` diff --git a/examples/advanced/random_forest/jobs_gen.sh b/examples/advanced/random_forest/jobs_gen.sh index f7c14f611d..77f64109f4 100755 --- a/examples/advanced/random_forest/jobs_gen.sh +++ b/examples/advanced/random_forest/jobs_gen.sh @@ -1,6 +1,5 @@ #!/usr/bin/env bash -# change to "gpu_hist" for gpu training TREE_METHOD="hist" DATA_SPLIT_ROOT="/tmp/nvflare/random_forest/HIGGS/data_splits" diff --git a/examples/advanced/random_forest/utils/model_validation.py b/examples/advanced/random_forest/utils/model_validation.py index 7a82fb2fef..6f81bb0faa 100644 --- a/examples/advanced/random_forest/utils/model_validation.py +++ b/examples/advanced/random_forest/utils/model_validation.py @@ -40,7 +40,7 @@ def model_validation_args_parser(): help="Total number of trees", ) parser.add_argument( - "--tree_method", type=str, default="hist", help="tree_method for xgboost - use hist or gpu_hist for best perf" + "--tree_method", type=str, default="hist", help="tree_method for xgboost - use hist for best perf" ) return parser diff --git a/examples/advanced/random_forest/utils/prepare_job_config.py b/examples/advanced/random_forest/utils/prepare_job_config.py index 7acf944a63..d7ad720833 100644 --- a/examples/advanced/random_forest/utils/prepare_job_config.py +++ b/examples/advanced/random_forest/utils/prepare_job_config.py @@ -40,7 +40,7 @@ def job_config_args_parser(): parser.add_argument("--lr_mode", type=str, default="uniform", help="Whether to use uniform or scaled shrinkage") parser.add_argument("--nthread", type=int, default=16, help="nthread for xgboost") parser.add_argument( - "--tree_method", type=str, default="hist", help="tree_method for xgboost - use hist or gpu_hist for best perf" + "--tree_method", type=str, default="hist", help="tree_method for xgboost - use hist for 
best perf" ) return parser diff --git a/examples/advanced/vertical_xgboost/README.md b/examples/advanced/vertical_xgboost/README.md index e17ff4e1d0..88171d9ff9 100644 --- a/examples/advanced/vertical_xgboost/README.md +++ b/examples/advanced/vertical_xgboost/README.md @@ -2,7 +2,7 @@ This example shows how to use vertical federated learning with [NVIDIA FLARE](https://nvflare.readthedocs.io/en/main/index.html) on tabular data. Here we use the optimized gradient boosting library [XGBoost](https://github.com/dmlc/xgboost) and leverage its federated learning support. -Before starting please make sure you set up a [virtual environment](../../../README.md#set-up-a-virtual-environment) and install the additional requirements: +Before starting please make sure you set up a [virtual environment](../../README.md#set-up-a-virtual-environment) and install the additional requirements: ``` python3 -m pip install -r requirements.txt ``` @@ -30,7 +30,7 @@ Run the following command to prepare the data splits: ### Private Set Intersection (PSI) Since not every site will have the same set of data samples (rows), we can use PSI to compare encrypted versions of the sites' datasets in order to jointly compute the intersection based on common IDs. In this example, the HIGGS dataset does not contain unique identifiers so we add a temporary `uid_{idx}` to each instance and give each site a portion of the HIGGS dataset that includes a common overlap. Afterwards the identifiers are dropped since they are only used for matching, and training is then done on the intersected data. To learn more about our PSI protocol implementation, see our [psi example](../psi/README.md). -> **_NOTE:_** The uid can be a composition of multiple variabes with a transformation, however in this example we use indices for simplicity. PSI can also be used for computing the intersection of overlapping features, but here we give each site unique features. +> **_NOTE:_** The uid can be a composition of multiple variables with a transformation; however, in this example we use indices for simplicity. PSI can also be used for computing the intersection of overlapping features, but here we give each site unique features. Create the psi job using the predefined psi_csv template: ``` @@ -58,7 +58,9 @@ Lastly, we must subclass `XGBDataLoader` and implement the `load_data()` method. By default, CPU based training is used. In order to enable GPU accelerated training, first ensure that your machine has CUDA installed and has at least one GPU. -In `config_fed_client.json` set `"use_gpus": true` and `"tree_method": "hist"` in `xgb_params`. Then, in `FedXGBHistogramExecutor` we use the `device` parameter to map each rank to a GPU device ordinal in `xgb_params`. If using multiple GPUs, we can map each rank to a different GPU device, however you can also map each rank to the same GPU device if using a single GPU. +In `config_fed_client.json` set `"use_gpus": true` and `"tree_method": "hist"` in `xgb_params`. +Then, in `FedXGBHistogramExecutor` we use the `device` parameter to map each rank to a GPU device ordinal in `xgb_params`. +If using multiple GPUs, we can map each rank to a different GPU device; if using a single GPU, each rank can be mapped to the same device. We can create a GPU enabled job using the job CLI: ``` @@ -87,10 +89,11 @@ The model will be saved to `test.model.json`. 
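Editor's note on the GPU instructions above: the mapping from an executor's rank to a `device` ordinal can be pictured with the minimal sketch below. The helper name and the `num_gpus` argument are hypothetical illustrations, not part of this change; only the `device` and `tree_method` booster parameters come from the READMEs.

```python
def gpu_params_for_rank(xgb_params: dict, rank: int, num_gpus: int = 1) -> dict:
    """Sketch: derive per-rank booster parameters for GPU training.

    With a single GPU (num_gpus=1) every rank is mapped to cuda:0; with
    several GPUs, ranks are spread across the available device ordinals.
    """
    params = dict(xgb_params)
    params["tree_method"] = "hist"
    params["device"] = f"cuda:{rank % num_gpus}" if num_gpus > 0 else "cpu"
    return params
```
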
## Results Model accuracy can be visualized in tensorboard: ``` -tensorboard --logdir /tmp/nvflare/vertical_xgb +tensorboard --logdir /tmp/nvflare/vertical_xgb/simulate_job/tb_events ``` -An example training (pink) and validation (orange) AUC graph from running vertical XGBoost on HIGGS. -Used an intersection of 50000 samples across 5 clients each with different features, and ran for ~50 rounds due to early stopping. +An example training (pink) and validation (orange) AUC graph from running vertical XGBoost on HIGGS: +(Used an intersection of 50000 samples across 5 clients each with different features, +and ran for ~50 rounds due to early stopping.) ![Vertical XGBoost graph](./figs/vertical_xgboost_graph.png) diff --git a/examples/advanced/xgboost/README.md b/examples/advanced/xgboost/README.md index d9f073659f..c0df82b8e1 100644 --- a/examples/advanced/xgboost/README.md +++ b/examples/advanced/xgboost/README.md @@ -139,7 +139,9 @@ By default, CPU based training is used. If the CUDA is installed on the site, tree construction and prediction can be accelerated using GPUs. -To enable GPU accelerated training, in `config_fed_client.json` set `"use_gpus": true` and `"tree_method": "hist"`. Then, in `FedXGBHistogramExecutor` we use the `device` parameter to map each rank to a GPU device ordinal in `xgb_params`. For a single GPU, assuming it has enough memory, we can map each rank to the same device with `params["device"] = f"cuda:0"`. +To enable GPU accelerated training, in `config_fed_client.json` set `"use_gpus": true` and `"tree_method": "hist"`. +Then, in `FedXGBHistogramExecutor` we use the `device` parameter to map each rank to a GPU device ordinal in `xgb_params`. +For a single GPU, assuming it has enough memory, we can map each rank to the same device with `params["device"] = f"cuda:0"`. ### Multi GPU support diff --git a/examples/advanced/xgboost/histogram-based/README.md b/examples/advanced/xgboost/histogram-based/README.md index 5160582693..ba92e787ef 100644 --- a/examples/advanced/xgboost/histogram-based/README.md +++ b/examples/advanced/xgboost/histogram-based/README.md @@ -11,15 +11,21 @@ Switch to this directory and install additional requirements (suggest to do this python3 -m pip install -r requirements.txt ``` +### Run centralized experiments +``` +bash run_experiment_centralized.sh +``` + ### Run federated experiments with simulator locally Next, we will use the NVFlare simulator to run FL training automatically. ``` -bash run_experiment_simulator.sh +nvflare simulator jobs/higgs_2_histogram_v2_uniform_split_uniform_lr \ + -w /tmp/nvflare/xgboost_v2_workspace -n 2 -t 2 ``` -### Run centralized experiments +Model accuracy can be visualized in tensorboard: ``` -bash run_experiment_centralized.sh +tensorboard --logdir /tmp/nvflare/xgboost_v2_workspace/simulate_job/tb_events ``` ### Run federated experiments in real world @@ -51,4 +57,21 @@ The custom executor can inherit the base class `FedXGBHistogramExecutor` and overwrite the `xgb_train()` method. To use other dataset, can inherit the base class `XGBDataLoader` and -implement that `load_data()` method. +implement the `load_data()` method. + +## Loose integration + +We can use the NVFlare controller/executor just to launch the external xgboost +federated server and client. + +### Run federated experiments with simulator locally +Next, we will use the NVFlare simulator to run FL training automatically. 
+``` +nvflare simulator jobs/higgs_2_histogram_uniform_split_uniform_lr \ + -w /tmp/nvflare/xgboost_workspace -n 2 -t 2 +``` + +Model accuracy can be visualized in tensorboard: +``` +tensorboard --logdir /tmp/nvflare/xgboost_workspace/simulate_job/tb_events +``` diff --git a/examples/advanced/xgboost/histogram-based/jobs/base/app/config/config_fed_client.json b/examples/advanced/xgboost/histogram-based/jobs/base/app/config/config_fed_client.json index c142310a3f..52697b10fe 100755 --- a/examples/advanced/xgboost/histogram-based/jobs/base/app/config/config_fed_client.json +++ b/examples/advanced/xgboost/histogram-based/jobs/base/app/config/config_fed_client.json @@ -13,6 +13,7 @@ "data_loader_id": "dataloader", "num_rounds": "{num_rounds}", "early_stopping_rounds": 2, + "metrics_writer_id": "metrics_writer", "xgb_params": { "max_depth": 8, "eta": 0.1, @@ -34,6 +35,16 @@ "args": { "data_split_filename": "data_split.json" } + }, + { + "id": "metrics_writer", + "path": "nvflare.app_opt.tracking.tb.tb_writer.TBWriter", + "args": {"event_type": "analytix_log_stats"} + }, + { + "id": "event_to_fed", + "name": "ConvertToFedEvent", + "args": {"events_to_convert": ["analytix_log_stats"], "fed_event_prefix": "fed."} } ] } diff --git a/examples/advanced/xgboost/histogram-based/jobs/base/app/config/config_fed_server.json b/examples/advanced/xgboost/histogram-based/jobs/base/app/config/config_fed_server.json index c759f3a703..9814f32e2c 100755 --- a/examples/advanced/xgboost/histogram-based/jobs/base/app/config/config_fed_server.json +++ b/examples/advanced/xgboost/histogram-based/jobs/base/app/config/config_fed_server.json @@ -2,7 +2,15 @@ "format_version": 2, "task_data_filters": [], "task_result_filters": [], - "components": [], + "components": [ + { + "id": "tb_receiver", + "path": "nvflare.app_opt.tracking.tb.tb_receiver.TBAnalyticsReceiver", + "args": { + "tb_folder": "tb_events" + } + } + ], "workflows": [ { "id": "xgb_controller", diff --git a/examples/advanced/xgboost/histogram-based/jobs/base_v2/app/config/config_fed_client.json b/examples/advanced/xgboost/histogram-based/jobs/base_v2/app/config/config_fed_client.json index 337ddd84ca..1dd56f3b26 100755 --- a/examples/advanced/xgboost/histogram-based/jobs/base_v2/app/config/config_fed_client.json +++ b/examples/advanced/xgboost/histogram-based/jobs/base_v2/app/config/config_fed_client.json @@ -11,6 +11,7 @@ "path": "nvflare.app_opt.xgboost.histogram_based_v2.executor.FedXGBHistogramExecutor", "args": { "data_loader_id": "dataloader", + "metrics_writer_id": "metrics_writer", "early_stopping_rounds": 2, "xgb_params": { "max_depth": 8, @@ -33,6 +34,16 @@ "args": { "data_split_filename": "data_split.json" } + }, + { + "id": "metrics_writer", + "path": "nvflare.app_opt.tracking.tb.tb_writer.TBWriter", + "args": {"event_type": "analytix_log_stats"} + }, + { + "id": "event_to_fed", + "name": "ConvertToFedEvent", + "args": {"events_to_convert": ["analytix_log_stats"], "fed_event_prefix": "fed."} } ] } diff --git a/examples/advanced/xgboost/histogram-based/jobs/base_v2/app/config/config_fed_server.json b/examples/advanced/xgboost/histogram-based/jobs/base_v2/app/config/config_fed_server.json index 7f92707d78..6ed5edf3ac 100755 --- a/examples/advanced/xgboost/histogram-based/jobs/base_v2/app/config/config_fed_server.json +++ b/examples/advanced/xgboost/histogram-based/jobs/base_v2/app/config/config_fed_server.json @@ -3,7 +3,15 @@ "num_rounds": 100, "task_data_filters": [], "task_result_filters": [], - "components": [], + "components": [ + { + "id": 
"tb_receiver", + "path": "nvflare.app_opt.tracking.tb.tb_receiver.TBAnalyticsReceiver", + "args": { + "tb_folder": "tb_events" + } + } + ], "workflows": [ { "id": "xgb_controller", diff --git a/examples/advanced/xgboost/histogram-based/xgboost_histogram_higgs.ipynb b/examples/advanced/xgboost/histogram-based/xgboost_histogram_higgs.ipynb index 833a688e7a..1aa747b74d 100644 --- a/examples/advanced/xgboost/histogram-based/xgboost_histogram_higgs.ipynb +++ b/examples/advanced/xgboost/histogram-based/xgboost_histogram_higgs.ipynb @@ -138,7 +138,7 @@ "outputs": [], "source": [ "%load_ext tensorboard\n", - "%tensorboard --logdir /tmp/nvflare/workspaces/xgboost_workspace_5_histogram_uniform_split_uniform_lr" + "%tensorboard --logdir /tmp/nvflare/workspaces/xgboost_workspace_5_histogram_uniform_split_uniform_lr/simulate_job/tb_events" ] } ], diff --git a/examples/advanced/xgboost/prepare_job_config.sh b/examples/advanced/xgboost/prepare_job_config.sh index e04e6e589c..89161f00ed 100755 --- a/examples/advanced/xgboost/prepare_job_config.sh +++ b/examples/advanced/xgboost/prepare_job_config.sh @@ -1,5 +1,4 @@ #!/usr/bin/env bash -# change to "gpu_hist" for gpu training TREE_METHOD="hist" prepare_job_config() { diff --git a/examples/advanced/xgboost/tree-based/README.md b/examples/advanced/xgboost/tree-based/README.md index a447980e79..ddcb545d09 100644 --- a/examples/advanced/xgboost/tree-based/README.md +++ b/examples/advanced/xgboost/tree-based/README.md @@ -16,7 +16,7 @@ In addition to basic uniform shrinkage setting where all clients have the same l ## Run automated experiments Please make sure to finish the [preparation steps](../README.md) before running the following steps. -To run all of the experiments in this example with NVFlare, follow the steps below. To try out a single experiment, follow this [notebook](./xgboost_tree_higgs.ipynb). +To run all experiments in this example with NVFlare, follow the steps below. To try out a single experiment, follow this [notebook](./xgboost_tree_higgs.ipynb). 
### Environment Preparation diff --git a/examples/advanced/xgboost/utils/prepare_job_config.py b/examples/advanced/xgboost/utils/prepare_job_config.py index 33523081a4..7390453f99 100644 --- a/examples/advanced/xgboost/utils/prepare_job_config.py +++ b/examples/advanced/xgboost/utils/prepare_job_config.py @@ -50,7 +50,7 @@ def job_config_args_parser(): parser.add_argument("--lr_mode", type=str, default="uniform", help="Whether to use uniform or scaled shrinkage") parser.add_argument("--nthread", type=int, default=16, help="nthread for xgboost") parser.add_argument( - "--tree_method", type=str, default="hist", help="tree_method for xgboost - use hist or gpu_hist for best perf" + "--tree_method", type=str, default="hist", help="tree_method for xgboost - use hist for best perf" ) return parser diff --git a/job_templates/vertical_xgb/config_fed_client.conf b/job_templates/vertical_xgb/config_fed_client.conf index fceab1d275..72dff673b9 100644 --- a/job_templates/vertical_xgb/config_fed_client.conf +++ b/job_templates/vertical_xgb/config_fed_client.conf @@ -7,7 +7,7 @@ executors = [ executor { # Federated XGBoost Executor for histogram-base collaboration id = "xgb_hist_executor" - name = "FedXGBHistogramExecutor" + path = "nvflare.app_opt.xgboost.histogram_based.executor.FedXGBHistogramExecutor" args { num_rounds = 100 early_stopping_rounds = 2 @@ -23,6 +23,8 @@ executors = [ data_loader_id = "dataloader" # whether to enable GPU training use_gpus = false + metrics_writer_id = "metrics_writer" + model_file_name = "test.model.json" } } } @@ -47,4 +49,19 @@ components = [ train_proportion = 0.8 } } + { + id = "metrics_writer" + path = "nvflare.app_opt.tracking.tb.tb_writer.TBWriter" + args { + event_type = "analytix_log_stats" + } + } + { + id = "event_to_fed" + name = "ConvertToFedEvent" + args { + events_to_convert = ["analytix_log_stats"] + fed_event_prefix = "fed." + } + } ] diff --git a/job_templates/vertical_xgb/config_fed_server.conf b/job_templates/vertical_xgb/config_fed_server.conf index 07acd76846..45f9f67a6a 100644 --- a/job_templates/vertical_xgb/config_fed_server.conf +++ b/job_templates/vertical_xgb/config_fed_server.conf @@ -1,7 +1,4 @@ format_version = 2 -server { - heart_beat_timeout = 600 -} task_data_filters = [] task_result_filters = [] workflows = [ @@ -13,4 +10,12 @@ workflows = [ } } ] -components = [] +components = [ + { + id = "tb_receiver" + path = "nvflare.app_opt.tracking.tb.tb_receiver.TBAnalyticsReceiver" + args { + tb_folder = tb_events + } + } +] diff --git a/nvflare/app_common/tracking/log_writer.py b/nvflare/app_common/tracking/log_writer.py index c20d62bb20..fa7162d3e4 100644 --- a/nvflare/app_common/tracking/log_writer.py +++ b/nvflare/app_common/tracking/log_writer.py @@ -13,7 +13,9 @@ # limitations under the License. from abc import ABC, abstractmethod +from typing import Optional +from nvflare.apis.analytix import AnalyticsDataType from nvflare.apis.event_type import EventType from nvflare.apis.fl_component import FLComponent from nvflare.apis.fl_context import FLContext @@ -41,6 +43,23 @@ def handle_event(self, event_type: str, fl_ctx: FLContext): self.sender = AnalyticsSender(self.event_type, self.get_writer_name()) self.sender.engine = engine + def write(self, tag: str, value, data_type: AnalyticsDataType, global_step: Optional[int] = None, **kwargs): + """Writes a record. + + Args: + tag (str): Tag name + value: Value to send + data_type (AnalyticsDataType): Data type of the value being sent + global_step (optional, int): Global step value. 
+ + Raises: + TypeError: global_step must be an int + """ + self.sender.add(tag=tag, value=value, data_type=data_type, global_step=global_step, **kwargs) + @abstractmethod def get_writer_name(self) -> LogWriterName: pass + + def get_default_metric_data_type(self) -> AnalyticsDataType: + return AnalyticsDataType.METRICS diff --git a/nvflare/app_opt/tracking/mlflow/mlflow_writer.py b/nvflare/app_opt/tracking/mlflow/mlflow_writer.py index 82433c7320..751254eb90 100644 --- a/nvflare/app_opt/tracking/mlflow/mlflow_writer.py +++ b/nvflare/app_opt/tracking/mlflow/mlflow_writer.py @@ -37,6 +37,9 @@ def get_writer_name(self) -> LogWriterName: """Returns "MLFLOW".""" return LogWriterName.MLFLOW + def get_default_metric_data_type(self) -> AnalyticsDataType: + return AnalyticsDataType.METRICS + def log_param(self, key: str, value: any) -> None: """Log a parameter (e.g. model hyperparameter) under the current run. @@ -49,7 +52,7 @@ def log_param(self, key: str, value: any) -> None: All backend stores support values up to length 500, but some may support larger values. """ - self.sender.add(tag=key, value=value, data_type=AnalyticsDataType.PARAMETER) + self.write(tag=key, value=value, data_type=AnalyticsDataType.PARAMETER) def log_params(self, values: dict) -> None: """Log a batch of params for the current run. @@ -57,7 +60,7 @@ def log_params(self, values: dict) -> None: Args: values (dict): Dictionary of param_name: String -> value: (String, but will be string-ified if not) """ - self.sender.add(tag="params", value=values, data_type=AnalyticsDataType.PARAMETERS) + self.write(tag="params", value=values, data_type=AnalyticsDataType.PARAMETERS) def log_metric(self, key: str, value: float, step: Optional[int] = None) -> None: """Log a metric under the current run. @@ -72,7 +75,7 @@ def log_metric(self, key: str, value: float, step: Optional[int] = None) -> None support larger values. step (int, optional): Metric step. Defaults to zero if unspecified. """ - self.sender.add(tag=key, value=value, data_type=AnalyticsDataType.METRIC, global_step=step) + self.write(tag=key, value=value, data_type=AnalyticsDataType.METRIC, global_step=step) def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None) -> None: """Log multiple metrics for the current run. @@ -84,7 +87,7 @@ def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None) -> step (int, optional): A single integer step at which to log the specified Metrics. If unspecified, each metric is logged at step zero. """ - self.sender.add(tag="metrics", value=metrics, data_type=AnalyticsDataType.METRICS, global_step=step) + self.write(tag="metrics", value=metrics, data_type=AnalyticsDataType.METRICS, global_step=step) def log_text(self, text: str, artifact_file_path: str) -> None: """Log text as an artifact under the current run. @@ -94,7 +97,7 @@ def log_text(self, text: str, artifact_file_path: str) -> None: artifact_file_path (str): The run-relative artifact file path in posixpath format to which the text is saved (e.g. “dir/file.txt”). """ - self.sender.add(tag="text", value=text, data_type=AnalyticsDataType.TEXT, path=artifact_file_path) + self.write(tag="text", value=text, data_type=AnalyticsDataType.TEXT, path=artifact_file_path) def set_tag(self, key: str, tag: any) -> None: """Set a tag under the current run. @@ -105,7 +108,7 @@ def set_tag(self, key: str, tag: any) -> None: All backend stores will support values up to length 5000, but some may support larger values. 
""" - self.sender.add(tag=key, value=tag, data_type=AnalyticsDataType.TAG) + self.write(tag=key, value=tag, data_type=AnalyticsDataType.TAG) def set_tags(self, tags: dict) -> None: """Log a batch of tags for the current run. @@ -114,4 +117,4 @@ def set_tags(self, tags: dict) -> None: tags (dict): Dictionary of tag_name: String -> value: (String, but will be string-ified if not) """ - self.sender.add(tag="tags", value=tags, data_type=AnalyticsDataType.TAGS) + self.write(tag="tags", value=tags, data_type=AnalyticsDataType.TAGS) diff --git a/nvflare/app_opt/tracking/tb/tb_writer.py b/nvflare/app_opt/tracking/tb/tb_writer.py index 4eabf45b99..a60ad45d0a 100644 --- a/nvflare/app_opt/tracking/tb/tb_writer.py +++ b/nvflare/app_opt/tracking/tb/tb_writer.py @@ -32,6 +32,9 @@ def __init__(self, event_type=ANALYTIC_EVENT_TYPE): def get_writer_name(self) -> LogWriterName: return LogWriterName.TORCH_TB + def get_default_metric_data_type(self) -> AnalyticsDataType: + return AnalyticsDataType.SCALARS + def add_scalar(self, tag: str, scalar: float, global_step: Optional[int] = None, **kwargs): """Sends a scalar. @@ -41,7 +44,7 @@ def add_scalar(self, tag: str, scalar: float, global_step: Optional[int] = None, global_step (optional, int): Global step value. **kwargs: Additional arguments to pass to the receiver side. """ - self.sender.add(tag=tag, value=scalar, data_type=AnalyticsDataType.SCALAR, global_step=global_step, **kwargs) + self.write(tag=tag, value=scalar, data_type=AnalyticsDataType.SCALAR, global_step=global_step, **kwargs) def add_scalars(self, tag: str, scalars: dict, global_step: Optional[int] = None, **kwargs): """Sends scalars. @@ -52,7 +55,7 @@ def add_scalars(self, tag: str, scalars: dict, global_step: Optional[int] = None global_step (optional, int): Global step value. **kwargs: Additional arguments to pass to the receiver side. """ - self.sender.add(tag=tag, value=scalars, data_type=AnalyticsDataType.SCALARS, global_step=global_step, **kwargs) + self.write(tag=tag, value=scalars, data_type=AnalyticsDataType.SCALARS, global_step=global_step, **kwargs) def flush(self): """Flushes out the message. diff --git a/nvflare/app_opt/tracking/wandb/wandb_writer.py b/nvflare/app_opt/tracking/wandb/wandb_writer.py index 7dc5104fa5..e1f496bfc1 100644 --- a/nvflare/app_opt/tracking/wandb/wandb_writer.py +++ b/nvflare/app_opt/tracking/wandb/wandb_writer.py @@ -28,6 +28,9 @@ def get_writer_name(self) -> LogWriterName: """Returns "WEIGHTS_AND_BIASES".""" return LogWriterName.WANDB + def get_default_metric_data_type(self) -> AnalyticsDataType: + return AnalyticsDataType.METRICS + def log(self, metrics: Dict[str, float], step: Optional[int] = None): """Log multiple metrics for the current run. @@ -35,4 +38,4 @@ def log(self, metrics: Dict[str, float], step: Optional[int] = None): metrics (Dict[str, float]): Dictionary of metric_name of type String to Float values. step (int, optional): A single integer step at which to log the specified Metrics. 
""" - self.sender.add(tag="metrics", value=metrics, data_type=AnalyticsDataType.METRICS, global_step=step) + self.write(tag="metrics", value=metrics, data_type=AnalyticsDataType.METRICS, global_step=step) diff --git a/nvflare/app_opt/xgboost/histogram_based/executor.py b/nvflare/app_opt/xgboost/histogram_based/executor.py index d9233974c9..f6ef0e57b8 100644 --- a/nvflare/app_opt/xgboost/histogram_based/executor.py +++ b/nvflare/app_opt/xgboost/histogram_based/executor.py @@ -26,14 +26,17 @@ from nvflare.apis.signal import Signal from nvflare.apis.workspace import Workspace from nvflare.app_common.app_constant import AppConstants +from nvflare.app_common.tracking.log_writer import LogWriter from nvflare.app_opt.xgboost.data_loader import XGBDataLoader from nvflare.app_opt.xgboost.histogram_based.constants import XGB_TRAIN_TASK, XGBShareableHeader -from nvflare.fuel.utils.import_utils import optional_import +from nvflare.app_opt.xgboost.metrics_cb import MetricsCallback from nvflare.security.logging import secure_format_exception, secure_log_traceback class XGBoostParams: - def __init__(self, xgb_params: dict, num_rounds=10, early_stopping_rounds=2, verbose_eval=False): + def __init__( + self, xgb_params: dict, num_rounds: int = 10, early_stopping_rounds: int = 2, verbose_eval: bool = False + ): """Container for all XGBoost parameters. Args: @@ -48,25 +51,6 @@ def __init__(self, xgb_params: dict, num_rounds=10, early_stopping_rounds=2, ver self.xgb_params: dict = xgb_params if xgb_params else {} -class TensorBoardCallback(xgb.callback.TrainingCallback): - def __init__(self, app_dir: str, tensorboard): - self.train_writer = tensorboard.SummaryWriter(log_dir=os.path.join(app_dir, "train-auc/")) - self.val_writer = tensorboard.SummaryWriter(log_dir=os.path.join(app_dir, "val-auc/")) - - def after_iteration(self, model, epoch: int, evals_log: xgb.callback.TrainingCallback.EvalsLog): - if not evals_log: - return False - - for data, metric in evals_log.items(): - for metric_name, log in metric.items(): - score = log[-1][0] if isinstance(log[-1], tuple) else log[-1] - if data == "train": - self.train_writer.add_scalar(metric_name, score, epoch) - else: - self.val_writer.add_scalar(metric_name, score, epoch) - return False - - class FedXGBHistogramExecutor(Executor): """Federated XGBoost Executor Spec for histogram-base collaboration. @@ -81,6 +65,8 @@ def __init__( data_loader_id: str, verbose_eval=False, use_gpus=False, + metrics_writer_id: str = None, + model_file_name="test.model.json", ): """Federated XGBoost Executor for histogram-base collaboration. @@ -97,14 +83,17 @@ def __init__( data_loader_id: the ID points to XGBDataLoader. verbose_eval: verbose_eval in xgboost.train use_gpus: flag to enable gpu training + metrics_writer_id: the ID points to a LogWriter, if provided, a MetricsCallback will be added. + Users can then use the receivers from nvflare.app_opt.tracking. + model_file_name (str): where to save the model. 
""" super().__init__() - self.app_dir = None self.num_rounds = num_rounds self.early_stopping_rounds = early_stopping_rounds self.xgb_params = xgb_params self.data_loader_id = data_loader_id + self.data_loader = None self.verbose_eval = verbose_eval self.use_gpus = use_gpus @@ -117,6 +106,10 @@ def __init__( self._server_address = "localhost" self.train_data = None self.val_data = None + self.model_file_name = model_file_name + + self._metrics_writer_id = metrics_writer_id + self._metrics_writer = None def initialize(self, fl_ctx): self.client_id = fl_ctx.get_identity_name() @@ -124,13 +117,16 @@ def initialize(self, fl_ctx): self.log_info(fl_ctx, f"server address is {self._server_address}") engine = fl_ctx.get_engine() - ws = engine.get_workspace() - self.app_dir = ws.get_app_dir(fl_ctx.get_job_id()) self.data_loader = engine.get_component(self.data_loader_id) if not isinstance(self.data_loader, XGBDataLoader): self.system_panic("data_loader should be type XGBDataLoader", fl_ctx) + if self._metrics_writer_id: + self._metrics_writer = engine.get_component(self._metrics_writer_id) + if not isinstance(self._metrics_writer, LogWriter): + self.system_panic("writer should be type LogWriter", fl_ctx) + def xgb_train(self, params: XGBoostParams) -> xgb.core.Booster: """XGBoost training logic. @@ -148,9 +144,9 @@ def xgb_train(self, params: XGBoostParams) -> xgb.core.Booster: watchlist = [(dval, "eval"), (dtrain, "train")] callbacks = [callback.EvaluationMonitor(rank=self.rank)] - tensorboard, flag = optional_import(module="torch.utils.tensorboard") - if flag and self.app_dir: - callbacks.append(TensorBoardCallback(self.app_dir, tensorboard)) + + if self._metrics_writer: + callbacks.append(MetricsCallback(self._metrics_writer)) # Run training, all the features in training API is available. 
bst = xgb.train( @@ -288,7 +284,7 @@ def train(self, shareable: Shareable, fl_ctx: FLContext, abort_signal: Signal) - workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT) run_number = fl_ctx.get_prop(FLContextKey.CURRENT_RUN) run_dir = workspace.get_run_dir(run_number) - bst.save_model(os.path.join(run_dir, "test.model.json")) + bst.save_model(os.path.join(run_dir, self.model_file_name)) xgb.collective.communicator_print("Finished training\n") except Exception as e: secure_log_traceback() diff --git a/nvflare/app_opt/xgboost/histogram_based_v2/adaptors/grpc_client_adaptor.py b/nvflare/app_opt/xgboost/histogram_based_v2/adaptors/grpc_client_adaptor.py index 5e046ec1c3..7ec279b70c 100644 --- a/nvflare/app_opt/xgboost/histogram_based_v2/adaptors/grpc_client_adaptor.py +++ b/nvflare/app_opt/xgboost/histogram_based_v2/adaptors/grpc_client_adaptor.py @@ -99,7 +99,6 @@ def __init__( self.internal_server_addr = None self._training_stopped = False self._client_name = None - self._app_dir = None self._workspace = None self._run_dir = None self._process = None @@ -107,9 +106,6 @@ def __init__( def initialize(self, fl_ctx: FLContext): self._client_name = fl_ctx.get_identity_name() - engine = fl_ctx.get_engine() - ws = engine.get_workspace() - self._app_dir = ws.get_app_dir(fl_ctx.get_job_id()) self._workspace = fl_ctx.get_prop(FLContextKey.WORKSPACE_OBJECT) run_number = fl_ctx.get_prop(FLContextKey.CURRENT_RUN) self._run_dir = self._workspace.get_run_dir(run_number) @@ -133,7 +129,6 @@ class since the self object contains a sender that contains a Core Cell which ca Constant.RUNNER_CTX_RANK: self.rank, Constant.RUNNER_CTX_NUM_ROUNDS: self.num_rounds, Constant.RUNNER_CTX_MODEL_DIR: self._run_dir, - Constant.RUNNER_CTX_TB_DIR: self._app_dir, } starter = _ClientStarter(self.xgb_runner) self.logger.info(f"starting XGB client with {ctx=}") diff --git a/nvflare/app_opt/xgboost/histogram_based_v2/defs.py b/nvflare/app_opt/xgboost/histogram_based_v2/defs.py index 9d6472ec68..dd0d440220 100644 --- a/nvflare/app_opt/xgboost/histogram_based_v2/defs.py +++ b/nvflare/app_opt/xgboost/histogram_based_v2/defs.py @@ -83,5 +83,4 @@ class Constant: RUNNER_CTX_NUM_ROUNDS = "num_rounds" RUNNER_CTX_WORLD_SIZE = "world_size" RUNNER_CTX_RANK = "rank" - RUNNER_CTX_TB_DIR = "tb_dir" RUNNER_CTX_MODEL_DIR = "model_dir" diff --git a/nvflare/app_opt/xgboost/histogram_based_v2/executor.py b/nvflare/app_opt/xgboost/histogram_based_v2/executor.py index 18d99d44c5..4411822a5d 100644 --- a/nvflare/app_opt/xgboost/histogram_based_v2/executor.py +++ b/nvflare/app_opt/xgboost/histogram_based_v2/executor.py @@ -29,6 +29,7 @@ def __init__( int_server_grpc_options=None, req_timeout=100.0, model_file_name="model.json", + metrics_writer_id: str = None, in_process=True, ): XGBExecutor.__init__( @@ -44,6 +45,7 @@ def __init__( self.int_server_grpc_options = int_server_grpc_options self.model_file_name = model_file_name self.in_process = in_process + self.metrics_writer_id = metrics_writer_id def get_adaptor(self, fl_ctx: FLContext): runner = XGBClientRunner( @@ -53,6 +55,7 @@ def get_adaptor(self, fl_ctx: FLContext): verbose_eval=self.verbose_eval, use_gpus=self.use_gpus, model_file_name=self.model_file_name, + metrics_writer_id=self.metrics_writer_id, ) runner.initialize(fl_ctx) adaptor = GrpcClientAdaptor( diff --git a/nvflare/app_opt/xgboost/histogram_based_v2/runners/client_runner.py b/nvflare/app_opt/xgboost/histogram_based_v2/runners/client_runner.py index c1c0937ddc..5d1affa015 100644 --- 
a/nvflare/app_opt/xgboost/histogram_based_v2/runners/client_runner.py +++ b/nvflare/app_opt/xgboost/histogram_based_v2/runners/client_runner.py @@ -20,15 +20,18 @@ from nvflare.apis.fl_component import FLComponent from nvflare.apis.fl_context import FLContext +from nvflare.app_common.tracking.log_writer import LogWriter from nvflare.app_opt.xgboost.data_loader import XGBDataLoader from nvflare.app_opt.xgboost.histogram_based_v2.defs import Constant from nvflare.app_opt.xgboost.histogram_based_v2.runner import XGBRunner -from nvflare.fuel.utils.import_utils import optional_import +from nvflare.app_opt.xgboost.metrics_cb import MetricsCallback from nvflare.fuel.utils.obj_utils import get_logger class XGBoostParams: - def __init__(self, xgb_params: dict, num_rounds=10, early_stopping_rounds=2, verbose_eval=False): + def __init__( + self, xgb_params: dict, num_rounds: int = 10, early_stopping_rounds: int = 2, verbose_eval: bool = False + ): """Container for all XGBoost parameters. Args: @@ -43,36 +46,31 @@ def __init__(self, xgb_params: dict, num_rounds=10, early_stopping_rounds=2, ver self.xgb_params: dict = xgb_params if xgb_params else {} -class TensorBoardCallback(xgb.callback.TrainingCallback): - def __init__(self, app_dir: str, tensorboard): - super().__init__() - self.train_writer = tensorboard.SummaryWriter(log_dir=os.path.join(app_dir, "train-auc/")) - self.val_writer = tensorboard.SummaryWriter(log_dir=os.path.join(app_dir, "val-auc/")) - - def after_iteration(self, model, epoch: int, evals_log: xgb.callback.TrainingCallback.EvalsLog): - if not evals_log: - return False - - for data, metric in evals_log.items(): - for metric_name, log in metric.items(): - score = log[-1][0] if isinstance(log[-1], tuple) else log[-1] - if data == "train": - self.train_writer.add_scalar(metric_name, score, epoch) - else: - self.val_writer.add_scalar(metric_name, score, epoch) - return False - - class XGBClientRunner(XGBRunner, FLComponent): def __init__( self, data_loader_id: str, early_stopping_rounds: int, xgb_params: dict, - verbose_eval, - use_gpus, + verbose_eval: bool, + use_gpus: bool, model_file_name: str, + metrics_writer_id: str = None, ): + """Constructor. + + Args: + early_stopping_rounds: early stopping rounds + xgb_params: This dict is passed to `xgboost.train()` as the first argument `params`. + It contains all the Booster parameters. + Please refer to XGBoost documentation for details: + https://xgboost.readthedocs.io/en/stable/python/python_api.html#module-xgboost.training + data_loader_id: the ID points to XGBDataLoader. + verbose_eval: verbose_eval in xgboost.train + use_gpus: flag to enable gpu training + metrics_writer_id: the ID points to a LogWriter, if provided, a MetricsCallback will be added. + Users can then use the receivers from nvflare.app_opt.tracking. 
+ """ FLComponent.__init__(self) self.early_stopping_rounds = early_stopping_rounds self.xgb_params = xgb_params @@ -88,9 +86,10 @@ def __init__( self._num_rounds = None self._server_addr = None self._data_loader = None - self._tb_dir = None self._model_dir = None self._stopped = False + self._metrics_writer_id = metrics_writer_id + self._metrics_writer = None def initialize(self, fl_ctx: FLContext): engine = fl_ctx.get_engine() @@ -98,6 +97,11 @@ def initialize(self, fl_ctx: FLContext): if not isinstance(self._data_loader, XGBDataLoader): self.system_panic(f"data_loader should be type XGBDataLoader but got {type(self._data_loader)}", fl_ctx) + if self._metrics_writer_id: + self._metrics_writer = engine.get_component(self._metrics_writer_id) + if not isinstance(self._metrics_writer, LogWriter): + self.system_panic("writer should be type LogWriter", fl_ctx) + def xgb_train( self, params: XGBoostParams, train_data: xgb.core.DMatrix, val_data: xgb.core.DMatrix ) -> xgb.core.Booster: @@ -118,9 +122,8 @@ def xgb_train( watchlist = [(val_data, "eval"), (train_data, "train")] callbacks = [callback.EvaluationMonitor(rank=self._rank)] - tensorboard, flag = optional_import(module="torch.utils.tensorboard") - if flag and self._tb_dir: - callbacks.append(TensorBoardCallback(self._tb_dir, tensorboard)) + if self._metrics_writer: + callbacks.append(MetricsCallback(self._metrics_writer)) # Run training, all the features in training API is available. bst = xgb.train( @@ -140,7 +143,6 @@ def run(self, ctx: dict): self._world_size = ctx.get(Constant.RUNNER_CTX_WORLD_SIZE) self._num_rounds = ctx.get(Constant.RUNNER_CTX_NUM_ROUNDS) self._server_addr = ctx.get(Constant.RUNNER_CTX_SERVER_ADDR) - self._tb_dir = ctx.get(Constant.RUNNER_CTX_TB_DIR) self._model_dir = ctx.get(Constant.RUNNER_CTX_MODEL_DIR) if self.use_gpus: diff --git a/nvflare/app_opt/xgboost/metrics_cb.py b/nvflare/app_opt/xgboost/metrics_cb.py new file mode 100644 index 0000000000..2f6b421f12 --- /dev/null +++ b/nvflare/app_opt/xgboost/metrics_cb.py @@ -0,0 +1,38 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import xgboost.callback + +from nvflare.app_common.tracking.log_writer import LogWriter + + +class MetricsCallback(xgboost.callback.TrainingCallback): + def __init__(self, writer: LogWriter): + xgboost.callback.TrainingCallback.__init__(self) + if not isinstance(writer, LogWriter): + raise RuntimeError("MetricsCallback: writer is not valid.") + self.writer = writer + + def after_iteration(self, model, epoch: int, evals_log: xgboost.callback.TrainingCallback.EvalsLog): + if not evals_log: + return False + + data_type = self.writer.get_default_metric_data_type() + for data, metric in evals_log.items(): + record = {} + for metric_name, log in metric.items(): + score = log[-1][0] if isinstance(log[-1], tuple) else log[-1] + record[metric_name] = score + self.writer.write(tag=f"{data}_metrics", value=record, data_type=data_type, global_step=epoch) + return False