diff --git a/extensions/rapids_notebook_files.py b/extensions/rapids_notebook_files.py index 5d867258..eef81d94 100644 --- a/extensions/rapids_notebook_files.py +++ b/extensions/rapids_notebook_files.py @@ -28,7 +28,7 @@ def walk_files(app, dir, outdir): with open(str(outdir / page.name), "w") as writer: writer.write( re.sub( - r"\{\{.*?\}\}", + r"(?, 'serialize': , 'version': '10', 'latest_version': None, 'conda_file': None, 'image': None, 'build': , 'inference_config': None, 'os_type': 'Linux', 'arm_type': 'environment_version', 'conda_file_path': None, 'path': None, 'datastore': None, 'upload_hash': None, 'translated_conda_file': None})" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# RUN THIS CODE ONCE TO SETUP ENVIRONMENT\n", + "from azure.ai.ml.entities import Environment, BuildContext\n", + "\n", + "env_docker_image = Environment(\n", + " build=BuildContext(path=os.getcwd()),\n", + " name=\"rapids-mlflow\",\n", + " description=\"RAPIDS environment with azureml-mlflow\",\n", + ")\n", + "\n", + "ml_client.environments.create_or_update(env_docker_image)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Submit the training job " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We will configure and run a training job using the`command`class. The [command](https://learn.microsoft.com/en-us/python/api/azure-ai-ml/azure.ai.ml?view=azure-python#azure-ai-ml-command) can be used to run standalone jobs or as a function inside pipelines.\n", + "`inputs` is a dictionary of command-line arguments to pass to the training script.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": { + "tags": [ + "library/randomforest", + "library/cudf" + ] + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. 
Please see https://aka.ms/azuremlexperimental for more information.\n", + "Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.\n", + "Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.\n", + "Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.\n", + "Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.\n", + "Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.\n", + "\u001b[32mUploading code (0.33 MBs): 100%|██████████| 327210/327210 [00:00<00:00, 1802654.05it/s]\n", + "\u001b[39m\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "'https://ml.azure.com/runs/zen_eye_lm7dcp68jz?wsid=/subscriptions/fc4f4a6b-4041-4b1c-8249-854d68edcf62/resourcegroups/rapidsai-deployment/workspaces/rapids-aml-cluster&tid=43083d15-7273-40c1-b7db-39efd9ccc17a'" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from azure.ai.ml import command, Input\n", + "\n", + "\n", + "command_job = command(\n", + " environment=\"rapids-mlflow:1\",\n", + " experiment_name=experiment_name,\n", + " code=os.getcwd(),\n", + " inputs={\n", + " \"data_dir\": Input(type=\"uri_file\", path=data_uri),\n", + " \"n_bins\": 32,\n", + " \"compute\": \"single-GPU\", # multi-GPU for algorithms via Dask\n", + " \"cv_folds\": 5,\n", + " \"n_estimators\": 100,\n", + " \"max_depth\": 6,\n", + " \"max_features\": 0.3,\n", + " },\n", + " command=\"python train_rapids.py --data_dir ${{inputs.data_dir}} --n_bins 
${{inputs.n_bins}} --compute ${{inputs.compute}} --cv_folds ${{inputs.cv_folds}}\\\n", + " --n_estimators ${{inputs.n_estimators}} --max_depth ${{inputs.max_depth}} --max_features ${{inputs.max_features}}\",\n", + " compute=\"rapids-cluster\",\n", + ")\n", + "\n", + "\n", + "# submit the command\n", + "returned_job = ml_client.jobs.create_or_update(command_job)\n", + "\n", + "# get a URL for the status of the job\n", + "returned_job.studio_url" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Tune model hyperparameters" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can optimize our model's hyperparameters and improve the accuracy using Azure Machine Learning's hyperparameter tuning capabilities." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Start a hyperparameter sweep" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's define the hyperparameter space to sweep over. We will tune `n_estimators`, `max_depth` and `max_features` parameters. In this example we will use random sampling to try different configuration sets of hyperparameters and maximize `Accuracy`." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "from azure.ai.ml.sweep import Choice, Uniform, MedianStoppingPolicy\n", + "\n", + "command_job_for_sweep = command_job(\n", + " n_estimators=Choice(values=range(50, 500)),\n", + " max_depth=Choice(values=range(5, 19)),\n", + " max_features=Uniform(min_value=0.2, max_value=1.0),\n", + ")\n", + "\n", + "# apply sweep parameter to obtain the sweep_job\n", + "sweep_job = command_job_for_sweep.sweep(\n", + " compute=\"rapids-cluster\",\n", + " sampling_algorithm=\"random\",\n", + " primary_metric=\"Accuracy\",\n", + " goal=\"Maximize\",\n", + ")\n", + "\n", + "\n", + "# Define the limits for this sweep\n", + "sweep_job.set_limits(\n", + " max_total_trials=10, max_concurrent_trials=2, timeout=18000, trial_timeout=3600\n", + ")\n", + "\n", + "\n", + "# Specify your experiment details\n", + "sweep_job.display_name = \"RF-rapids-sweep-job\"\n", + "sweep_job.description = \"Run RAPIDS hyperparameter sweep job\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This will launch the RAPIDS training script with parameters that were specified in the cell above." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "# submit the hpo job\n", + "returned_sweep_job = ml_client.create_or_update(sweep_job)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Monitor SweepJobs runs" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Monitor your job at https://ml.azure.com/runs/eager_turtle_r7fs2xzcty?wsid=/subscriptions/fc4f4a6b-4041-4b1c-8249-854d68edcf62/resourcegroups/rapidsai-deployment/workspaces/rapids-aml-cluster&tid=43083d15-7273-40c1-b7db-39efd9ccc17a\n" + ] + } + ], + "source": [ + "aml_url = returned_sweep_job.studio_url\n", + "\n", + "print(\"Monitor your job at\", aml_url)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Find and register best model" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Download the best trial model output" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [], + "source": [ + "ml_client.jobs.download(returned_sweep_job.name, output_name=\"model\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Delete cluster" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [], + "source": [ + "ml_client.compute.begin_delete(gpu_compute_target).wait()" + ] + } + ], + "metadata": { + "kernel_info": { + "name": "rapids" + }, + "kernelspec": { + "display_name": "rapids-23.06", + "language": "python", + "name": "rapids-23.06" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.12" + }, + "microsoft": { + "ms_spell_check": { + "ms_spell_check_language": 
default_azureml_paths = {
    "train_script": "./train_script",
    "train_data": "./data_airline",
    "output": "./output",
}


class RapidsCloudML:
    """
    Helper that ingests a dataset, splits it, trains an XGBoost or
    RandomForest model and scores it, choosing CPU / single-GPU / multi-GPU
    code paths from the ``compute_type`` string.

    The branches are selected with substring checks on ``compute_type``
    ("CPU" / "GPU" and "single" / "multi"), and with equality checks on
    ``model_type`` ("XGBoost" / "RandomForest") and ``data_type``
    ("ORC" / "CSV" / "Parquet").
    """

    def __init__(
        self,
        cloud_type="Azure",
        model_type="RandomForest",
        data_type="Parquet",
        compute_type="single-GPU",
        verbose_estimator=False,
        CSP_paths=default_azureml_paths,
    ):
        """
        Store the run configuration and, for "multi" compute types, start a
        LocalCUDACluster and connect a Dask client to it.

        Parameters
        ----------
        cloud_type : string
            Label of the cloud provider; only stored and logged here.
        model_type : string
            'XGBoost' or 'RandomForest'.
        data_type : string
            'ORC', 'CSV' or 'Parquet'.
        compute_type : string
            Must contain 'CPU' or 'GPU' and 'single' or 'multi'.
        verbose_estimator : bool
            Forwarded as ``verbose`` to the RandomForest estimators.
        CSP_paths : dict
            Paths used by this object ('output' for the log file and,
            optionally, 'hyperparams' for a JSON config file).
        """
        self.CSP_paths = CSP_paths
        self.cloud_type = cloud_type
        self.model_type = model_type
        self.data_type = data_type
        self.compute_type = compute_type
        self.verbose_estimator = verbose_estimator

        # Defaults for runs without a Dask cluster, so that code reading
        # these attributes (e.g. the CPU RandomForest's n_jobs) does not
        # fail with AttributeError.
        self.client = None
        self.workers = None
        self.n_workers = 1

        self.log_to_file(
            f"\n> RapidsCloudML\n\tCompute, Data, Model, Cloud types "
            f"{self.compute_type, self.data_type, self.model_type, self.cloud_type}"
        )

        # Setting up client for multi-GPU option
        if "multi" in self.compute_type:
            self.log_to_file("\n\tMulti-GPU selected")
            # This will use all GPUs on the local host by default
            cluster = LocalCUDACluster(threads_per_worker=1)
            self.client = Client(cluster)

            # Query the client for all connected workers
            self.workers = self.client.has_what().keys()
            self.n_workers = len(self.workers)
            self.log_to_file(f"\n\tClient information {self.client}")

    def load_hyperparams(self, model_name="XGBoost"):
        """
        Select model parameters based on ``self.model_type``.

        Starts from built-in defaults and, if a JSON config file exists at
        ``self.CSP_paths['hyperparams']``, overrides them with its contents.
        When the path is not configured or the file cannot be read or
        parsed, the problem is logged and the defaults are returned.

        Parameters
        ----------
        model_name : string
            Currently unused; the defaults are chosen from
            ``self.model_type``. Kept for backward compatibility.

        Returns
        ----------
        model_params : dict
            Loaded model parameters (dict). Empty (plus any overrides) when
            ``self.model_type`` is neither 'XGBoost' nor 'RandomForest'.
        """

        self.log_to_file("\n> Loading Hyperparameters")

        # Default parameters of the models
        if self.model_type == "XGBoost":
            # https://xgboost.readthedocs.io/en/latest/parameter.html
            model_params = {
                "max_depth": 6,
                "num_boost_round": 100,
                "learning_rate": 0.3,
                "gamma": 0.0,
                "lambda": 1.0,
                "alpha": 0.0,
                "objective": "binary:logistic",
                "random_state": 0,
            }
        elif self.model_type == "RandomForest":
            # https://docs.rapids.ai/api/cuml/stable/ -> cuml.ensemble.RandomForestClassifier
            model_params = {
                "n_estimators": 10,
                "max_depth": 10,
                "n_bins": 16,
                "max_features": 1.0,
                "seed": 0,
            }
        else:
            model_params = {}

        try:
            with open(self.CSP_paths["hyperparams"]) as file_handle:
                overrides = json.load(file_handle)
            if not isinstance(overrides, dict):
                raise ValueError("hyperparameter config must be a JSON object")
            model_params.update(overrides)
        except (KeyError, OSError, ValueError) as error:
            # Best effort: a missing or broken config falls back to defaults.
            self.log_to_file(str(error))

        pprint.pprint(model_params)
        return model_params

    def load_data(
        self, filename="dataset.orc", col_labels=None, y_label="ArrDelayBinary"
    ):
        """
        Load the dataset from ``filename`` using the reader that matches
        ``compute_type`` and ``data_type``, cast every column to float32,
        make sure the label column exists (derived from 'ArrDelay' > 10 when
        missing, turning the task into binary classification), cast the
        label to int32 and fill nulls with 0.

        Parameters
        ----------
        filename : string
            the path of the dataset to be loaded

        col_labels : list of strings
            The input columns that we are interested in. None selects all the columns.
            Only forwarded to the CSV readers (as ``names``) and to the
            multi-device Parquet readers (as ``columns``).

        y_label : string
            The column to perform the prediction task in.

        Returns
        ----------
        dataset : dataframe (Pandas, cudf or dask-cudf)
            Ingested dataset in the format of a dataframe

        col_labels : list of strings
            The input columns selected (returned unchanged)

        y_label : string
            The label column name for binary classification

        duration : float
            The time it took to read the dataset
        """
        target_filename = filename
        self.log_to_file(f"\n> Loading dataset from {target_filename}")

        with PerfTimer() as ingestion_timer:
            if "CPU" in self.compute_type:
                # CPU Reading options
                self.log_to_file("\n\tCPU read")

                if self.data_type == "ORC":
                    with open(target_filename, mode="rb") as file:
                        dataset = pyarrow_orc.ORCFile(file).read().to_pandas()
                elif self.data_type == "CSV":
                    dataset = pd.read_csv(target_filename, names=col_labels)

                elif self.data_type == "Parquet":
                    if "single" in self.compute_type:
                        dataset = pd.read_parquet(target_filename)

                    elif "multi" in self.compute_type:
                        self.log_to_file("\n\tReading using dask dataframe")
                        dataset = dask.dataframe.read_parquet(
                            target_filename, columns=col_labels
                        )

            elif "GPU" in self.compute_type:
                # GPU Reading Option

                self.log_to_file("\n\tGPU read")
                if self.data_type == "ORC":
                    dataset = cudf.read_orc(target_filename)

                elif self.data_type == "CSV":
                    dataset = cudf.read_csv(target_filename, names=col_labels)

                elif self.data_type == "Parquet":
                    if "single" in self.compute_type:
                        dataset = cudf.read_parquet(target_filename)

                    elif "multi" in self.compute_type:
                        self.log_to_file("\n\tReading using dask_cudf")
                        dataset = dask_cudf.read_parquet(
                            target_filename, columns=col_labels
                        )

        # cast all columns to float32
        for col in dataset.columns:
            dataset[col] = dataset[col].astype(np.float32)  # needed for random forest

        # Adding y_label column if it is not present
        if y_label not in dataset.columns:
            dataset[y_label] = 1.0 * (dataset["ArrDelay"] > 10)

        dataset[y_label] = dataset[y_label].astype(np.int32)  # Needed for cuml RF

        dataset = dataset.fillna(0.0)  # Filling the null values. Needed for dask-cudf

        self.log_to_file(f"\n\tIngestion completed in {ingestion_timer.duration}")
        self.log_to_file(
            f"\n\tDataset descriptors: {dataset.shape}\n\t{dataset.dtypes}"
        )
        return dataset, col_labels, y_label, ingestion_timer.duration

    def split_data(
        self, dataset, y_label, train_size=0.8, random_state=0, shuffle=True
    ):
        """
        Split data into train and test sets using the splitter that matches
        the compute mode.
        CPU compute - Uses sklearn, we manually filter y_label column in the split call
        GPU Compute - Single GPU uses cuml and multi GPU uses dask_ml; both
        receive ``dataset`` and ``y_label`` as given.

        Parameters
        ----------
        dataset : dataframe
            The dataframe on which we wish to perform the split
        y_label : string
            The name of the label column (not the series itself)
        train_size : float
            The size for the split. Takes values between 0 to 1.
        random_state : int
            Useful for running reproducible splits.
        shuffle : binary
            Specifies if the data must be shuffled before splitting.
            Ignored on the multi-GPU path, which always passes shuffle=False.

        Returns
        ----------
        X_train : dataframe
            The data to be used for training. Has same type as input dataset.
        X_test : dataframe
            The data to be used for testing. Has same type as input dataset.
        y_train : dataframe
            The label to be used for training. Has same type as input dataset.
        y_test : dataframe
            The label to be used for testing. Has same type as input dataset.
        duration : float
            The time it took to perform the split
        """
        self.log_to_file("\n> Splitting train and test data")

        with PerfTimer() as split_timer:
            if "CPU" in self.compute_type:
                X_train, X_test, y_train, y_test = sklearn_train_test_split(
                    dataset.loc[:, dataset.columns != y_label],
                    dataset[y_label],
                    train_size=train_size,
                    shuffle=shuffle,
                    random_state=random_state,
                )

            elif "GPU" in self.compute_type:
                if "single" in self.compute_type:
                    X_train, X_test, y_train, y_test = cuml_train_test_split(
                        X=dataset,
                        y=y_label,
                        train_size=train_size,
                        shuffle=shuffle,
                        random_state=random_state,
                    )
                elif "multi" in self.compute_type:
                    X_train, X_test, y_train, y_test = dask_train_test_split(
                        dataset,
                        y_label,
                        train_size=train_size,
                        shuffle=False,  # shuffle not available for dask_cudf yet
                        random_state=random_state,
                    )

        self.log_to_file(f"\n\tX_train shape and type{X_train.shape} {type(X_train)}")
        self.log_to_file(f"\n\tSplit completed in {split_timer.duration}")
        return X_train, X_test, y_train, y_test, split_timer.duration

    def train_model(self, X_train, y_train, model_params):
        """
        Train a model with the model_params specified by calling fit_xgboost or
        fit_random_forest depending on the model_type.

        Errors raised while fitting are logged rather than propagated; in
        that case (or for an unknown model_type) the returned model is None
        and the training time is 0.

        Parameters
        ----------
        X_train : dataframe
            The data for training
        y_train : dataframe
            The label to be used for training.
        model_params : dict
            The model params to use for this training
        Returns
        ----------
        trained_model : The object of the trained model either of XGBoost or RandomForest,
            or None when training failed

        training_time : float
            The time it took to train the model
        """
        self.log_to_file(f"\n> Training {self.model_type} estimator w/ hyper-params")
        # Pre-bind both results so the return below is valid after a failure.
        trained_model = None
        training_time = 0

        try:
            if self.model_type == "XGBoost":
                trained_model, training_time = self.fit_xgboost(
                    X_train, y_train, model_params
                )
            elif self.model_type == "RandomForest":
                trained_model, training_time = self.fit_random_forest(
                    X_train, y_train, model_params
                )
        except Exception as error:
            self.log_to_file("\n\n!error during model training: " + str(error))
        self.log_to_file(f"\n\tFinished training in {training_time:.4f} s")
        return trained_model, training_time

    def fit_xgboost(self, X_train, y_train, model_params):
        """
        Train an XGBoost model on X_train and y_train with model_params.

        ``tree_method`` is set to 'gpu_hist' for GPU compute types and to
        'hist' otherwise. The caller's ``model_params`` dict is not
        modified. Returns (None, duration) when compute_type contains
        neither 'single' nor 'multi'.

        Parameters and Objects returned are same as train_model
        """
        tree_method = "gpu_hist" if "GPU" in self.compute_type else "hist"
        # Work on a copy so the caller's dict is left untouched.
        params = {**model_params, "tree_method": tree_method}

        trained_model = None
        with PerfTimer() as train_timer:
            if "single" in self.compute_type:
                train_DMatrix = xgboost.DMatrix(data=X_train, label=y_train)
                trained_model = xgboost.train(
                    dtrain=train_DMatrix,
                    params=params,
                    num_boost_round=params["num_boost_round"],
                )
            elif "multi" in self.compute_type:
                self.log_to_file("\n\tTraining multi-GPU XGBoost")
                train_DMatrix = xgboost.dask.DaskDMatrix(
                    self.client, data=X_train, label=y_train
                )
                trained_model = xgboost.dask.train(
                    self.client,
                    dtrain=train_DMatrix,
                    params=params,
                    num_boost_round=params["num_boost_round"],
                )
        return trained_model, train_timer.duration

    def fit_random_forest(self, X_train, y_train, model_params):
        """
        Train a RandomForest model on X_train and y_train with model_params.
        Depending on compute_type, estimators from appropriate packages are used.
        CPU - sklearn
        Single-GPU - cuml
        multi_gpu - cuml.dask

        Errors raised by ``fit`` (or an unsupported compute_type) are logged
        and result in a returned model of None.

        Parameters and Objects returned are same as train_model
        """
        rf_model = None
        if "CPU" in self.compute_type:
            # Import the submodule explicitly: a bare ``import sklearn`` does
            # not guarantee that ``sklearn.ensemble`` is loaded.
            from sklearn.ensemble import (
                RandomForestClassifier as SklearnRandomForestClassifier,
            )

            rf_model = SklearnRandomForestClassifier(
                n_estimators=model_params["n_estimators"],
                max_depth=model_params["max_depth"],
                max_features=model_params["max_features"],
                n_jobs=int(self.n_workers),
                verbose=self.verbose_estimator,
            )
        elif "GPU" in self.compute_type:
            if "single" in self.compute_type:
                rf_model = cuml.ensemble.RandomForestClassifier(
                    n_estimators=model_params["n_estimators"],
                    max_depth=model_params["max_depth"],
                    n_bins=model_params["n_bins"],
                    max_features=model_params["max_features"],
                    verbose=self.verbose_estimator,
                )
            elif "multi" in self.compute_type:
                self.log_to_file("\n\tFitting multi-GPU daskRF")
                X_train, y_train = dask_utils.persist_across_workers(
                    self.client,
                    [X_train.fillna(0.0), y_train.fillna(0.0)],
                    workers=self.workers,
                )
                rf_model = cuml.dask.ensemble.RandomForestClassifier(
                    n_estimators=model_params["n_estimators"],
                    max_depth=model_params["max_depth"],
                    n_bins=model_params["n_bins"],
                    max_features=model_params["max_features"],
                    verbose=self.verbose_estimator,
                )

        # Pre-bind the result so the return is valid when fit() fails.
        trained_model = None
        with PerfTimer() as train_timer:
            try:
                if rf_model is None:
                    raise ValueError(
                        f"unsupported compute_type: {self.compute_type!r}"
                    )
                trained_model = rf_model.fit(X_train, y_train)
            except Exception as error:
                self.log_to_file("\n\n! Error during fit " + str(error))
        return trained_model, train_timer.duration

    def evaluate_test_perf(self, trained_model, X_test, y_test, threshold=0.5):
        """
        Evaluate the model performance on the inference set. For XGBoost we need
        to generate a DMatrix and then we can evaluate the model.
        For Random Forest, in single GPU case, we can just call .score function.
        And multi-GPU Random Forest needs to predict on the model and then compute
        the accuracy score.

        Errors raised during inference are logged rather than propagated; in
        that case the returned accuracy is None.

        Parameters
        ----------
        trained_model : The object of the trained model either of XGBoost or RandomForest
        X_test : dataframe
            The data for testing
        y_test : dataframe
            The label to be used for testing.
        threshold : float
            XGBoost predictions above this value are counted as class 1.
        Returns
        ----------
        test_accuracy : float
            The accuracy achieved on test set (None on failure)
        duration : float
            The time it took to evaluate the model
        """
        self.log_to_file("\n> Inferencing on test set")
        test_accuracy = None
        with PerfTimer() as inference_timer:
            try:
                if self.model_type == "XGBoost":
                    if "multi" in self.compute_type:
                        test_DMatrix = xgboost.dask.DaskDMatrix(
                            self.client, data=X_test, label=y_test
                        )
                        xgb_pred = xgboost.dask.predict(
                            self.client, trained_model, test_DMatrix
                        ).compute()
                        xgb_pred = (xgb_pred > threshold) * 1.0
                        test_accuracy = accuracy_score(y_test.compute(), xgb_pred)
                    elif "single" in self.compute_type:
                        test_DMatrix = xgboost.DMatrix(data=X_test, label=y_test)
                        xgb_pred = trained_model.predict(test_DMatrix)
                        xgb_pred = (xgb_pred > threshold) * 1.0
                        test_accuracy = accuracy_score(y_test, xgb_pred)

                elif self.model_type == "RandomForest":
                    if "multi" in self.compute_type:
                        cuml_pred = trained_model.predict(X_test).compute()
                        self.log_to_file("\n\tPrediction complete")
                        test_accuracy = accuracy_score(
                            y_test.compute(), cuml_pred, convert_dtype=True
                        )
                    elif "single" in self.compute_type:
                        test_accuracy = trained_model.score(
                            X_test, y_test.astype("int32")
                        )

            except Exception as error:
                self.log_to_file("\n\n!error during inference: " + str(error))

        self.log_to_file(f"\n\tFinished inference in {inference_timer.duration:.4f} s")
        self.log_to_file(f"\n\tTest-accuracy: {test_accuracy}")
        return test_accuracy, inference_timer.duration

    def set_up_logging(self):
        """
        Configure the root logger to write INFO messages to ``log.txt``
        inside ``self.CSP_paths['output']``.
        """
        logging_path = self.CSP_paths["output"] + "/log.txt"
        logging.basicConfig(filename=logging_path, level=logging.INFO)

    def log_to_file(self, text):
        """
        Log ``text`` at INFO level through the root logger and print it.
        """
        logging.info(text)
        print(text)


# perf_counter = highest available timer resolution
class PerfTimer:
    """
    Context manager that measures the wall time of its ``with`` body.

    ``duration`` (seconds, from time.perf_counter) is None until the block
    exits.
    """

    def __init__(self):
        self.start = None
        self.duration = None

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, *args):
        self.duration = time.perf_counter() - self.start
def main():
    """
    Train and score a RAPIDS RandomForest over several cross-validation
    folds and report the results to MLflow.

    Reads the hyperparameters and run options from the command line, logs
    the hyperparameters as MLflow params, loads the dataset through
    RapidsCloudML, then for each fold splits, trains and evaluates. The best
    fold accuracy is logged as the "Accuracy" metric and the summed training
    plus inference time of all folds as "Total training inference time".
    """
    parser = argparse.ArgumentParser()

    parser.add_argument("--data_dir", type=str, help="location of data")
    parser.add_argument(
        "--n_estimators", type=int, default=100, help="Number of trees in RF"
    )
    parser.add_argument(
        "--max_depth", type=int, default=16, help="Max depth of each tree"
    )
    parser.add_argument(
        "--n_bins",
        type=int,
        default=8,
        help="Number of bins used in split point calculation",
    )
    parser.add_argument(
        "--max_features",
        type=float,
        default=1.0,
        help="Number of features for best split",
    )
    parser.add_argument(
        "--compute",
        type=str,
        default="single-GPU",
        help="set to multi-GPU for algorithms via dask",
    )
    parser.add_argument(
        "--cv_folds", type=int, default=5, help="Number of CV fold splits"
    )

    args = parser.parse_args()
    data_dir = args.data_dir
    compute = args.compute
    cv_folds = args.cv_folds

    # Builtin int/float/str replace the np.int/np.float/np.str aliases,
    # which no longer exist in NumPy >= 1.24.
    n_estimators = args.n_estimators
    mlflow.log_param("n_estimators", int(args.n_estimators))
    max_depth = args.max_depth
    mlflow.log_param("max_depth", int(args.max_depth))
    n_bins = args.n_bins
    mlflow.log_param("n_bins", int(args.n_bins))
    max_features = args.max_features
    mlflow.log_param("max_features", str(args.max_features))

    print("\n---->>>> cuDF version <<<<----\n", cudf.__version__)
    print("\n---->>>> cuML version <<<<----\n", cuml.__version__)

    azure_ml = RapidsCloudML(
        cloud_type="Azure",
        model_type="RandomForest",
        data_type="Parquet",
        compute_type=compute,
    )
    print(args.compute)

    if compute == "single-GPU":
        dataset, _, y_label, _ = azure_ml.load_data(filename=data_dir)
    else:
        # use parquet files from 'https://airlinedataset.blob.core.windows.net/airline-10years' for multi-GPU training
        dataset, _, y_label, _ = azure_ml.load_data(
            filename=os.path.join(data_dir, "part*.parquet"),
            col_labels=[
                "Flight_Number_Reporting_Airline",
                "Year",
                "Quarter",
                "Month",
                "DayOfWeek",
                "DOT_ID_Reporting_Airline",
                "OriginCityMarketID",
                "DestCityMarketID",
                "DepTime",
                "DepDelay",
                "DepDel15",
                "ArrDel15",
                "ArrDelay",
                "AirTime",
                "Distance",
            ],
            y_label="ArrDel15",
        )

    # Features are every column except the raw delay and the label itself.
    X = dataset[dataset.columns.difference(["ArrDelay", y_label])]
    y = dataset[y_label]
    del dataset

    print("\n---->>>> Training using GPUs <<<<----\n")

    # ----------------------------------------------------------------------------------------------------
    # cross-validation folds
    # ----------------------------------------------------------------------------------------------------
    accuracy_per_fold = []
    train_time_per_fold = []
    infer_time_per_fold = []
    global_best_test_accuracy = 0
    # Accumulated over all folds so the "Total ..." metric really is a total
    # (and is defined even when cv_folds is 0).
    total_train_infer_time = 0.0

    model_params = {
        "n_estimators": n_estimators,
        "max_depth": max_depth,
        "max_features": max_features,
        "n_bins": n_bins,
    }

    # cross-validation: one split/train/evaluate round per fold (--cv_folds)
    for i_train_fold in range(cv_folds):
        print(f"\n CV fold { i_train_fold } of { cv_folds }\n")

        # split data; the fold index seeds the split so every fold differs
        X_train, X_test, y_train, y_test, _ = azure_ml.split_data(
            X, y, random_state=i_train_fold
        )
        # train model
        trained_model, training_time = azure_ml.train_model(
            X_train, y_train, model_params
        )

        train_time_per_fold.append(round(training_time, 4))

        # evaluate perf
        test_accuracy, infer_time = azure_ml.evaluate_test_perf(
            trained_model, X_test, y_test
        )
        accuracy_per_fold.append(round(test_accuracy, 4))
        infer_time_per_fold.append(round(infer_time, 4))
        total_train_infer_time += training_time + infer_time

        # update best model [ assumes maximization of perf metric ]
        if test_accuracy > global_best_test_accuracy:
            global_best_test_accuracy = test_accuracy

    mlflow.log_metric(
        "Total training inference time", float(total_train_infer_time)
    )
    mlflow.log_metric("Accuracy", float(global_best_test_accuracy))
    print("\n Accuracy             :", global_best_test_accuracy)
    print("\n accuracy per fold    :", accuracy_per_fold)
    print("\n train-time per fold  :", train_time_per_fold)
    print("\n train-time all folds :", sum(train_time_per_fold))
    print("\n infer-time per fold  :", infer_time_per_fold)
    print("\n infer-time all folds :", sum(infer_time_per_fold))


if __name__ == "__main__":
    with PerfTimer() as total_script_time:
        main()
    print(f"Total runtime: {total_script_time.duration:.2f}")
    mlflow.log_metric("Total runtime", float(total_script_time.duration))
    print("\n Exiting script")