Skip to content

Commit

Permalink
Refactor Categorify (#1692)
Browse files Browse the repository at this point in the history
* use dask.dataframe's aca

* support on_host

* fix split-out (band-aid)

* second attempt to refactor categorify

* use temporary write

* save state - still need to figure out how to 'always' write cats to a directory

* using dask to write/read categories

* use official dispatch

* support multi-partition cats

* avoid manual bcast merge for single partition

* clean up _to_parquet_dask  a bit

* deprecate tree_width

* fix split_out default

* update join_groupby and target_encoding options

* improve split_out=1 performance

* uuse np.arange(... like)

* improve test coverage

* update docs

* use schema=None too avoid need for head()

* arbitrary change

* improve options and test coverage

* use dask to calculate divisions for now

* cast cats_n type to be consistent with cats_1

* catch expected warnings

* formatting

* formatting

* formatting (one more time)

* Update bench/examples/dask-nvtabular-criteo-benchmark.py

* Update nvtabular/ops/categorify.py

* enable multi-columnn vocabs and simplify dask utilities

* minor change to trigger CI

* formatting

* deprecate start_index

* deprecate start_index and na_sentinel

* pushing on categorify redesign

* get tests passing with new categorify standard

* update size stats when split_out == 1

* fix test_categorify_freq_limit

* create _save_embeddings function

* start writing meta.*.parquet file for non-unique statistics

* use meta.*.parquet

* fix embedding size info

* revert stale test changes

* update test_drop_low_cardinality

* fix test_cpu_workflow.py

* fix test_workflow.py

* catch TypeError in dask sort

* fix single_table

* update documentation

* cpp experiment

* missing parentheses

---------

Co-authored-by: Karl Higley <[email protected]>
  • Loading branch information
rjzamora and karlhigley committed May 30, 2023
1 parent 20f7bec commit 6dd49c9
Show file tree
Hide file tree
Showing 11 changed files with 1,023 additions and 598 deletions.
44 changes: 22 additions & 22 deletions bench/examples/MultiGPUBench.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ The script is designed with a parquet-formatted dataset in mind. Although csv fi

### General Notes on Parameter Tuning

The script was originally developed and tested on an NVIDIA DGX-1 machine (8x 32GB V100s, 1TB RAM). Users with limited device and/or host memory may experience memory errors with the default options. Depending on the system, these users may need to modify one or more of the “Algorithm Options” described below. For example, it may be necessary to expand the list of “high-cardinality” columns, increase the tree-width and/or use “disk” for the cat-cache options.
The script was originally developed and tested on an NVIDIA DGX-1 machine (8x 32GB V100s, 1TB RAM). Users with limited device and/or host memory may experience memory errors with the default options. Depending on the system, these users may need to modify one or more of the “Algorithm Options” described below. For example, it may be necessary to expand the list of “high-cardinality” columns, increase the split-out and/or use “disk” for the cat-cache options.

In addition to adjusting the algorithm details, users with limited device memory may also find it useful to adjust the `--device-pool-frac` and/or `--device-limit-frac` options (reduce both fractions).

For all users, the most important benchmark options include the following.

- **Device list**: `-d` (easiest way to set the number of workers)
- **Partition size**: `—part-mem-frac` (bigger partitions = better device efficiency)
- **Intermediate-category storage**: `—cats-on-device` (always use this option when total device memory is sufficiently large)
- **Partition size**: `-part-mem-frac` (bigger partitions = better device efficiency)
- **Intermediate-category storage**: `-cats-on-device` (always use this option when total device memory is sufficiently large)
- **Communication protocol**: `-p` (use ucx whenever possible)
- **Output-file count**: `—out-files-per-proc` (fewer output files is faster)
- **Output-file count**: `-out-files-per-proc` (fewer output files is faster)

See option descriptions below for more information.

Expand Down Expand Up @@ -78,13 +78,13 @@ NVTabular uses the file structure of the output dataset to shuffle data as it is

e.g. `--out-files-per-proc 24`

Note that a large number of output files may be required to perform the “PER_WORKER” shuffle option (see description of the `—shuffle` flag below). This is because each file will be fully shuffled in device memory.
Note that a large number of output files may be required to perform the “PER_WORKER” shuffle option (see description of the `-shuffle` flag below). This is because each file will be fully shuffled in device memory.

##### Shuffling

NVTabular currently offers two options for shuffling output data to disk. The `“PER_PARTITION”` option means that each partition will be independently shuffled after transformation, and then appended to some number of distinct output files. The number of output files is specified by the `--out-files-per-proc` flag (described above), and the files are uniquely mapped to each worker. The `“PER_WORKER”` option follows the same process, but the “files” are initially written to in-host-memory, and then reshuffled and persisted to disk after the full dataset is processed. The user can specify the specific shuffling algorithm to use with the `—shuffle` flag.
NVTabular currently offers two options for shuffling output data to disk. The `“PER_PARTITION”` option means that each partition will be independently shuffled after transformation, and then appended to some number of distinct output files. The number of output files is specified by the `--out-files-per-proc` flag (described above), and the files are uniquely mapped to each worker. The `“PER_WORKER”` option follows the same process, but the “files” are initially written to in-host-memory, and then reshuffled and persisted to disk after the full dataset is processed. The user can specify the specific shuffling algorithm to use with the `-shuffle` flag.

e.g. `—shuffle PER_WORKER`
e.g. `-shuffle PER_WORKER`

Note that the `“PER_WORKER”` option may require a larger-than default output-file count (See description of the `--out-files-per-proc` flag above).

Expand All @@ -97,15 +97,15 @@ By default this script will assume the following categorical and continuous colu
- Categorical: “C1”, ”C2”, … , ”C26”
- Continuous: “I1”, ”I2”, … , ”I13”

The user may specify different column names, or a subset of these names, by passing a column-separated list to the `--cat-names` and/or `—cont_names` flags.
The user may specify different column names, or a subset of these names, by passing a column-separated list to the `--cat-names` and/or `-cont_names` flags.

e.g. `—cat-names C01,C02 --cont_names I01,I02 —high_cards C01`
e.g. `-cat-names C01,C02 --cont_names I01,I02 —high_cards C01`

Note that, if your dataset includes non-default column names, you should also use the `—high-cards` flag (described below), to specify the names of high-cardinality columns.
Note that, if your dataset includes non-default column names, you should also use the `-high-cards` flag (described below), to specify the names of high-cardinality columns.

##### Categorical Encoding

By default, all categorical-column groups will be used for the final encoding transformation. The user can also specify a frequency threshold for groups to be included in the encoding with the `—freq-limit` (or `-f`) flag.
By default, all categorical-column groups will be used for the final encoding transformation. The user can also specify a frequency threshold for groups to be included in the encoding with the `-freq-limit` (or `-f`) flag.

e.g `-f 15` (groups with fewer than 15 instances in the dataset will not be used for encoding)

Expand All @@ -115,23 +115,23 @@ e.g `-f 15` (groups with fewer than 15 instances in the dataset will not be used

As described below, the specific algorithm used for categorical encoding can be column dependent. In this script, we use special options for a subset of “high-cardinality” columns. By default, these columns are "C20,C1,C22,C10”. However, the user can specify different column names with the `--high-cards` flag.

e.g. `—high_cards C01,C10`
e.g. `-high_cards C01,C10`

Note that only the columns specified with this flag (or the default columns) will be targetedby the `--tree-width` and/or `--cat-cache-high` flags (described below).
Note that only the columns specified with this flag (or the default columns) will be targeted by the `--split-out` and/or `--cat-cache-high` flags (described below).

##### Global-Categories Calculation (GroupbyStatistics)

In order encode categorical columns, NVTabular needs to calculate a global list of unique categories for each categorical column. This is accomplished with a global groupby-aggregation-based tree reduction on each column. In order to avoid memory pressure on the device, intermediate groupby data is moved to host memory between tasks in the global-aggregation tree. For users with a sufficient amount of total GPU memory, this device-to-host transfer can be avoided with the by adding the `—cats-on-device` flag to the execution command.
In order encode categorical columns, NVTabular needs to calculate a global list of unique categories for each categorical column. This is accomplished with a global groupby-aggregation-based tree reduction on each column. In order to avoid memory pressure on the device, intermediate groupby data is moved to host memory between tasks in the global-aggregation tree. For users with a sufficient amount of total GPU memory, this device-to-host transfer can be avoided with the by adding the `-cats-on-device` flag to the execution command.

e.g. `—cats-on-device`
e.g. `-cats-on-device`

In addition to controlling device-to-host data movement, the user can also use the `--tree-width` flag to specify the width of the groupby-aggregation tree for high-cardinality columns. Although NVTabular allows the user to specify the tree-width for each column independently, this option will target all columns specified with `high-cards`.
In addition to controlling device-to-host data movement, the user can also use the `--split-out` flag to specify the number of files needed to store unique values for high-cardinality columns. Although NVTabular allows the user to specify the split-out for each column independently, this option will target all columns specified with `--high-cards`.

e.g. `—tree_width 4`
e.g. `-tree_width 4`

##### Categorical Encoding (Categorify)

During the categorical-encoding transformation stage, the column-specific unique values must be read into GPU memory for the operation. Since each NVTabular process will only operate on a single partition at a time, the same unique-value statistics need to be re-read (for every categorical column) for each partition that is transformed. Unsurprisingly, the performance of categorical encoding can be dramatically improved by caching the unique values on each worker between transformation operations.
The user can specify caching location for low- and high-cardinality columns separately. Recall that high-cardinality columns can be specified with `—-high_cards` (and all remaining categorical columns will be treated as low-cardinality”). The user can specify the caching location of low-cardinality columns with the `--cat-cache-low` flag, and high-cardinality columns with the `--cat-cache-low` flag. For both cases, the options are “device”, “host”, or “disk”.

The user can specify caching location for low- and high-cardinality columns separately. Recall that high-cardinality columns can be specified with `—high_cards` (and all remaining categorical columns will be treated as low-cardinality”). The user can specify the caching location of low-cardinality columns with the `--cat-cache-low` flag, and high-cardinality columns with the `--cat-cache-low` flag. For both cases, the options are “device”, “host”, or “disk”.

Expand All @@ -141,12 +141,12 @@ e.g. `--cat-cache-low device --cat-cache-high host`

##### Dashboard

A wonderful advantage of the Dask-Distributed ecosystem is the convenient set of diagnostics utilities. By default (if Bokeh is installed on your system), the distributed scheduler will host a diagnostics dashboard at  `http://localhost:8787/status` (where localhost may need to be changed to the the IP address where the scheduler is running). If port 8787 is already in use, a different (random) port will be used. However, the user can specify a specific port using the `—dashboard-port` flag.
A wonderful advantage of the Dask-Distributed ecosystem is the convenient set of diagnostics utilities. By default (if Bokeh is installed on your system), the distributed scheduler will host a diagnostics dashboard at  `http://localhost:8787/status` (where localhost may need to be changed to the the IP address where the scheduler is running). If port 8787 is already in use, a different (random) port will be used. However, the user can specify a specific port using the `-dashboard-port` flag.

e.g. `—dashboard-port 3787`
e.g. `-dashboard-port 3787`

##### Profile

In addition to hosting a diagnostics dashboard, the distributed cluster can also collect and export profiling data on all scheduler and worker processes. To export an interactive profile report, the user can specify a file path with the `—profile` flag. If this flag is not used, no profile will be collected/exported.
In addition to hosting a diagnostics dashboard, the distributed cluster can also collect and export profiling data on all scheduler and worker processes. To export an interactive profile report, the user can specify a file path with the `-profile` flag. If this flag is not used, no profile will be collected/exported.

e.g. `—profile my-profile.html`
e.g. `-profile my-profile.html`
18 changes: 9 additions & 9 deletions bench/examples/dask-nvtabular-criteo-benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,14 @@ def main(args):
label_name = ["label"]

# Specify Categorify/GroupbyStatistics options
tree_width = {}
split_out = {}
cat_cache = {}
for col in cat_names:
if col in high_card_columns:
tree_width[col] = args.tree_width
split_out[col] = args.split_out
cat_cache[col] = args.cat_cache_high
else:
tree_width[col] = 1
split_out[col] = 1
cat_cache[col] = args.cat_cache_low

# Use total device size to calculate args.device_limit_frac
Expand Down Expand Up @@ -205,7 +205,7 @@ def main(args):

cat_features = cat_names >> ops.Categorify(
out_path=stats_path,
tree_width=tree_width,
split_out=split_out,
cat_cache=cat_cache,
freq_threshold=freq_limit,
search_sorted=not freq_limit,
Expand Down Expand Up @@ -360,16 +360,16 @@ def parse_args():
"--high-cards",
default="C20,C1,C22,C10",
type=str,
help="Specify a list of high-cardinality columns. The tree-width "
help="Specify a list of high-cardinality columns. The split-out "
"and cat-cache options will apply to these columns only."
'(Default "C20,C1,C22,C10")',
)
parser.add_argument(
"--tree-width",
default=8,
"--split-out",
default=1,
type=int,
help="Tree width for GroupbyStatistics operations on high-cardinality "
"columns (Default 8)",
help="Number of files needed to store unique values for high-cardinality "
"columns (Default 1)",
)
parser.add_argument(
"--cat-cache-high",
Expand Down
26 changes: 20 additions & 6 deletions cpp/nvtabular/inference/categorify.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ namespace nvtabular

if ((dtype.kind() == 'O') || (dtype.kind() == 'U'))
{
int64_t i = 0;
int64_t i = UNIQUE_OFFSET;
for (auto &value : values)
{
if (!py::cast<bool>(isnull(value)))
Expand Down Expand Up @@ -138,20 +138,29 @@ namespace nvtabular
size_t size = values.size();
for (size_t i = 0; i < size; ++i)
{
mapping_int[static_cast<int64_t>(data[i])] = i;
mapping_int[static_cast<int64_t>(data[i])] = i + UNIQUE_OFFSET;
}
}

template <typename T>
py::array transform_int(py::array_t<T> input) const
{
py::object pandas = py::module_::import("pandas");
py::object isnull = pandas.attr("isnull");
py::array_t<int64_t> output(input.size());
const T *input_data = input.data();
int64_t *output_data = output.mutable_data();
for (int64_t i = 0; i < input.size(); ++i)
{
auto it = mapping_int.find(static_cast<int64_t>(input_data[i]));
output_data[i] = it == mapping_int.end() ? 0 : it->second;
if (it == mapping_int.end())
{
output_data[i] = py::cast<bool>(isnull(input_data[i])) ? NULL_INDEX : OOV_INDEX;
}
else
{
output_data[i] = it->second;
}
}
return output;
}
Expand All @@ -169,18 +178,18 @@ namespace nvtabular
{
if (value.is_none())
{
data[i] = 0;
data[i] = NULL_INDEX;
}
else if (PyUnicode_Check(value.ptr()) || PyBytes_Check(value.ptr()))
{
std::string key = py::cast<std::string>(value);
auto it = mapping_str.find(key);
data[i] = it == mapping_str.end() ? 0 : it->second;
data[i] = it == mapping_str.end() ? OOV_INDEX : it->second;
}
else if (PyBool_Check(value.ptr()))
{
auto it = mapping_int.find(value.ptr() == Py_True);
data[i] = it == mapping_int.end() ? 0 : it->second;
data[i] = it == mapping_int.end() ? OOV_INDEX : it->second;
}
else
{
Expand Down Expand Up @@ -247,6 +256,11 @@ namespace nvtabular

std::unordered_map<std::string, int64_t> mapping_str;
std::unordered_map<int64_t, int64_t> mapping_int;

// TODO: Handle multiple OOV buckets?
const int64_t NULL_INDEX = 1;
const int64_t OOV_INDEX = 2;
const int64_t UNIQUE_OFFSET = 3;
};

// Reads in a parquet category mapping file in cpu memory using pandas
Expand Down
Loading

0 comments on commit 6dd49c9

Please sign in to comment.