diff --git a/source/examples/index.md b/source/examples/index.md index b89d250f..e78f1551 100644 --- a/source/examples/index.md +++ b/source/examples/index.md @@ -15,4 +15,5 @@ rapids-autoscaling-multi-tenant-kubernetes/notebook xgboost-randomforest-gpu-hpo-dask/notebook rapids-azureml-hpo/notebook xgboost-rf-gpu-cpu-benchmark/notebook +xgboost-azure-mnmg-daskcloudprovider/notebook ``` diff --git a/source/examples/xgboost-azure-mnmg-daskcloudprovider/notebook.ipynb b/source/examples/xgboost-azure-mnmg-daskcloudprovider/notebook.ipynb index 1c97befb..811ae4e5 100644 --- a/source/examples/xgboost-azure-mnmg-daskcloudprovider/notebook.ipynb +++ b/source/examples/xgboost-azure-mnmg-daskcloudprovider/notebook.ipynb @@ -2,7 +2,13 @@ "cells": [ { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, "source": [ "# Multi-Node Multi-GPU XGBoost example on Azure using dask-cloudprovider \n", "\n", @@ -23,7 +29,15 @@ }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [ + "library/xgboost" + ] + }, "source": [ "## Step 0: Set up Azure credentials and CLI \n" ] @@ -42,7 +56,13 @@ }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, "source": [ "## Step 1: Import necessary packages " ] @@ -57,14 +77,20 @@ "source": [ "# # Uncomment the following and install some libraries at the beginning.\n", "# # If adlfs is not present, install adlfs to read from Azure data lake.\n", - "! pip install adlfs\n", - "! pip install \"dask-cloudprovider[azure]\" --upgrade" + "# ! pip install adlfs\n", + "# ! pip install \"dask-cloudprovider[azure]\" --upgrade" ] }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, + "execution_count": 1, + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, "outputs": [], "source": [ "from dask.distributed import Client, wait, get_worker\n", @@ -99,19 +125,19 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "location = \"West US 2\"\n", "resource_group = \"rapidsai-deployment\"\n", - "vnet = \"skirui-test-mnmg-daskcloudprovider\" # \"rapidsaiclouddeploymenttest-nsg\"\n", + "vnet = \"rapidsai-deployment-vnet\"\n", + "security_group = \"skirui-test-mnmg-daskcloudprovider\"\n", "vm_size = \"Standard_NC12s_v3\" # or choose a different GPU enabled VM type\n", "\n", - "docker_image = \"rapidsai/rapidsai:23.06-cuda11.8-runtime-ubuntu22.04-py3.10\"\n", + "docker_image = \"nvcr.io/nvidia/rapidsai/rapidsai-core:23.06-cuda11.8-runtime-ubuntu22.04-py3.10\" # \"rapidsai/rapidsai:23.06-cuda11.8-runtime-ubuntu22.04-py3.10\"\n", "docker_args = \"--shm-size=256m -e DISABLE_JUPYTER=true\"\n", - "worker_class = \"dask_cuda.CUDAWorker\"\n", - "worker_options = {\"rmm-managed-memory\": True}" + "worker_class = \"dask_cuda.CUDAWorker\"" ] }, { @@ -137,16 +163,43 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'vnet': None,\n", + " 'security_group': None,\n", + " 'public_ingress': True,\n", + " 'vm_size': 'Standard_DS1_v2',\n", + " 'disk_size': 50,\n", + " 'scheduler_vm_size': None,\n", + " 'docker_image': 'daskdev/dask:latest',\n", + " 'vm_image': {'publisher': 'Canonical',\n", + " 'offer': 
'UbuntuServer',\n", + " 'sku': '18.04-LTS',\n", + " 'version': 'latest'},\n", + " 'bootstrap': True,\n", + " 'auto_shutdown': True,\n", + " 'marketplace_plan': {'publisher': 'nvidia',\n", + " 'name': 'ngc-base-version-23_03_0',\n", + " 'product': 'ngc_azure_17_11',\n", + " 'version': '23.03.0'}}" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "dask.config.set(\n", " {\n", " \"logging.distributed\": \"info\",\n", " \"cloudprovider.azure.azurevm.marketplace_plan\": {\n", " \"publisher\": \"nvidia\",\n", - " \"name\": \"ngc-base-version-23_03_02\",\n", + " \"name\": \"ngc-base-version-23_03_0\",\n", " \"product\": \"ngc_azure_17_11\",\n", " \"version\": \"23.03.0\",\n", " },\n", @@ -166,11 +219,41 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "! az vm image terms accept --urn \"nvidia:ngc_azure_17_11:ngc-base-version-23_03_02:23.03.0\" --verbose" + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " \"accepted\": true,\n", + " \"id\": \"/subscriptions/fc4f4a6b-4041-4b1c-8249-854d68edcf62/providers/Microsoft.MarketplaceOrdering/offerTypes/Microsoft.MarketplaceOrdering/offertypes/publishers/nvidia/offers/ngc_azure_17_11/plans/ngc-base-version-23_03_0/agreements/current\",\n", + " \"licenseTextLink\": \"https://mpcprodsa.blob.core.windows.net/legalterms/3E5ED_legalterms_NVIDIA%253a24NGC%253a5FAZURE%253a5F17%253a5F11%253a24NGC%253a2DBASE%253a2DVERSION%253a2D23%253a5F03%253a5F0%253a24KJVKRIWKTRQ3CIEPNL6YTG4AVORBHHPZCDQDVWX7JPPDEF6UM7R4XO76VDRHXCNTQYATKLGYYW3KA7DSIKTYXBZ3HJ2FMWYCINEY4WQ.txt\",\n", + " \"marketplaceTermsLink\": \"https://mpcprodsa.blob.core.windows.net/marketplaceterms/3EDEF_marketplaceterms_VIRTUALMACHINE%253a24AAK2OAIZEAWW5H4MSP5KSTVB6NDKKRTUBAU23BRFTWN4YC2MQLJUB5ZEYUOUJBVF3YK34CIVPZL2HWYASPGDUY5O2FWEGRBYOXWZE5Y.txt\",\n", + " \"name\": \"ngc-base-version-23_03_0\",\n", + " \"plan\": \"ngc-base-version-23_03_0\",\n", + " \"privacyPolicyLink\": \"https://www.nvidia.com/en-us/about-nvidia/privacy-policy/\",\n", + " \"product\": \"ngc_azure_17_11\",\n", + " \"publisher\": \"nvidia\",\n", + " \"retrieveDatetime\": \"2023-08-04T16:43:51.6614956Z\",\n", + " \"signature\": \"WSCMGFXMGNX6ACOPDERPFKAR4LNZFF6Q3BYB4JLRSMK3DTBLA3LZZIDL3FPNULJYRK3XPZFHGOELQLFJ45EVPM4WNTZEZ2M3YLORDNA\",\n", + " \"systemData\": {\n", + " \"createdAt\": \"2023-08-04T16:43:54.091798+00:00\",\n", + " \"createdBy\": \"fc4f4a6b-4041-4b1c-8249-854d68edcf62\",\n", + " \"createdByType\": \"ManagedIdentity\",\n", + " \"lastModifiedAt\": \"2023-08-04T16:43:54.091798+00:00\",\n", + " \"lastModifiedBy\": \"fc4f4a6b-4041-4b1c-8249-854d68edcf62\",\n", + " \"lastModifiedByType\": \"ManagedIdentity\"\n", + " },\n", + " \"type\": \"Microsoft.MarketplaceOrdering/offertypes\"\n", + "}\n", + "\u001b[32mCommand ran in 7.036 seconds (init: 0.189, invoke: 6.847)\u001b[0m\n" + ] + } + ], + "source": [ + "! az vm image terms accept --urn \"nvidia:ngc_azure_17_11:ngc-base-version-23_03_0:23.03.0\" --verbose" ] }, { @@ -201,7 +284,9 @@ }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "jp-MarkdownHeadingCollapsed": true + }, "source": [ "In general, if we use a generic image to create a cluster, we would have to wait till the new VMs are provisioned fully with all dependencies. 
The provisioning step does several things such as set the VM up with required libraries, set up Docker, install the NVIDIA drivers and also pull and decompress the RAPIDS container etc. This usually takes around 10-15 minutes of time depending on the cloud provider. If the user wants to fire up a cluster quickly, setting up a VM from a generic image every time may not be optimal. \n", "\n", @@ -422,7 +507,9 @@ }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "jp-MarkdownHeadingCollapsed": true + }, "source": [ "#### f. Set up customized VM information and clear default dask config " ] @@ -482,9 +569,50 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Creating scheduler instance\n", + "Assigned public IP\n", + "Network interface ready\n", + "Using Marketplace VM image with a Plan\n", + "Creating VM\n", + "Created VM dask-acc394fb-scheduler\n", + "Waiting for scheduler to run at 20.112.20.30:8786\n", + "Scheduler is running\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/home/azureuser/miniconda3/envs/rapids-23.06/lib/python3.10/contextlib.py:142: UserWarning: Creating your cluster is taking a surprisingly long time. This is likely due to pending resources. Hang tight! \n", + " next(self.gen)\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Creating worker instance\n", + "Creating worker instance\n", + "Network interface ready\n", + "Using Marketplace VM image with a Plan\n", + "Creating VM\n", + "Network interface ready\n", + "Using Marketplace VM image with a Plan\n", + "Creating VM\n", + "Created VM dask-acc394fb-worker-771e6bec\n", + "Created VM dask-acc394fb-worker-35d2d50f\n", + "CPU times: user 980 ms, sys: 206 ms, total: 1.19 s\n", + "Wall time: 8min 6s\n" + ] + } + ], "source": [ "%%time\n", "\n", @@ -495,12 +623,12 @@ " security_group=security_group,\n", " vm_image=vm_image,\n", " vm_size=vm_size,\n", + " disk_size=200,\n", " docker_image=docker_image,\n", " worker_class=worker_class,\n", " n_workers=2,\n", " security=True,\n", " docker_args=docker_args,\n", - " worker_options=worker_options,\n", " debug=False,\n", " bootstrap=False, # This is to prevent the cloud init jinja2 script from running in the custom VM.\n", ")" @@ -511,6 +639,391 @@ "execution_count": null, "metadata": {}, "outputs": [], + "source": [ + "# cluster.scheduler.admin_password" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cluster.workers[0].admin_password" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# cluster.close()" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/home/azureuser/miniconda3/envs/rapids-23.06/lib/python3.10/site-packages/distributed/client.py:1381: VersionMismatchWarning: Mismatched versions found\n", + "\n", + "+---------+-----------------+-----------------+-----------------+\n", + "| Package | Client | Scheduler | Workers |\n", + "+---------+-----------------+-----------------+-----------------+\n", + "| numpy | 1.24.4 | 1.24.3 | 1.24.3 |\n", + "| python | 3.10.12.final.0 | 3.10.11.final.0 | 3.10.11.final.0 |\n", + "+---------+-----------------+-----------------+-----------------+\n", + " 
warnings.warn(version_module.VersionMismatchWarning(msg[0][\"warning\"]))\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "
\n", + "
\n", + "

Client

\n", + "

Client-28ef148b-32e8-11ee-8889-6045bda2725d

\n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + "
Connection method: Cluster objectCluster type: dask_cloudprovider.AzureVMCluster
\n", + " Dashboard: http://20.112.20.30:8787/status\n", + "
\n", + "\n", + " \n", + "\n", + " \n", + "
\n", + "

Cluster Info

\n", + "
\n", + "
\n", + "
\n", + "
\n", + "

AzureVMCluster

\n", + "

d5b59ca0

\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
\n", + " Dashboard: http://20.112.20.30:8787/status\n", + " \n", + " Workers: 4\n", + "
\n", + " Total threads: 4\n", + " \n", + " Total memory: 440.42 GiB\n", + "
\n", + "\n", + "
\n", + " \n", + "

Scheduler Info

\n", + "
\n", + "\n", + "
\n", + "
\n", + "
\n", + "
\n", + "

Scheduler

\n", + "

Scheduler-7444f921-d220-4215-94a8-3de3fee9181d

\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
\n", + " Comm: tls://10.5.0.5:8786\n", + " \n", + " Workers: 4\n", + "
\n", + " Dashboard: http://10.5.0.5:8787/status\n", + " \n", + " Total threads: 4\n", + "
\n", + " Started: 7 minutes ago\n", + " \n", + " Total memory: 440.42 GiB\n", + "
\n", + "
\n", + "
\n", + "\n", + "
\n", + " \n", + "

Workers

\n", + "
\n", + "\n", + " \n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "

Worker: dask-acc394fb-worker-35d2d50f-0

\n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + "\n", + "
\n", + " Comm: tls://10.5.0.7:40749\n", + " \n", + " Total threads: 1\n", + "
\n", + " Dashboard: http://10.5.0.7:46705/status\n", + " \n", + " Memory: 110.11 GiB\n", + "
\n", + " Nanny: tls://10.5.0.7:43799\n", + "
\n", + " Local directory: /tmp/dask-worker-space/worker-qn26kw1y\n", + "
\n", + " GPU: Tesla V100-PCIE-16GB\n", + " \n", + " GPU memory: 16.00 GiB\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "

Worker: dask-acc394fb-worker-35d2d50f-1

\n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + "\n", + "
\n", + " Comm: tls://10.5.0.7:34469\n", + " \n", + " Total threads: 1\n", + "
\n", + " Dashboard: http://10.5.0.7:40865/status\n", + " \n", + " Memory: 110.11 GiB\n", + "
\n", + " Nanny: tls://10.5.0.7:36253\n", + "
\n", + " Local directory: /tmp/dask-worker-space/worker-rjbl3dux\n", + "
\n", + " GPU: Tesla V100-PCIE-16GB\n", + " \n", + " GPU memory: 16.00 GiB\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "

Worker: dask-acc394fb-worker-771e6bec-0

\n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + "\n", + "
\n", + " Comm: tls://10.5.0.6:45171\n", + " \n", + " Total threads: 1\n", + "
\n", + " Dashboard: http://10.5.0.6:34001/status\n", + " \n", + " Memory: 110.11 GiB\n", + "
\n", + " Nanny: tls://10.5.0.6:42431\n", + "
\n", + " Local directory: /tmp/dask-worker-space/worker-e8o7vbqu\n", + "
\n", + " GPU: Tesla V100-PCIE-16GB\n", + " \n", + " GPU memory: 16.00 GiB\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "

Worker: dask-acc394fb-worker-771e6bec-1

\n", + "
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "\n", + " \n", + "\n", + "
\n", + " Comm: tls://10.5.0.6:33367\n", + " \n", + " Total threads: 1\n", + "
\n", + " Dashboard: http://10.5.0.6:35135/status\n", + " \n", + " Memory: 110.11 GiB\n", + "
\n", + " Nanny: tls://10.5.0.6:40283\n", + "
\n", + " Local directory: /tmp/dask-worker-space/worker-u2nbjxm8\n", + "
\n", + " GPU: Tesla V100-PCIE-16GB\n", + " \n", + " GPU memory: 16.00 GiB\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "\n", + "
\n", + "
\n", + "\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "\n", + "
\n", + "
" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "client = Client(cluster)\n", "client" @@ -522,23 +1035,20 @@ "metadata": {}, "outputs": [], "source": [ - "def scale_workers(client, n_workers, n_gpus_per_worker, timeout=300):\n", - " import time\n", + "# def scale_workers(client, n_workers, n_gpus_per_worker, timeout=300):\n", + "# import time\n", + "# client.cluster.scale(n_workers)\n", + "# m = len(client.has_what().keys())\n", + "# start = end = time.perf_counter_ns()\n", + "# while ((m != n_workers*n_gpus_per_worker) and (((end - start) / 1e9) < timeout) ):\n", + "# time.sleep(5)\n", + "# m = len(client.has_what().keys())\n", "\n", - " client.cluster.scale(n_workers)\n", - " m = len(client.has_what().keys())\n", - " start = end = time.perf_counter_ns()\n", - " while (m != n_workers * n_gpus_per_worker) and (((end - start) / 1e9) < timeout):\n", - " time.sleep(5)\n", - " m = len(client.has_what().keys())\n", + "# end = time.perf_counter_ns()\n", "\n", - " end = time.perf_counter_ns()\n", - "\n", - " if ((end - start) / 1e9) >= timeout:\n", - " raise RuntimeError(\n", - " f\"Failed to rescale cluster in {timeout} sec.\"\n", - " \"Try increasing timeout for very large containers, and verify available compute resources.\"\n", - " )" + "# if (((end - start) / 1e9) >= timeout):\n", + "# raise RuntimeError(f\"Failed to rescale cluster in {timeout} sec.\"\n", + "# \"Try increasing timeout for very large containers, and verify available compute resources.\")" ] }, { @@ -547,7 +1057,7 @@ "metadata": {}, "outputs": [], "source": [ - "# # Uncomment if you only have the scheduler with n_workers=0 and want to scale the workers separately.\n", + "# Uncomment if you only have the scheduler with n_workers=0 and want to scale the workers separately.\n", "# %%time\n", "# scale_workers(client, 2, 2, timeout=600)" ] @@ -561,9 +1071,18 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "CPU times: user 1.88 ms, sys: 530 µs, total: 2.41 ms\n", + "Wall time: 37.4 ms\n" + ] + } + ], "source": [ "%%time\n", "client.wait_for_workers(2)" @@ -578,9 +1097,21 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "All workers for scheduler id: Scheduler-7444f921-d220-4215-94a8-3de3fee9181d, address: tls://10.5.0.5:8786\n", + "Worker: tls://10.5.0.6:33367 , gpu_machines: {'memory-total': 17179869184, 'name': 'Tesla V100-PCIE-16GB'}\n", + "Worker: tls://10.5.0.6:45171 , gpu_machines: {'memory-total': 17179869184, 'name': 'Tesla V100-PCIE-16GB'}\n", + "Worker: tls://10.5.0.7:34469 , gpu_machines: {'memory-total': 17179869184, 'name': 'Tesla V100-PCIE-16GB'}\n", + "Worker: tls://10.5.0.7:40749 , gpu_machines: {'memory-total': 17179869184, 'name': 'Tesla V100-PCIE-16GB'}\n" + ] + } + ], "source": [ "def pretty_print(scheduler_dict):\n", " print(\n", @@ -600,7 +1131,6 @@ { "cell_type": "markdown", "metadata": { - "jp-MarkdownHeadingCollapsed": true, "tags": [] }, "source": [ @@ -625,7 +1155,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 9, "metadata": {}, "outputs": [], "source": [ @@ -676,7 +1206,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 15, "metadata": 
{}, "outputs": [], "source": [ @@ -735,7 +1265,7 @@ " df[\"month\"] = df[\"tpepPickupDateTime\"].dt.month\n", " df[\"day\"] = df[\"tpepPickupDateTime\"].dt.day\n", " df[\"diff\"] = (\n", - " df[\"tpepPickupDateTime\"] - df[\"tpepPickupDateTime\"]\n", + " df[\"tpepDropoffDateTime\"] - df[\"tpepPickupDateTime\"]\n", " ).dt.seconds # convert difference between pickup and dropoff into seconds\n", "\n", " df[\"pickup_latitude_r\"] = df[\"startLat\"] // 0.01 * 0.01\n", @@ -778,8 +1308,24 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, + "execution_count": 17, + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [ + "library/cuml", + "library/dask-cudf", + "library/numpy", + "library/dask", + "tools/dask-cloudprovider", + "cloud/azure/azure-vm", + "dataset/nyc-taxi", + "data-format/parquet", + "data-storage/adls" + ] + }, "outputs": [], "source": [ "def persist_train_infer_split(\n", @@ -925,9 +1471,25 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/home/azureuser/miniconda3/envs/rapids-23.06/lib/python3.10/site-packages/dask/dataframe/io/parquet/core.py:417: FutureWarning: The `chunksize` argument is deprecated, and will be removed in a future release. Setting the `blocksize` argument instead. Please see documentation on the `blocksize` argument for more information.\n", + " warnings.warn(\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Wall clock time taken for ETL and persisting : 75.64389750299961 s\n" + ] + } + ], "source": [ "tic = timer()\n", "X_train, y_train, X_infer, y_infer = taxi_data_loader(\n", @@ -943,9 +1505,20 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "48817562" + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "X_train.shape[0].compute()" ] @@ -959,9 +1532,152 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 20, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
indexpassengerCounttripDistancestartLonstartLatrateCodeIdendLonendLatdiffh_distanceday_of_weekis_weekend
3004463004461.04.00-73.98495540.7685431.0-74.00878940.7193301324.05.809536e+006.00.0
1638171638173.01.93-74.00817940.7221981.0-73.99298940.739151840.01.395489e+002.00.0
2369582369585.01.10-73.98759540.7753601.0-73.97649440.785755360.01.394768e+005.00.0
73461734611.00.76-73.99469840.7259291.0-73.99469840.725929180.01.005159e-135.00.0
2944642944641.00.60-73.97434240.7481651.0-73.98253640.750767229.01.395336e+006.00.0
\n", + "
" + ], + "text/plain": [ + " index passengerCount tripDistance startLon startLat \\\n", + "300446 300446 1.0 4.00 -73.984955 40.768543 \n", + "163817 163817 3.0 1.93 -74.008179 40.722198 \n", + "236958 236958 5.0 1.10 -73.987595 40.775360 \n", + "73461 73461 1.0 0.76 -73.994698 40.725929 \n", + "294464 294464 1.0 0.60 -73.974342 40.748165 \n", + "\n", + " rateCodeId endLon endLat diff h_distance day_of_week \\\n", + "300446 1.0 -74.008789 40.719330 1324.0 5.809536e+00 6.0 \n", + "163817 1.0 -73.992989 40.739151 840.0 1.395489e+00 2.0 \n", + "236958 1.0 -73.976494 40.785755 360.0 1.394768e+00 5.0 \n", + "73461 1.0 -73.994698 40.725929 180.0 1.005159e-13 5.0 \n", + "294464 1.0 -73.982536 40.750767 229.0 1.395336e+00 6.0 \n", + "\n", + " is_weekend \n", + "300446 0.0 \n", + "163817 0.0 \n", + "236958 0.0 \n", + "73461 0.0 \n", + "294464 0.0 " + ] + }, + "execution_count": 20, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "X_train.head()" ] @@ -969,7 +1685,6 @@ { "cell_type": "markdown", "metadata": { - "jp-MarkdownHeadingCollapsed": true, "tags": [] }, "source": [ @@ -991,7 +1706,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 21, "metadata": {}, "outputs": [], "source": [ @@ -1024,14 +1739,22 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Since the data is already persisted in the dask workers in the Kubernetes Cluster, the next steps should not take a lot of time." + "Since the data is already persisted in the dask workers in the cluster, the next steps should not take a lot of time." ] }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 22, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Wall clock time taken for this cell : 8.431016346999968 s\n" + ] + } + ], "source": [ "data_train = xgb.dask.DaskDMatrix(client, X_train, y_train)\n", "tic = timer()\n", @@ -1052,7 +1775,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 23, "metadata": {}, "outputs": [], "source": [ @@ -1063,7 +1786,6 @@ { "cell_type": "markdown", "metadata": { - "jp-MarkdownHeadingCollapsed": true, "tags": [] }, "source": [ @@ -1072,16 +1794,33 @@ }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, "source": [ "Here we will use the `predict` and `inplace_predict` methods provided by the `xgboost.dask` library, out of the box. Later we will also use [Forest Inference Library (FIL)](https://docs.rapids.ai/api/cuml/stable/api.html?highlight=forestinference#cuml.ForestInference) to perform prediction." 
] }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 24, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "DoneAndNotDoneFutures(done=set(), not_done=set())" + ] + }, + "execution_count": 24, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "_y_test = y_infer.compute()\n", "wait(_y_test)" @@ -1089,9 +1828,17 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 25, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Wall clock time taken for xgb.dask.predict : 1.6842520279988094 s\n" + ] + } + ], "source": [ "d_test = xgb.dask.DaskDMatrix(client, X_infer)\n", "tic = timer()\n", @@ -1104,16 +1851,30 @@ }, { "cell_type": "markdown", - "metadata": {}, + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, "source": [ "#### Inference with the inplace predict method of dask XGBoost" ] }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 26, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Wall clock time taken for inplace inference : 2.1184139880006114 s\n" + ] + } + ], "source": [ "tic = timer()\n", "y_pred = xgb.dask.inplace_predict(client, xgb_gpu_model, X_infer)\n", @@ -1125,9 +1886,25 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 27, + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Calculating MSE\n", + "Workflow Complete - RMSE: 2.2907634\n", + "Wall clock time taken for this cell : 1.1065989440012345 s\n" + ] + } + ], "source": [ "tic = timer()\n", "print(\"Calculating MSE\")\n", @@ -1140,7 +1917,6 @@ { "cell_type": "markdown", "metadata": { - "jp-MarkdownHeadingCollapsed": true, "tags": [] }, "source": [ @@ -1154,7 +1930,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 48, "metadata": {}, "outputs": [], "source": [ @@ -1175,9 +1951,17 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 64, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "dict_keys(['tls://10.5.0.6:33367', 'tls://10.5.0.6:45171', 'tls://10.5.0.7:34469', 'tls://10.5.0.7:40749'])\n" + ] + } + ], "source": [ "workers = client.has_what().keys()\n", "print(workers)\n", @@ -1187,7 +1971,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 80, "metadata": {}, "outputs": [], "source": [ @@ -1216,6 +2000,7 @@ " worker.data[\"fil_model\"] = ForestInference.load(\n", " filename=os.path.join(worker.local_directory, model_file), model_type=\"xgboost\"\n", " )\n", + " worker.data[\"fil_model\"]\n", "\n", "\n", "def predict(input_df):\n", @@ -1232,16 +2017,16 @@ " zf.close()\n", " # check to see if local directory present in workers\n", " # if not present make it\n", - " fut = client.run(checkOrMakeLocalDir)\n", + " fut = client.submit(checkOrMakeLocalDir)\n", " wait(fut)\n", " # upload the zip file in workers\n", " fut = client.upload_file(f\"./{zip_file_name}\")\n", " wait(fut)\n", " # unzip file in the workers\n", - " fut = client.run(unzipFile, zip_file_name)\n", + " fut = client.submit(unzipFile, zip_file_name)\n", " wait(fut)\n", " 
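# NOTE: Client.run executes a function once on every worker, whereas\n",
"    # Client.submit schedules it as a single task on one worker only;\n",
"    # worker.data['fil_model'] must exist on every worker for the\n",
"    # map_partitions-based predict below to find it.\n",
"    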
# load model using FIL in workers\n", - " fut = client.run(workerModelInit, model_file_name)\n", + " fut = client.submit(workerModelInit, model_file_name)\n", " wait(fut)" ] }, @@ -1254,7 +2039,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 82, "metadata": {}, "outputs": [], "source": [ @@ -1271,9 +2056,45 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 79, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/home/azureuser/miniconda3/envs/rapids-23.06/lib/python3.10/site-packages/dask/dataframe/core.py:6984: FutureWarning: Meta is not valid, `map_partitions` and `map_overlap` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.\n", + " warnings.warn(\n" + ] + }, + { + "ename": "KeyError", + "evalue": "'fil_model'", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mKeyError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[79], line 3\u001b[0m\n\u001b[1;32m 1\u001b[0m tic \u001b[38;5;241m=\u001b[39m timer()\n\u001b[1;32m 2\u001b[0m predictions \u001b[38;5;241m=\u001b[39m X_infer\u001b[38;5;241m.\u001b[39mmap_partitions(predict, meta\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mfloat\u001b[39m\u001b[38;5;124m\"\u001b[39m) \u001b[38;5;66;03m# this is like MPI reduce\u001b[39;00m\n\u001b[0;32m----> 3\u001b[0m y_pred \u001b[38;5;241m=\u001b[39m \u001b[43mpredictions\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 4\u001b[0m wait(y_pred)\n\u001b[1;32m 5\u001b[0m toc \u001b[38;5;241m=\u001b[39m timer()\n", + "File \u001b[0;32m~/miniconda3/envs/rapids-23.06/lib/python3.10/site-packages/dask/base.py:314\u001b[0m, in \u001b[0;36mDaskMethodsMixin.compute\u001b[0;34m(self, **kwargs)\u001b[0m\n\u001b[1;32m 290\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mcompute\u001b[39m(\u001b[38;5;28mself\u001b[39m, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs):\n\u001b[1;32m 291\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"Compute this dask collection\u001b[39;00m\n\u001b[1;32m 292\u001b[0m \n\u001b[1;32m 293\u001b[0m \u001b[38;5;124;03m This turns a lazy Dask collection into its in-memory equivalent.\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 312\u001b[0m \u001b[38;5;124;03m dask.base.compute\u001b[39;00m\n\u001b[1;32m 313\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[0;32m--> 314\u001b[0m (result,) \u001b[38;5;241m=\u001b[39m \u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtraverse\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;28;43;01mFalse\u001b[39;49;00m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 315\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n", + "File \u001b[0;32m~/miniconda3/envs/rapids-23.06/lib/python3.10/site-packages/dask/base.py:599\u001b[0m, in \u001b[0;36mcompute\u001b[0;34m(traverse, optimize_graph, scheduler, get, *args, **kwargs)\u001b[0m\n\u001b[1;32m 596\u001b[0m 
keys\u001b[38;5;241m.\u001b[39mappend(x\u001b[38;5;241m.\u001b[39m__dask_keys__())\n\u001b[1;32m 597\u001b[0m postcomputes\u001b[38;5;241m.\u001b[39mappend(x\u001b[38;5;241m.\u001b[39m__dask_postcompute__())\n\u001b[0;32m--> 599\u001b[0m results \u001b[38;5;241m=\u001b[39m \u001b[43mschedule\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdsk\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mkeys\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 600\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m repack([f(r, \u001b[38;5;241m*\u001b[39ma) \u001b[38;5;28;01mfor\u001b[39;00m r, (f, a) \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mzip\u001b[39m(results, postcomputes)])\n", + "File \u001b[0;32m~/miniconda3/envs/rapids-23.06/lib/python3.10/site-packages/distributed/client.py:3186\u001b[0m, in \u001b[0;36mClient.get\u001b[0;34m(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)\u001b[0m\n\u001b[1;32m 3184\u001b[0m should_rejoin \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mFalse\u001b[39;00m\n\u001b[1;32m 3185\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m-> 3186\u001b[0m results \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mgather\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpacked\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43masynchronous\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43masynchronous\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mdirect\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdirect\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 3187\u001b[0m \u001b[38;5;28;01mfinally\u001b[39;00m:\n\u001b[1;32m 3188\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m f \u001b[38;5;129;01min\u001b[39;00m futures\u001b[38;5;241m.\u001b[39mvalues():\n", + "File \u001b[0;32m~/miniconda3/envs/rapids-23.06/lib/python3.10/site-packages/distributed/client.py:2345\u001b[0m, in \u001b[0;36mClient.gather\u001b[0;34m(self, futures, errors, direct, asynchronous)\u001b[0m\n\u001b[1;32m 2343\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 2344\u001b[0m local_worker \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m\n\u001b[0;32m-> 2345\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msync\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 2346\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_gather\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2347\u001b[0m \u001b[43m \u001b[49m\u001b[43mfutures\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2348\u001b[0m \u001b[43m \u001b[49m\u001b[43merrors\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43merrors\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2349\u001b[0m \u001b[43m \u001b[49m\u001b[43mdirect\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdirect\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2350\u001b[0m \u001b[43m \u001b[49m\u001b[43mlocal_worker\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mlocal_worker\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2351\u001b[0m \u001b[43m \u001b[49m\u001b[43masynchronous\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43masynchronous\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 2352\u001b[0m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n", + "File 
\u001b[0;32m~/miniconda3/envs/rapids-23.06/lib/python3.10/site-packages/distributed/utils.py:349\u001b[0m, in \u001b[0;36mSyncMethodMixin.sync\u001b[0;34m(self, func, asynchronous, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 347\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m future\n\u001b[1;32m 348\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m--> 349\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43msync\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 350\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mloop\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mfunc\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43margs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mcallback_timeout\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mcallback_timeout\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\n\u001b[1;32m 351\u001b[0m \u001b[43m \u001b[49m\u001b[43m)\u001b[49m\n", + "File \u001b[0;32m~/miniconda3/envs/rapids-23.06/lib/python3.10/site-packages/distributed/utils.py:416\u001b[0m, in \u001b[0;36msync\u001b[0;34m(loop, func, callback_timeout, *args, **kwargs)\u001b[0m\n\u001b[1;32m 414\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m error:\n\u001b[1;32m 415\u001b[0m typ, exc, tb \u001b[38;5;241m=\u001b[39m error\n\u001b[0;32m--> 416\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exc\u001b[38;5;241m.\u001b[39mwith_traceback(tb)\n\u001b[1;32m 417\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 418\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n", + "File \u001b[0;32m~/miniconda3/envs/rapids-23.06/lib/python3.10/site-packages/distributed/utils.py:389\u001b[0m, in \u001b[0;36msync..f\u001b[0;34m()\u001b[0m\n\u001b[1;32m 387\u001b[0m future \u001b[38;5;241m=\u001b[39m wait_for(future, callback_timeout)\n\u001b[1;32m 388\u001b[0m future \u001b[38;5;241m=\u001b[39m asyncio\u001b[38;5;241m.\u001b[39mensure_future(future)\n\u001b[0;32m--> 389\u001b[0m result \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01myield\u001b[39;00m future\n\u001b[1;32m 390\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m:\n\u001b[1;32m 391\u001b[0m error \u001b[38;5;241m=\u001b[39m sys\u001b[38;5;241m.\u001b[39mexc_info()\n", + "File \u001b[0;32m~/miniconda3/envs/rapids-23.06/lib/python3.10/site-packages/tornado/gen.py:767\u001b[0m, in \u001b[0;36mRunner.run\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m 765\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[1;32m 766\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 767\u001b[0m value \u001b[38;5;241m=\u001b[39m \u001b[43mfuture\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mresult\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 768\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m e:\n\u001b[1;32m 769\u001b[0m \u001b[38;5;66;03m# Save the exception for later. 
It's important that\u001b[39;00m\n\u001b[1;32m 770\u001b[0m \u001b[38;5;66;03m# gen.throw() not be called inside this try/except block\u001b[39;00m\n\u001b[1;32m 771\u001b[0m \u001b[38;5;66;03m# because that makes sys.exc_info behave unexpectedly.\u001b[39;00m\n\u001b[1;32m 772\u001b[0m exc: Optional[\u001b[38;5;167;01mException\u001b[39;00m] \u001b[38;5;241m=\u001b[39m e\n", + "File \u001b[0;32m~/miniconda3/envs/rapids-23.06/lib/python3.10/site-packages/distributed/client.py:2208\u001b[0m, in \u001b[0;36mClient._gather\u001b[0;34m(self, futures, errors, direct, local_worker)\u001b[0m\n\u001b[1;32m 2206\u001b[0m exc \u001b[38;5;241m=\u001b[39m CancelledError(key)\n\u001b[1;32m 2207\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m-> 2208\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exception\u001b[38;5;241m.\u001b[39mwith_traceback(traceback)\n\u001b[1;32m 2209\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exc\n\u001b[1;32m 2210\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m errors \u001b[38;5;241m==\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mskip\u001b[39m\u001b[38;5;124m\"\u001b[39m:\n", + "File \u001b[0;32m/opt/conda/envs/rapids/lib/python3.10/site-packages/dask/optimization.py:990\u001b[0m, in \u001b[0;36m__call__\u001b[0;34m()\u001b[0m\n", + "File \u001b[0;32m/opt/conda/envs/rapids/lib/python3.10/site-packages/dask/core.py:149\u001b[0m, in \u001b[0;36mget\u001b[0;34m()\u001b[0m\n", + "File \u001b[0;32m/opt/conda/envs/rapids/lib/python3.10/site-packages/dask/core.py:119\u001b[0m, in \u001b[0;36m_execute_task\u001b[0;34m()\u001b[0m\n", + "File \u001b[0;32m/opt/conda/envs/rapids/lib/python3.10/site-packages/dask/utils.py:73\u001b[0m, in \u001b[0;36mapply\u001b[0;34m()\u001b[0m\n", + "File \u001b[0;32m/opt/conda/envs/rapids/lib/python3.10/site-packages/dask/dataframe/core.py:7006\u001b[0m, in \u001b[0;36mapply_and_enforce\u001b[0;34m()\u001b[0m\n", + "Cell \u001b[0;32mIn[74], line 23\u001b[0m, in \u001b[0;36mpredict\u001b[0;34m()\u001b[0m\n\u001b[1;32m 20\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mpredict\u001b[39m(input_df):\n\u001b[1;32m 21\u001b[0m \u001b[38;5;66;03m# this function will run in each worker and predict \u001b[39;00m\n\u001b[1;32m 22\u001b[0m worker \u001b[38;5;241m=\u001b[39m get_worker()\n\u001b[0;32m---> 23\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m worker\u001b[38;5;241m.\u001b[39mdata[\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mfil_model\u001b[39m\u001b[38;5;124m\"\u001b[39m]\u001b[38;5;241m.\u001b[39mpredict(input_df)\n", + "File \u001b[0;32m/opt/conda/envs/rapids/lib/python3.10/site-packages/dask_cuda/device_host_file.py:262\u001b[0m, in \u001b[0;36m__getitem__\u001b[0;34m()\u001b[0m\n", + "\u001b[0;31mKeyError\u001b[0m: 'fil_model'" + ] + } + ], "source": [ "tic = timer()\n", "predictions = X_infer.map_partitions(predict, meta=\"float\") # this is like MPI reduce\n", @@ -1285,9 +2106,17 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 70, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "It took -18149.079091776 seconds to predict on 5426301 rows using FIL distributedly on each worker\n" + ] + } + ], "source": [ "rows_csv = X_infer.iloc[:, 0].shape[0].compute()\n", "print(\n", @@ -1297,9 +2126,23 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 71, + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": 
[ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Final - RMSE: 2.2907634\n" + ] + } + ], "source": [ "tic = timer()\n", "score = mean_squared_error(y_pred, _y_test)\n", @@ -1310,7 +2153,6 @@ { "cell_type": "markdown", "metadata": { - "jp-MarkdownHeadingCollapsed": true, "tags": [] }, "source": [ @@ -1330,9 +2172,9 @@ ], "metadata": { "kernelspec": { - "display_name": "Python 3 (ipykernel)", + "display_name": "rapids-23.06", "language": "python", - "name": "python3" + "name": "rapids-23.06" }, "language_info": { "codemirror_mode": { @@ -1344,7 +2186,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.7" + "version": "3.10.12" } }, "nbformat": 4,