diff --git a/intermediate_notebooks/E2E/mortgage/mortgage.ipynb b/intermediate_notebooks/E2E/mortgage/mortgage.ipynb new file mode 100644 index 00000000..9d99bfd5 --- /dev/null +++ b/intermediate_notebooks/E2E/mortgage/mortgage.ipynb @@ -0,0 +1,1198 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Mortgage Workflow\n", + "\n", + "## The Dataset\n", + "The dataset used with this workflow is derived from [Fannie Mae’s Single-Family Loan Performance Data](http://www.fanniemae.com/portal/funding-the-market/data/loan-performance-data.html) with all rights reserved by Fannie Mae. This processed dataset is redistributed with permission and consent from Fannie Mae.\n", + "\n", + "To acquire this dataset, please visit the [RAPIDS Datasets Homepage](https://docs.rapids.ai/datasets/mortgage-data). We've also added some cells below to make it easier to download the data, set parameters, and run the notebook.\n", + "\n", + "## Introduction\n", + "The Mortgage workflow is composed of three core phases:\n", + "\n", + "1. ETL - Extract, Transform, Load\n", + "2. Data Conversion\n", + "3. ML - Training\n", + "\n", + "### ETL\n", + "Data is\n", + "1. Read in from storage\n", + "2. Transformed to emphasize key features\n", + "3. Loaded into volatile memory for conversion\n", + "\n", + "### Data Conversion\n", + "Features are\n", + "1. Broken into (labels, data) pairs\n", + "2. Distributed across many workers\n", + "3. Converted into compressed sparse row (CSR) matrix format for XGBoost\n", + "\n", + "### Machine Learning\n", + "The CSR data is fed into a distributed training session with Dask-XGBoost" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Import statements" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "env: NCCL_P2P_DISABLE=1\n" + ] + } + ], + "source": [ + "# Necessary for NCCL < 2.4\n", + "%env NCCL_P2P_DISABLE=1" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "import dask_xgboost as dxgb_gpu\n", + "import dask\n", + "import dask_cudf\n", + "from dask_cuda import LocalCUDACluster\n", + "from dask.delayed import delayed\n", + "from dask.distributed import Client, wait\n", + "import cudf\n", + "\n", + "import pynvml\n", + "import numpy as np\n", + "import xgboost as xgb\n", + "\n", + "from collections import OrderedDict\n", + "import gc\n", + "from glob import glob\n", + "import os" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Set up RMM and the Dask-cuDF cluster" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "While `dask_cudf` allows RAPIDS to use all the GPUs in the cluster as a single, large GPU, you will have to use `RMM` if the data is far larger than your cluster's total GPU memory. RMM's memory pool defaults to 50% of each GPU's memory. However, you can grow the pool to use up to 100% of GPU memory, which may improve performance and avoid some out-of-memory errors. Below are some 90%+ values that you can use to set your `initial_pool_size`. They are commented out, as is setting the initial pool size. Uncomment the one you want to use."
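+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "If you would rather compute these values than pick one from the list below, here is a minimal sketch that queries every GPU with `pynvml` (imported above) and takes roughly 90% of the smallest GPU's memory. The 0.9 fraction is just a rule of thumb, and the names `ips` and `n_workers` simply match the variables used in the next few cells:\n", + "```python\n", + "import pynvml\n", + "\n", + "pynvml.nvmlInit()\n", + "# Total memory (in bytes) of every GPU that NVML can see\n", + "totals = [pynvml.nvmlDeviceGetMemoryInfo(pynvml.nvmlDeviceGetHandleByIndex(i)).total\n", + "          for i in range(pynvml.nvmlDeviceGetCount())]\n", + "ips = int(min(totals) * 0.9)  # candidate initial_pool_size, sized to the smallest GPU\n", + "n_workers = len(totals)       # one Dask-CUDA worker per GPU\n", + "pynvml.nvmlShutdown()\n", + "```"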
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Define your GPU Memory Allocation" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "# Decide the initial pool size based on the memory size of the smallest GPU in your cluster, and uncomment that line here as well as `initial_pool_size` in the cell after\n", + "# ips = 6<<30 #8gb card\n", + "# ips = 14<<30 #16gb card\n", + "# ips = 22<<30 #24gb card\n", + "# ips = 29<<30 #32gb card\n", + "# ips = 43<<30 #48gb card\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Define the number of GPU workers that you have" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "n_workers = 4 # Change n_workers to the number of GPUs you have" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Set up RMM pool and start Dask cluster" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "def initialize_rmm_pool():\n", + " import rmm\n", + " rmm.reinitialize(pool_allocator=True,\n", + " #initial_pool_size=ips, # RMM defaults to 50% GPU memory. You may get further performance by increasing the pool size per GPU. Do not go above your smallest GPU mem size\n", + " managed_memory=True)\n", + "\n", + "def initialize_rmm_no_pool():\n", + " import rmm\n", + "\n", + " rmm.reinitialize(pool_allocator=False, managed_memory=True)" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "LocalCUDACluster('tcp://127.0.0.1:43807', workers=4, threads=4, memory=270.40 GB)\n" + ] + }, + { + "data": { + "text/html": [ + "Client: scheduler at tcp://127.0.0.1:43807 | Workers: 4 | Cores: 4 | Memory: 270.40 GB" + ], + "text/plain": [ + "Client: scheduler at tcp://127.0.0.1:43807 | Workers: 4 | Cores: 4 | Memory: 270.40 GB" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "dask.config.set({'distributed.scheduler.work-stealing': False})\n", + "dask.config.get('distributed.scheduler.work-stealing')\n", + "dask.config.set({'distributed.scheduler.bandwidth': 1})\n", + "dask.config.get('distributed.scheduler.bandwidth')\n", + "cluster = LocalCUDACluster(n_workers=n_workers, threads_per_worker=1)\n", + "print(cluster)\n", + "client = Client(cluster)\n", + "client.restart()\n", + "client.run(initialize_rmm_pool)\n", + "client" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Get Data" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We've made it easy for you to download the data and start your ETL. Please select the range of years that you want to analyze. The more years you pick, the longer it will take to download the data. **If you already have the data, please edit and run only the first cell.**\n", + "\n", + "The [RAPIDS Datasets Homepage](https://docs.rapids.ai/datasets/mortgage-data) will give you further information about the datasets, such as size. Our largest dataset is 196GB uncompressed, so download a dataset that fits within your own storage." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "/raid/tdyer/rapids-h2o/mortgage/data/mortgage/\n" + ] + } + ], + "source": [ + "# Edit and uncomment `end_year` if you already have the dataset you want downloaded. Please keep track of that end year.\n", + "# If you don't have the data, you can use the cells below to download the size you want\n", + "path = os.getcwd()\n", + "data_dir = path + \"/data/mortgage/\" # your folder where the mortgage data is located\n", + "start_year = 2000\n", + "print(data_dir)\n", + "#end_year = 2000 #uncomment only if you have the data downloaded to your machine already and know your end year. " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "If you already have a dataset, declare its end year and don't\n", + "- download the data again\n", + "- untar your data, if you have already done so." + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "end_year = 2007" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "If you don't have the dataset, please choose a span of years to download by commenting out our default choice and uncommenting the span that you want. Our default is the two-year dataset."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " % Total % Received % Xferd Average Speed Time Time Time Current\n", + " Dload Upload Total Spent Left Speed\n", + " 78 448M 78 354M 0 0 552k 0 0:13:52 0:10:56 0:02:56 491k\n" + ] + } + ], + "source": [ + "#One year\n", + "# !curl http://rapidsai-data.s3-website.us-east-2.amazonaws.com/notebook-mortgage-data/mortgage_2000.tgz --create-dirs -o data/mortgage/mortgage_compressed.tgz\n", + "# end_year = 2000" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#Two years\n", + "!curl http://rapidsai-data.s3-website.us-east-2.amazonaws.com/notebook-mortgage-data/mortgage_2000-2001.tgz --create-dirs -o data/mortgage/mortgage_compressed.tgz\n", + "end_year = 2001" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#Four years\n", + "# !curl http://rapidsai-data.s3-website.us-east-2.amazonaws.com/notebook-mortgage-data/mortgage_2000-2003.tgz --create-dirs -o data/mortgage/mortgage_compressed.tgz\n", + "# end_year = 2003" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " % Total % Received % Xferd Average Speed Time Time Time Current\n", + " Dload Upload Total Spent Left Speed\n", + "100 14.0G 100 14.0G 0 0 7916k 0 0:31:00 0:31:00 --:--:-- 16.4M\n" + ] + } + ], + "source": [ + "#Eight years\n", + "# !curl http://rapidsai-data.s3-website.us-east-2.amazonaws.com/notebook-mortgage-data/mortgage_2000-2007.tgz --create-dirs -o data/mortgage/mortgage_compressed.tgz\n", + "# end_year = 2007" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#Sixteen years\n", + "# !curl http://rapidsai-data.s3-website.us-east-2.amazonaws.com/notebook-mortgage-data/mortgage_2000-2015.tgz --create-dirs -o data/mortgage/mortgage_compressed.tgz\n", + "# end_year = 2015" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#Seventeen years\n", + "# !curl http://rapidsai-data.s3-website.us-east-2.amazonaws.com/notebook-mortgage-data/mortgage_2000-2016.tgz --create-dirs -o data/mortgage/mortgage_compressed.tgz\n", + "# end_year = 2016" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "names.csv\n", + "acq/Acquisition_2007Q4.txt\n", + "acq/Acquisition_2007Q3.txt\n", + "acq/Acquisition_2007Q2.txt\n", + "acq/Acquisition_2007Q1.txt\n", + "acq/Acquisition_2006Q4.txt\n", + "acq/Acquisition_2006Q3.txt\n", + "acq/Acquisition_2006Q2.txt\n", + "acq/Acquisition_2006Q1.txt\n", + "acq/Acquisition_2005Q4.txt\n", + "acq/Acquisition_2005Q3.txt\n", + "acq/Acquisition_2005Q2.txt\n", +
"acq/Acquisition_2005Q1.txt\n", + "acq/Acquisition_2004Q4.txt\n", + "acq/Acquisition_2004Q3.txt\n", + "acq/Acquisition_2004Q2.txt\n", + "acq/Acquisition_2004Q1.txt\n", + "acq/Acquisition_2003Q4.txt\n", + "acq/Acquisition_2003Q3.txt\n", + "acq/Acquisition_2003Q2.txt\n", + "acq/Acquisition_2003Q1.txt\n", + "acq/Acquisition_2002Q4.txt\n", + "acq/Acquisition_2002Q3.txt\n", + "acq/Acquisition_2002Q2.txt\n", + "acq/Acquisition_2002Q1.txt\n", + "acq/Acquisition_2001Q4.txt\n", + "acq/Acquisition_2001Q3.txt\n", + "acq/Acquisition_2001Q2.txt\n", + "acq/Acquisition_2001Q1.txt\n", + "acq/Acquisition_2000Q4.txt\n", + "acq/Acquisition_2000Q3.txt\n", + "acq/Acquisition_2000Q2.txt\n", + "acq/Acquisition_2000Q1.txt\n", + "perf/Performance_2007Q4.txt\n", + "perf/Performance_2007Q3.txt\n", + "perf/Performance_2007Q2.txt\n", + "perf/Performance_2007Q1.txt\n", + "perf/Performance_2006Q4.txt\n", + "perf/Performance_2006Q3.txt\n", + "perf/Performance_2006Q2.txt\n", + "perf/Performance_2006Q1.txt\n", + "perf/Performance_2005Q4.txt_1\n", + "perf/Performance_2005Q4.txt_0\n", + "perf/Performance_2005Q3.txt_1\n", + "perf/Performance_2005Q3.txt_0\n", + "perf/Performance_2005Q2.txt\n", + "perf/Performance_2005Q1.txt\n", + "perf/Performance_2004Q4.txt\n", + "perf/Performance_2004Q3.txt\n", + "perf/Performance_2004Q2.txt_1\n", + "perf/Performance_2004Q2.txt_0\n", + "perf/Performance_2004Q1.txt_1\n", + "perf/Performance_2004Q1.txt_0\n", + "perf/Performance_2003Q4.txt_1_1\n", + "perf/Performance_2003Q4.txt_1_0\n", + "perf/Performance_2003Q4.txt_0_1\n", + "perf/Performance_2003Q4.txt_0_0\n", + "perf/Performance_2003Q3.txt_1_1_1\n", + "perf/Performance_2003Q3.txt_1_1_0\n", + "perf/Performance_2003Q3.txt_1_0_1\n", + "perf/Performance_2003Q3.txt_1_0_0\n", + "perf/Performance_2003Q3.txt_0_1_1\n", + "perf/Performance_2003Q3.txt_0_1_0\n", + "perf/Performance_2003Q3.txt_0_0_1\n", + "perf/Performance_2003Q3.txt_0_0_0\n", + "perf/Performance_2003Q2.txt_1_1_1\n", + "perf/Performance_2003Q2.txt_1_1_0\n", + "perf/Performance_2003Q2.txt_1_0_1\n", + "perf/Performance_2003Q2.txt_1_0_0\n", + "perf/Performance_2003Q2.txt_0_1_1\n", + "perf/Performance_2003Q2.txt_0_1_0\n", + "perf/Performance_2003Q2.txt_0_0_1\n", + "perf/Performance_2003Q2.txt_0_0_0\n", + "perf/Performance_2003Q1.txt_1_1\n", + "perf/Performance_2003Q1.txt_1_0\n", + "perf/Performance_2003Q1.txt_0_1\n", + "perf/Performance_2003Q1.txt_0_0\n", + "perf/Performance_2002Q4.txt_1_1\n", + "perf/Performance_2002Q4.txt_1_0\n", + "perf/Performance_2002Q4.txt_0_1\n", + "perf/Performance_2002Q4.txt_0_0\n", + "perf/Performance_2002Q3.txt_1\n", + "perf/Performance_2002Q3.txt_0\n", + "perf/Performance_2002Q2.txt\n", + "perf/Performance_2002Q1.txt_1\n", + "perf/Performance_2002Q1.txt_0\n", + "perf/Performance_2001Q4.txt_1\n", + "perf/Performance_2001Q4.txt_0\n", + "perf/Performance_2001Q3.txt_1\n", + "perf/Performance_2001Q3.txt_0\n", + "perf/Performance_2001Q2.txt_1\n", + "perf/Performance_2001Q2.txt_0\n", + "perf/Performance_2001Q1.txt\n", + "perf/Performance_2000Q4.txt\n", + "perf/Performance_2000Q3.txt\n", + "perf/Performance_2000Q2.txt\n", + "perf/Performance_2000Q1.txt\n" + ] + } + ], + "source": [ + "#untar your data\n", + "!tar -xvzf data/mortgage/mortgage_compressed.tgz -C data/mortgage/" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Define the paths to data and set the size of the dataset" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The cell below serves as a quick start on mortgage data for the year 2000. 
Visit the [RAPIDS Datasets Homepage](https://docs.rapids.ai/datasets/mortgage-data) if you want to try other years. The paths below are built from the `data_dir` you set earlier, so update `data_dir` if your data lives elsewhere." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "acq_data_path = data_dir + \"acq\"\n", + "perf_data_path = data_dir + \"perf\"\n", + "col_names_path = data_dir + \"names.csv\"" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/home/tdyer/miniconda3/envs/rapids13/lib/python3.6/site-packages/fsspec/implementations/local.py:33: FutureWarning: The default value of auto_mkdir=True has been deprecated and will be changed to auto_mkdir=False by default in a future release.\n", + " FutureWarning,\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + " seller_name\"|\"new\n", + "0 WITMER FUNDING|LLC\"|\"Witmer\n", + "1 WELLS FARGO CREDIT RISK TRANSFER SECURITIES TR...\n", + "2 WELLS FARGO BANK| NA\"|\"Wells Fargo\n", + "3 WELLS FARGO BANK|N.A.\"|\"Wells Fargo\n", + "4 WELLS FARGO BANK|NA\"|\"Wells Fargo\n" + ] + } + ], + "source": [ + "# Test that you are reading from the proper directory. You will see output if you are.\n", + "temp = cudf.read_csv(col_names_path)\n", + "print(temp.head())\n", + "del temp" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Define functions to encapsulate the workflow into a single call" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [], + "source": [ + "# You can edit the number of dask_cudf partitions here, depending on how many GPUs you have and how big your dataset is. The number chosen can boost or reduce performance\n", + "n_partitions = 40" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "def process_quarter_gpu(year=2000, quarter=1, perf_file=\"\"):\n", + " ml_arrays = run_gpu_workflow(quarter=quarter, year=year, perf_file=perf_file)\n", + " return ml_arrays\n", + "\n", + "def null_workaround(df, **kwargs):\n", + " for column, data_type in df.dtypes.items():\n", + " if str(data_type) == \"str\":\n", + " df[column] = df[column].astype('int32').fillna(-1)\n", + " if str(data_type) == \"category\":\n", + " df[column] = df[column].astype('int32').fillna(-1)\n", + " if str(data_type) in ['int8', 'int16', 'int32', 'int64', 'float32', 'float64']:\n", + " df[column] = df[column].fillna(np.dtype(data_type).type(-1))\n", + " return df\n", + "\n", + "def gpu_load_performance_csv(performance_path, **kwargs):\n", + " \"\"\" Loads performance data\n", + "\n", + " Returns\n", + " -------\n", + " Dask GPU DataFrame\n", + " \"\"\"\n", + " \n", + " cols = [\n", + " \"loan_id\", \"monthly_reporting_period\", \"servicer\", \"interest_rate\", \"current_actual_upb\",\n", + " \"loan_age\", \"remaining_months_to_legal_maturity\", \"adj_remaining_months_to_maturity\",\n", + " \"maturity_date\", \"msa\", \"current_loan_delinquency_status\", \"mod_flag\", \"zero_balance_code\",\n", + " \"zero_balance_effective_date\", \"last_paid_installment_date\", \"foreclosed_after\",\n", + " \"disposition_date\", \"foreclosure_costs\", \"prop_preservation_and_repair_costs\",\n", + " \"asset_recovery_costs\", \"misc_holding_expenses\", \"holding_taxes\", \"net_sale_proceeds\",\n", + " \"credit_enhancement_proceeds\", \"repurchase_make_whole_proceeds\", \"other_foreclosure_proceeds\",\n", + " \"non_interest_bearing_upb\", 
\"principal_forgiveness_upb\", \"repurchase_make_whole_proceeds_flag\",\n", + " \"foreclosure_principal_write_off_amount\", \"servicing_activity_indicator\"\n", + " ]\n", + " \n", + " dtypes = OrderedDict([\n", + " (\"loan_id\", \"int64\"),\n", + " (\"monthly_reporting_period\", \"date\"),\n", + " (\"servicer\", \"str\"),\n", + " (\"interest_rate\", \"float64\"),\n", + " (\"current_actual_upb\", \"float64\"),\n", + " (\"loan_age\", \"float64\"),\n", + " (\"remaining_months_to_legal_maturity\", \"float64\"),\n", + " (\"adj_remaining_months_to_maturity\", \"float64\"),\n", + " (\"maturity_date\", \"date\"),\n", + " (\"msa\", \"float64\"),\n", + " (\"current_loan_delinquency_status\", \"int32\"),\n", + " (\"mod_flag\", \"category\"),\n", + " (\"zero_balance_code\", \"str\"),\n", + " (\"zero_balance_effective_date\", \"date\"),\n", + " (\"last_paid_installment_date\", \"date\"),\n", + " (\"foreclosed_after\", \"date\"),\n", + " (\"disposition_date\", \"date\"),\n", + " (\"foreclosure_costs\", \"float64\"),\n", + " (\"prop_preservation_and_repair_costs\", \"float64\"),\n", + " (\"asset_recovery_costs\", \"float64\"),\n", + " (\"misc_holding_expenses\", \"float64\"),\n", + " (\"holding_taxes\", \"float64\"),\n", + " (\"net_sale_proceeds\", \"float64\"),\n", + " (\"credit_enhancement_proceeds\", \"float64\"),\n", + " (\"repurchase_make_whole_proceeds\", \"float64\"),\n", + " (\"other_foreclosure_proceeds\", \"float64\"),\n", + " (\"non_interest_bearing_upb\", \"float64\"),\n", + " (\"principal_forgiveness_upb\", \"float64\"),\n", + " (\"repurchase_make_whole_proceeds_flag\", \"str\"),\n", + " (\"foreclosure_principal_write_off_amount\", \"float64\"),\n", + " (\"servicing_activity_indicator\", \"category\")\n", + " ])\n", + "\n", + " print(performance_path)\n", + " pdf = dask_cudf.read_csv(performance_path, names=cols, delimiter='|', dtype=list(dtypes.values()), header= True, npartitions = n_partitions) \n", + " return pdf\n", + "\n", + "def gpu_load_acquisition_csv(acquisition_path, **kwargs):\n", + " \"\"\" Loads acquisition data\n", + "\n", + " Returns\n", + " -------\n", + " Dask GPU DataFrame\n", + " \"\"\"\n", + " \n", + " cols = [\n", + " 'loan_id', 'orig_channel', 'seller_name', 'orig_interest_rate', 'orig_upb', 'orig_loan_term', \n", + " 'orig_date', 'first_pay_date', 'orig_ltv', 'orig_cltv', 'num_borrowers', 'dti', 'borrower_credit_score', \n", + " 'first_home_buyer', 'loan_purpose', 'property_type', 'num_units', 'occupancy_status', 'property_state',\n", + " 'zip', 'mortgage_insurance_percent', 'product_type', 'coborrow_credit_score', 'mortgage_insurance_type', \n", + " 'relocation_mortgage_indicator'\n", + " ]\n", + " \n", + " dtypes = OrderedDict([\n", + " (\"loan_id\", \"int64\"),\n", + " (\"orig_channel\", \"str\"),\n", + " (\"seller_name\", \"str\"),\n", + " (\"orig_interest_rate\", \"float64\"),\n", + " (\"orig_upb\", \"int64\"),\n", + " (\"orig_loan_term\", \"int64\"),\n", + " (\"orig_date\", \"date\"),\n", + " (\"first_pay_date\", \"date\"),\n", + " (\"orig_ltv\", \"float64\"),\n", + " (\"orig_cltv\", \"float64\"),\n", + " (\"num_borrowers\", \"float64\"),\n", + " (\"dti\", \"float64\"),\n", + " (\"borrower_credit_score\", \"float64\"),\n", + " (\"first_home_buyer\", \"str\"),\n", + " (\"loan_purpose\", \"str\"),\n", + " (\"property_type\", \"str\"),\n", + " (\"num_units\", \"int64\"),\n", + " (\"occupancy_status\", \"str\"),\n", + " (\"property_state\", \"str\"),\n", + " (\"zip\", \"int64\"),\n", + " (\"mortgage_insurance_percent\", \"float64\"),\n", + " (\"product_type\", 
\"category\"),\n", + " (\"coborrow_credit_score\", \"float64\"),\n", + " (\"mortgage_insurance_type\", \"float64\"),\n", + " (\"relocation_mortgage_indicator\", \"str\")\n", + " ])\n", + " \n", + " print(acquisition_path)\n", + " adf = dask_cudf.read_csv(acquisition_path, names=cols, delimiter='|', dtype=list(dtypes.values()), header= True, npartitions = 1)\n", + " return adf\n", + "\n", + "def gpu_load_names(**kwargs):\n", + " \"\"\" Loads names used for renaming the banks\n", + " \n", + " Returns\n", + " -------\n", + " Dask GPU DataFrame\n", + " \"\"\"\n", + "\n", + " cols = [\n", + " 'seller_name', 'new'\n", + " ]\n", + " \n", + " dtypes = OrderedDict([\n", + " (\"seller_name\", \"str\"),\n", + " (\"new\", \"category\"),\n", + " ])\n", + " ndf = dask_cudf.read_csv(col_names_path, names=cols, delimiter='|', dtype=list(dtypes.values()), header= True, npartitions = 1)\n", + " return ndf" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [], + "source": [ + "def create_ever_features(gdf, **kwargs):\n", + " everdf = gdf[['loan_id', 'current_loan_delinquency_status']]\n", + " everdf = everdf.groupby('loan_id').max()\n", + " everdf = everdf.reset_index()\n", + " del(gdf)\n", + " everdf['ever_30'] = (everdf['current_loan_delinquency_status'] >= 1).astype('int8')\n", + " everdf['ever_90'] = (everdf['current_loan_delinquency_status'] >= 3).astype('int8')\n", + " everdf['ever_180'] = (everdf['current_loan_delinquency_status'] >= 6).astype('int8')\n", + " everdf = everdf.drop('current_loan_delinquency_status', axis=1)\n", + " return everdf" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [], + "source": [ + "def create_delinq_features(gdf, **kwargs):\n", + " \"\"\" Gathers loans with 30, 90 and 180 day delinquency status and merges them into one dataframe\n", + " \n", + " Returns\n", + " -------\n", + " Dask GPU DataFrame\n", + " \"\"\"\n", + " delinq_gdf = gdf[['loan_id', 'monthly_reporting_period', 'current_loan_delinquency_status']]\n", + " del(gdf)\n", + " delinq_30 = delinq_gdf.query('current_loan_delinquency_status >= 1')[['loan_id', 'monthly_reporting_period']].groupby('loan_id').min()\n", + " delinq_30 = delinq_30.reset_index()\n", + " delinq_30['delinquency_30'] = delinq_30['monthly_reporting_period']\n", + " delinq_30 = delinq_30.drop('monthly_reporting_period', axis=1)\n", + " delinq_90 = delinq_gdf.query('current_loan_delinquency_status >= 3')[['loan_id', 'monthly_reporting_period']].groupby('loan_id').min()\n", + " delinq_90 = delinq_90.reset_index()\n", + " delinq_90['delinquency_90'] = delinq_90['monthly_reporting_period']\n", + " delinq_90 = delinq_90.drop('monthly_reporting_period', axis=1)\n", + " delinq_180 = delinq_gdf.query('current_loan_delinquency_status >= 6')[['loan_id', 'monthly_reporting_period']].groupby('loan_id').min()\n", + " delinq_180 = delinq_180.reset_index()\n", + " delinq_180['delinquency_180'] = delinq_180['monthly_reporting_period']\n", + " delinq_180 = delinq_180.drop('monthly_reporting_period', axis=1)\n", + " del(delinq_gdf)\n", + " delinq_merge = delinq_30.merge(delinq_90, on=['loan_id'])\n", + " delinq_merge['delinquency_90'] = delinq_merge['delinquency_90'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]'))\n", + " delinq_merge = delinq_merge.merge(delinq_180, on=['loan_id'])\n", + " delinq_merge['delinquency_180'] = delinq_merge['delinquency_180'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]'))\n", + " 
del(delinq_30)\n", + " del(delinq_90)\n", + " del(delinq_180)\n", + " return delinq_merge" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [], + "source": [ + "def join_ever_delinq_features(everdf_tmp, delinq_merge, **kwargs):\n", + " everdf = everdf_tmp.merge(delinq_merge, on=['loan_id'])\n", + " del(everdf_tmp)\n", + " del(delinq_merge)\n", + " everdf['delinquency_30'] = everdf['delinquency_30'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]'))\n", + " everdf['delinquency_90'] = everdf['delinquency_90'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]'))\n", + " everdf['delinquency_180'] = everdf['delinquency_180'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]'))\n", + " return everdf" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [], + "source": [ + "def create_joined_df(gdf, everdf, **kwargs):\n", + " test = gdf[['loan_id', 'monthly_reporting_period', 'current_loan_delinquency_status', 'current_actual_upb']]\n", + " del(gdf)\n", + " test['timestamp'] = test['monthly_reporting_period']\n", + " test = test.drop('monthly_reporting_period', axis=1)\n", + " test['timestamp_month'] = test['timestamp'].dt.month\n", + " test['timestamp_year'] = test['timestamp'].dt.year\n", + " test['delinquency_12'] = test['current_loan_delinquency_status']\n", + " test = test.drop('current_loan_delinquency_status', axis=1)\n", + " test['upb_12'] = test['current_actual_upb']\n", + " test = test.drop('current_actual_upb', axis=1)\n", + " test['upb_12'] = test['upb_12'].fillna(999999999)\n", + " test['delinquency_12'] = test['delinquency_12'].fillna(-1)\n", + " \n", + " joined_df = test.merge(everdf, on=['loan_id'])\n", + " del(everdf)\n", + " del(test)\n", + " \n", + " joined_df['ever_30'] = joined_df['ever_30'].fillna(-1)\n", + " joined_df['ever_90'] = joined_df['ever_90'].fillna(-1)\n", + " joined_df['ever_180'] = joined_df['ever_180'].fillna(-1)\n", + " joined_df['delinquency_30'] = joined_df['delinquency_30'].fillna(-1)\n", + " joined_df['delinquency_90'] = joined_df['delinquency_90'].fillna(-1)\n", + " joined_df['delinquency_180'] = joined_df['delinquency_180'].fillna(-1)\n", + " \n", + " joined_df['timestamp_year'] = joined_df['timestamp_year'].astype('int32')\n", + " joined_df['timestamp_month'] = joined_df['timestamp_month'].astype('int32')\n", + " \n", + " return joined_df" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [], + "source": [ + "def create_12_mon_features(joined_df, **kwargs):\n", + " testdfs = []\n", + " n_months = 12\n", + " for y in range(1, n_months + 1):\n", + " tmpdf = joined_df[['loan_id', 'timestamp_year', 'timestamp_month', 'delinquency_12', 'upb_12']]\n", + " # Count months since year 0; 24000 corresponds to the year 2000 (2000 * 12)\n", + " tmpdf['josh_months'] = tmpdf['timestamp_year'] * 12 + tmpdf['timestamp_month']\n", + " # Bucket each loan's records into 12-month windows, offset by y months\n", + " tmpdf['josh_mody_n'] = ((tmpdf['josh_months'].astype('float64') - 24000 - y) / 12).astype('int64')\n", + " # Take the worst delinquency and the lowest unpaid balance within each window\n", + " tmpdf = tmpdf.groupby(['loan_id', 'josh_mody_n']).agg({'delinquency_12': 'max', 'upb_12': 'min'})\n", + " tmpdf = tmpdf.reset_index()\n", + " tmpdf['delinquency_12'] = (tmpdf['delinquency_12'] > 3).astype('int32')\n", + " tmpdf['delinquency_12'] += (tmpdf['upb_12'] == 0).astype('int32')\n", + " tmpdf['timestamp_year'] = (((tmpdf['josh_mody_n'] * n_months) + 24000 + (y - 1)) / 12).astype('int16')\n", + " tmpdf['timestamp_month'] = np.int8(y)\n", + " tmpdf = tmpdf.drop('josh_mody_n', axis=1)\n", + " testdfs.append(tmpdf)\n", + 
" del(tmpdf)\n", + " del(joined_df)\n", + "\n", + " return dask_cudf.concat(testdfs)" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [], + "source": [ + "def combine_joined_12_mon(joined_df, testdf, **kwargs):\n", + " joined_df = joined_df.drop('delinquency_12', axis=1)\n", + " joined_df = joined_df.drop('upb_12', axis=1)\n", + " joined_df['timestamp_year'] = joined_df['timestamp_year'].astype('int16')\n", + " joined_df['timestamp_month'] = joined_df['timestamp_month'].astype('int8')\n", + " return joined_df.merge(testdf, on=['loan_id', 'timestamp_year', 'timestamp_month'])" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [], + "source": [ + "def final_performance_delinquency(gdf, joined_df, **kwargs):\n", + " merged = null_workaround(gdf)\n", + " joined_df = null_workaround(joined_df)\n", + " joined_df['timestamp_month'] = joined_df['timestamp_month'].astype('int8')\n", + " joined_df['timestamp_year'] = joined_df['timestamp_year'].astype('int16')\n", + " merged['timestamp_month'] = merged['monthly_reporting_period'].dt.month\n", + " merged['timestamp_month'] = merged['timestamp_month'].astype('int8')\n", + " merged['timestamp_year'] = merged['monthly_reporting_period'].dt.year\n", + " merged['timestamp_year'] = merged['timestamp_year'].astype('int16')\n", + " merged = merged.merge(joined_df, on=['loan_id', 'timestamp_year', 'timestamp_month'])\n", + " merged = merged.drop('timestamp_year', axis=1)\n", + " merged = merged.drop('timestamp_month', axis=1)\n", + " return merged" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [], + "source": [ + "def join_perf_acq_gdfs(perf, acq, **kwargs):\n", + " perf = null_workaround(perf)\n", + " acq = null_workaround(acq)\n", + " return perf.merge(acq, on=['loan_id'])" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": {}, + "outputs": [], + "source": [ + "def last_mile_cleaning(df, **kwargs):\n", + " drop_list = [\n", + " 'loan_id', 'orig_date', 'first_pay_date', 'seller_name',\n", + " 'monthly_reporting_period', 'last_paid_installment_date', 'maturity_date', 'ever_30', 'ever_90', 'ever_180',\n", + " 'delinquency_30', 'delinquency_90', 'delinquency_180', 'upb_12',\n", + " 'zero_balance_effective_date','foreclosed_after', 'disposition_date','timestamp'\n", + " ]\n", + " for column in drop_list:\n", + " df = df.drop(column, axis=1)\n", + " for col, dtype in df.dtypes.iteritems():\n", + " if str(dtype)=='category':\n", + " df[col] = df[col].cat.codes\n", + " df[col] = df[col].astype('float32')\n", + " df['delinquency_12'] = df['delinquency_12'] > 0\n", + " df['delinquency_12'] = df['delinquency_12'].fillna(False).astype('int32')\n", + " for column in df.columns:\n", + " df[column] = df[column].fillna(np.dtype(str(df[column].dtype)).type(-1))\n", + " return df" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "metadata": {}, + "outputs": [], + "source": [ + "def run_gpu_workflow(quarter=1, year=2000, perf_file=\"\", **kwargs):\n", + " \"\"\" Main function to perform ETL on the data. 
\n", + " \n", + " Returns\n", + " -------\n", + " Dask GPU DataFrame\n", + " \"\"\"\n", + " names = gpu_load_names()\n", + " acq_gdf = gpu_load_acquisition_csv(acquisition_path= acq_data_path + \"/Acquisition_\"\n", + " + str(year) + \"Q\" + str(quarter) + \".txt\")\n", + " acq_gdf = acq_gdf.merge(names, on=['seller_name'], how=\"left\")\n", + " acq_gdf = acq_gdf.drop('seller_name', axis=1)\n", + " acq_gdf['seller_name'] = acq_gdf['new']\n", + " acq_gdf = acq_gdf.drop('new', axis=1)\n", + " perf_df_tmp = gpu_load_performance_csv(perf_file)\n", + " gdf = perf_df_tmp\n", + " everdf = create_ever_features(gdf)\n", + " delinq_merge = create_delinq_features(gdf)\n", + " everdf = join_ever_delinq_features(everdf, delinq_merge)\n", + " del(delinq_merge)\n", + " joined_df = create_joined_df(gdf, everdf)\n", + " testdf = create_12_mon_features(joined_df)\n", + " joined_df = combine_joined_12_mon(joined_df, testdf)\n", + " del(testdf)\n", + " perf_df = final_performance_delinquency(gdf, joined_df)\n", + " del(gdf, joined_df)\n", + " final_gdf = join_perf_acq_gdfs(perf_df, acq_gdf)\n", + " del(perf_df)\n", + " del(acq_gdf)\n", + " final_gdf = last_mile_cleaning(final_gdf)\n", + " return final_gdf" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## ETL" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Perform all of ETL with a single call to\n", + "```python\n", + "process_quarter_gpu(year=year, quarter=quarter, perf_file=file)\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "/raid/tdyer/rapids-h2o/mortgage/data/mortgage/acq/Acquisition_2000Q1.txt\n", + "/raid/tdyer/rapids-h2o/mortgage/data/mortgage/perf/Performance_2000Q1.txt\n", + "/raid/tdyer/rapids-h2o/mortgage/data/mortgage/acq/Acquisition_2000Q2.txt\n", + "/raid/tdyer/rapids-h2o/mortgage/data/mortgage/perf/Performance_2000Q2.txt\n", + "/raid/tdyer/rapids-h2o/mortgage/data/mortgage/acq/Acquisition_2000Q3.txt\n", + "/raid/tdyer/rapids-h2o/mortgage/data/mortgage/perf/Performance_2000Q3.txt\n", + "/raid/tdyer/rapids-h2o/mortgage/data/mortgage/acq/Acquisition_2000Q4.txt\n", + "/raid/tdyer/rapids-h2o/mortgage/data/mortgage/perf/Performance_2000Q4.txt\n", + "/raid/tdyer/rapids-h2o/mortgage/data/mortgage/acq/Acquisition_2001Q1.txt\n", + "/raid/tdyer/rapids-h2o/mortgage/data/mortgage/perf/Performance_2001Q1.txt\n", + "/raid/tdyer/rapids-h2o/mortgage/data/mortgage/acq/Acquisition_2001Q2.txt\n", + "/raid/tdyer/rapids-h2o/mortgage/data/mortgage/perf/Performance_2001Q2.txt_0\n", + "/raid/tdyer/rapids-h2o/mortgage/data/mortgage/acq/Acquisition_2001Q2.txt\n", + "/raid/tdyer/rapids-h2o/mortgage/data/mortgage/perf/Performance_2001Q2.txt_1\n" + ] + } + ], + "source": [ + "%%time\n", + "\n", + "# NOTE: The ETL calculates additional features which are then dropped before creating the XGBoost DMatrix.\n", + "# This can be optimized to avoid calculating the dropped features.\n", + "part_count = 4\n", + "\n", + "gpu_dfs = []\n", + "gpu_time = 0\n", + "quarter = 1\n", + "year = start_year\n", + "count = 0\n", + "while year <= end_year:\n", + " for file in glob(os.path.join(perf_data_path + \"/Performance_\" + str(year) + \"Q\" + str(quarter) + \"*\")):\n", + " gpu_dfs.append(process_quarter_gpu(year=year, quarter=quarter, perf_file=file))\n", + " count += 1\n", + " quarter += 1\n", + " if quarter == 5:\n", + " year += 1\n", + " quarter = 1\n", + "print(\"ETL for start_year:{} and 
end_year:{}\\n\".format(start_year,end_year))\n", + "wait(gpu_dfs)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#client.run(initialize_rmm_no_pool)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## XGBoost the Dataset" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Train and test on the dataset" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "First thing we do is split the data into a test and a training dataframe. Let's persist/wait our test dataset here" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "test_df = gpu_dfs[len(gpu_dfs)-1].persist()\n", + "_ = wait(test_df)\n", + "del gpu_dfs[len(gpu_dfs)-1]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Concatenate all the dask dataframes into one. We will `.persist()` and `wait` here as a milestone to reduce the persist/wait times later." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "train_df = dask_cudf.concat(gpu_dfs).persist(split_out=n_workers)\n", + "_ = wait(train_df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "train_data, test_data = dask.persist(train_df[train_df.columns.difference([\"delinquency_12\"])], test_df[test_df.columns.difference([\"delinquency_12\"])])\n", + "train_labels, test_labels = dask.persist(train_df[\"delinquency_12\"], test_df[\"delinquency_12\"])\n", + "_ = wait(train_data)\n", + "_ = wait(test_data)\n", + "_ = wait(train_labels)\n", + "_ = wait(test_labels)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Create dtrain and dtest dask dmatrix\n", + "dtrain = xgb.dask.DaskDMatrix(client, \n", + " train_data,\n", + " train_labels, missing=-1 )\n", + "\n", + "dtest = xgb.dask.DaskDMatrix(client, \n", + " test_data,\n", + " test_labels , missing=-1 )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "#Train the model\n", + "\n", + "trained_model = xgb.dask.train(client,\n", + " {\n", + " 'learning_rate': 0.1,\n", + " 'max_depth': 8,\n", + " 'subsample': 1,\n", + " 'gamma': 0.1,\n", + " 'silent': True,\n", + " 'verbose_eval': True,\n", + " 'tree_method':'gpu_hist',\n", + " 'loss': 'ls',\n", + " 'objective': 'binary:logistic',\n", + " 'max_features': 'auto',\n", + " 'criterion': 'friedman_mse',\n", + " 'grow_policy': 'lossguide',\n", + " },\n", + " dtrain,\n", + " num_boost_round=100, evals=[(dtrain, 'train')])\n", + "\n", + "#Predict the model\n", + "prediction = xgb.dask.predict(client, trained_model['booster'], dtest)\n", + "\n", + "#form and test predictions form xgboost.dask output\n", + "## get pred into an array\n", + "pred = prediction.compute()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Finally we will form our test_labels into an array that we can validate our predictions `pred` against it with an RMSE score. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%time\n", + "## get test_labels into an array form\n", + "ytest= test_labels.compute()\n", + "ytest = ytest.astype(np.float32)\n", + "\n", + "##test prediction wih RMSE\n", + "from cuml.metrics import mean_squared_error\n", + "rmse = np.sqrt(mean_squared_error(ytest, pred))\n", + "\n", + "print(\"RMSE: \", rmse)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Our notebook ends here. You now have a fully trained model and tested it to get your RMSE. You can continue doing any other analytics you'd like. Hooray!" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}