Refactor Categorify #1692

Merged: 72 commits, May 30, 2023

Changes from all commits

Commits
621b4dd
use dask.dataframe's aca
rjzamora Oct 3, 2022
6e42a22
Merge remote-tracking branch 'upstream/main' into refactor-categorify
rjzamora Oct 3, 2022
c65b736
support on_host
rjzamora Oct 3, 2022
977363b
fix split-out (band-aid)
rjzamora Oct 5, 2022
a012ea1
second attempt to refactor categorify
rjzamora Oct 5, 2022
67c7d5d
use temporary write
rjzamora Oct 5, 2022
4ba2f3f
save state - still need to figure out how to 'always' write cats to a…
rjzamora Oct 7, 2022
60b00f2
using dask to write/read categories
rjzamora Oct 7, 2022
dc22a71
use official dispatch
rjzamora Oct 7, 2022
3a8ec12
support multi-partition cats
rjzamora Oct 7, 2022
8344aa3
avoid manual bcast merge for single partition
rjzamora Oct 7, 2022
d49630b
clean up _to_parquet_dask a bit
rjzamora Oct 10, 2022
8a4e395
deprecate tree_width
rjzamora Oct 11, 2022
93557fc
fix split_out default
rjzamora Oct 11, 2022
1e8241a
update join_groupby and target_encoding options
rjzamora Oct 11, 2022
011909d
improve split_out=1 performance
rjzamora Oct 11, 2022
a79a37e
uuse np.arange(... like)
rjzamora Oct 11, 2022
79886ef
Merge branch 'main' into refactor-categorify-2
rjzamora Oct 11, 2022
660734c
Merge remote-tracking branch 'upstream/main' into refactor-categorify-2
rjzamora Oct 21, 2022
89adf74
Merge branch 'refactor-categorify-2' of https://github.com/rjzamora/N…
rjzamora Oct 21, 2022
d845589
improve test coverage
rjzamora Oct 27, 2022
d6ef34a
update docs
rjzamora Oct 27, 2022
48c271c
use schema=None too avoid need for head()
rjzamora Oct 27, 2022
6927d04
arbitrary change
rjzamora Oct 27, 2022
1b7a041
improve options and test coverage
rjzamora Oct 28, 2022
062b6e3
use dask to calculate divisions for now
rjzamora Oct 31, 2022
418d18b
cast cats_n type to be consistent with cats_1
rjzamora Oct 31, 2022
a2c3df1
catch expected warnings
rjzamora Oct 31, 2022
cc3801d
formatting
rjzamora Oct 31, 2022
5a766d6
formatting
rjzamora Oct 31, 2022
c9d6751
formatting (one more time)
rjzamora Oct 31, 2022
fc98ca9
Update bench/examples/dask-nvtabular-criteo-benchmark.py
rjzamora Nov 1, 2022
69daa93
Update nvtabular/ops/categorify.py
rjzamora Nov 1, 2022
4b2fc20
Merge branch 'main' into refactor-categorify-2
rjzamora Nov 2, 2022
d716d98
enable multi-columnn vocabs and simplify dask utilities
rjzamora Nov 2, 2022
cb9e936
Merge remote-tracking branch 'upstream/main' into refactor-categorify-2
rjzamora Nov 2, 2022
3418cec
Merge branch 'main' into refactor-categorify-2
rjzamora Nov 8, 2022
03a7550
minor change to trigger CI
rjzamora Nov 10, 2022
f29e2f6
Merge branch 'main' into refactor-categorify-2
rjzamora Nov 18, 2022
a4bc29a
Merge branch 'main' into refactor-categorify-2
karlhigley Nov 19, 2022
0606096
Merge branch 'main' into refactor-categorify-2
rjzamora Nov 28, 2022
8aa77f2
Merge branch 'main' into refactor-categorify-2
rjzamora Dec 2, 2022
9784ccb
Merge remote-tracking branch 'upstream/main' into refactor-categorify-2
rjzamora Feb 24, 2023
31dc74b
Merge branch 'refactor-categorify-2' of https://github.com/rjzamora/N…
rjzamora Feb 24, 2023
160ab46
formatting
rjzamora Feb 24, 2023
0dacf2b
deprecate start_index
rjzamora Feb 24, 2023
7f3734c
deprecate start_index and na_sentinel
rjzamora Feb 24, 2023
0c7a41e
pushing on categorify redesign
rjzamora Feb 27, 2023
bd74e75
get tests passing with new categorify standard
rjzamora Mar 1, 2023
0df8095
update size stats when split_out == 1
rjzamora Mar 1, 2023
6dd560e
fix test_categorify_freq_limit
rjzamora Mar 1, 2023
a43f584
create _save_embeddings function
rjzamora Mar 1, 2023
e066304
start writing meta.*.parquet file for non-unique statistics
rjzamora Mar 2, 2023
0cdc47b
use meta.*.parquet
rjzamora Mar 2, 2023
09399ec
fix embedding size info
rjzamora Mar 2, 2023
288f255
Merge remote-tracking branch 'upstream/main' into refactor-categorify-2
rjzamora Mar 2, 2023
2616e0d
revert stale test changes
rjzamora Mar 2, 2023
113a0c7
update test_drop_low_cardinality
rjzamora Mar 7, 2023
e4cacca
fix test_cpu_workflow.py
rjzamora Mar 7, 2023
7ddf268
fix test_workflow.py
rjzamora Mar 7, 2023
4bef601
catch TypeError in dask sort
rjzamora Mar 7, 2023
bedd58e
fix single_table
rjzamora Mar 7, 2023
9378ca0
Merge branch 'main' into refactor-categorify-2
rjzamora Mar 21, 2023
e67e055
Merge branch 'main' into refactor-categorify-2
rjzamora Mar 23, 2023
59cbf87
update documentation
rjzamora Mar 23, 2023
2779392
Merge branch 'refactor-categorify-2' of https://github.com/rjzamora/N…
rjzamora Mar 23, 2023
5834f8c
Merge branch 'main' into refactor-categorify-2
rjzamora Mar 27, 2023
e98b163
Merge branch 'main' into refactor-categorify-2
rjzamora Apr 10, 2023
983bb7c
Merge remote-tracking branch 'upstream/main' into refactor-categorify-2
rjzamora May 9, 2023
aa98b37
Merge remote-tracking branch 'upstream/main' into refactor-categorify-2
rjzamora May 30, 2023
2df007c
cpp experiment
rjzamora May 30, 2023
e3e31f9
missing parentheses
rjzamora May 30, 2023
44 changes: 22 additions & 22 deletions bench/examples/MultiGPUBench.md
@@ -16,17 +16,17 @@ The script is designed with a parquet-formatted dataset in mind. Although csv fi

### General Notes on Parameter Tuning

- The script was originally developed and tested on an NVIDIA DGX-1 machine (8x 32GB V100s, 1TB RAM). Users with limited device and/or host memory may experience memory errors with the default options. Depending on the system, these users may need to modify one or more of the “Algorithm Options” described below. For example, it may be necessary to expand the list of “high-cardinality” columns, increase the tree-width and/or use “disk” for the cat-cache options.
+ The script was originally developed and tested on an NVIDIA DGX-1 machine (8x 32GB V100s, 1TB RAM). Users with limited device and/or host memory may experience memory errors with the default options. Depending on the system, these users may need to modify one or more of the “Algorithm Options” described below. For example, it may be necessary to expand the list of “high-cardinality” columns, increase the split-out and/or use “disk” for the cat-cache options.

In addition to adjusting the algorithm details, users with limited device memory may also find it useful to adjust the `--device-pool-frac` and/or `--device-limit-frac` options (reduce both fractions).

For all users, the most important benchmark options include the following.

- **Device list**: `-d` (easiest way to set the number of workers)
- - **Partition size**: `—part-mem-frac` (bigger partitions = better device efficiency)
- - **Intermediate-category storage**: `—cats-on-device` (always use this option when total device memory is sufficiently large)
+ - **Partition size**: `--part-mem-frac` (bigger partitions = better device efficiency)
+ - **Intermediate-category storage**: `--cats-on-device` (always use this option when total device memory is sufficiently large)
- **Communication protocol**: `-p` (use ucx whenever possible)
- - **Output-file count**: `—out-files-per-proc` (fewer output files is faster)
+ - **Output-file count**: `--out-files-per-proc` (fewer output files is faster)

See option descriptions below for more information.

@@ -78,13 +78,13 @@ NVTabular uses the file structure of the output dataset to shuffle data as it is

e.g. `--out-files-per-proc 24`

- Note that a large number of output files may be required to perform the “PER_WORKER” shuffle option (see description of the `—shuffle` flag below). This is because each file will be fully shuffled in device memory.
+ Note that a large number of output files may be required to perform the “PER_WORKER” shuffle option (see description of the `--shuffle` flag below). This is because each file will be fully shuffled in device memory.

##### Shuffling

- NVTabular currently offers two options for shuffling output data to disk. The `“PER_PARTITION”` option means that each partition will be independently shuffled after transformation, and then appended to some number of distinct output files. The number of output files is specified by the `--out-files-per-proc` flag (described above), and the files are uniquely mapped to each worker. The `“PER_WORKER”` option follows the same process, but the “files” are initially written to in-host-memory, and then reshuffled and persisted to disk after the full dataset is processed. The user can specify the specific shuffling algorithm to use with the `—shuffle` flag.
+ NVTabular currently offers two options for shuffling output data to disk. The `“PER_PARTITION”` option means that each partition will be independently shuffled after transformation, and then appended to some number of distinct output files. The number of output files is specified by the `--out-files-per-proc` flag (described above), and the files are uniquely mapped to each worker. The `“PER_WORKER”` option follows the same process, but the “files” are initially written to host memory, and then reshuffled and persisted to disk after the full dataset is processed. The user can specify the shuffling algorithm to use with the `--shuffle` flag.

- e.g. `—shuffle PER_WORKER`
+ e.g. `--shuffle PER_WORKER`

Note that the `“PER_WORKER”` option may require a larger-than-default output-file count (see description of the `--out-files-per-proc` flag above).
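
For readers following the Python API rather than the benchmark CLI, here is a minimal sketch of the two shuffle modes. The paths and the tiny workflow are illustrative only, and it assumes the `Shuffle` enum is importable from `nvtabular.io`:

```python
import nvtabular as nvt
from nvtabular import ops
from nvtabular.io import Shuffle

# Hypothetical dataset and a deliberately tiny workflow.
dataset = nvt.Dataset("/data/criteo/*.parquet", engine="parquet")
workflow = nvt.Workflow(["C1", "C2"] >> ops.Categorify())
workflow.fit(dataset)

# Write the transformed data, shuffling as it lands on disk.
workflow.transform(dataset).to_parquet(
    output_path="/data/output",
    shuffle=Shuffle.PER_WORKER,  # or Shuffle.PER_PARTITION
    out_files_per_proc=24,       # PER_WORKER typically wants a larger file count
)
```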

@@ -97,15 +97,15 @@ By default this script will assume the following categorical and continuous colu
- Categorical: “C1”, “C2”, …, “C26”
- Continuous: “I1”, “I2”, …, “I13”

- The user may specify different column names, or a subset of these names, by passing a column-separated list to the `--cat-names` and/or `—cont_names` flags.
+ The user may specify different column names, or a subset of these names, by passing a comma-separated list to the `--cat-names` and/or `--cont_names` flags.

- e.g. `—cat-names C01,C02 --cont_names I01,I02 —high_cards C01`
+ e.g. `--cat-names C01,C02 --cont_names I01,I02 --high-cards C01`

- Note that, if your dataset includes non-default column names, you should also use the `—high-cards` flag (described below), to specify the names of high-cardinality columns.
+ Note that, if your dataset includes non-default column names, you should also use the `--high-cards` flag (described below) to specify the names of high-cardinality columns.

##### Categorical Encoding

- By default, all categorical-column groups will be used for the final encoding transformation. The user can also specify a frequency threshold for groups to be included in the encoding with the `—freq-limit` (or `-f`) flag.
+ By default, all categorical-column groups will be used for the final encoding transformation. The user can also specify a frequency threshold for groups to be included in the encoding with the `--freq-limit` (or `-f`) flag.

e.g. `-f 15` (groups with fewer than 15 instances in the dataset will not be used for encoding)
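
The `-f`/`--freq-limit` flag corresponds to Categorify's `freq_threshold` argument. A minimal sketch (threshold value and column names illustrative; a per-column dict is also accepted):

```python
from nvtabular import ops

# Categories seen fewer than 15 times do not receive their own encoding.
cat_features = ["C1", "C2"] >> ops.Categorify(freq_threshold=15)
```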

@@ -115,23 +115,23 @@ e.g `-f 15` (groups with fewer than 15 instances in the dataset will not be used

As described below, the specific algorithm used for categorical encoding can be column dependent. In this script, we use special options for a subset of “high-cardinality” columns. By default, these columns are “C20,C1,C22,C10”. However, the user can specify different column names with the `--high-cards` flag.

- e.g. `—high_cards C01,C10`
+ e.g. `--high-cards C01,C10`

- Note that only the columns specified with this flag (or the default columns) will be targetedby the `--tree-width` and/or `--cat-cache-high` flags (described below).
+ Note that only the columns specified with this flag (or the default columns) will be targeted by the `--split-out` and/or `--cat-cache-high` flags (described below).

##### Global-Categories Calculation (GroupbyStatistics)

- In order encode categorical columns, NVTabular needs to calculate a global list of unique categories for each categorical column. This is accomplished with a global groupby-aggregation-based tree reduction on each column. In order to avoid memory pressure on the device, intermediate groupby data is moved to host memory between tasks in the global-aggregation tree. For users with a sufficient amount of total GPU memory, this device-to-host transfer can be avoided with the by adding the `—cats-on-device` flag to the execution command.
+ In order to encode categorical columns, NVTabular needs to calculate a global list of unique categories for each categorical column. This is accomplished with a global groupby-aggregation-based tree reduction on each column. In order to avoid memory pressure on the device, intermediate groupby data is moved to host memory between tasks in the global-aggregation tree. For users with a sufficient amount of total GPU memory, this device-to-host transfer can be avoided by adding the `--cats-on-device` flag to the execution command.

- e.g. `—cats-on-device`
+ e.g. `--cats-on-device`

- In addition to controlling device-to-host data movement, the user can also use the `--tree-width` flag to specify the width of the groupby-aggregation tree for high-cardinality columns. Although NVTabular allows the user to specify the tree-width for each column independently, this option will target all columns specified with `high-cards`.
+ In addition to controlling device-to-host data movement, the user can also use the `--split-out` flag to specify the number of files needed to store unique values for high-cardinality columns. Although NVTabular allows the user to specify the split-out for each column independently, this option will target all columns specified with `--high-cards`.

- e.g. `—tree_width 4`
+ e.g. `--split-out 4`
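
In Python terms, these flags map onto Categorify arguments. A sketch under the assumption that the benchmark's pattern is followed (column names and values are illustrative; `--cats-on-device` appears to correspond to Categorify's `on_host=False`):

```python
from nvtabular import ops

# High-cardinality columns get a larger split_out; the rest keep 1,
# mirroring how the benchmark script builds its per-column dict.
split_out = {"C20": 4, "C1": 4, "C22": 4, "C10": 4, "C2": 1}

cat_features = ["C1", "C2", "C10", "C20", "C22"] >> ops.Categorify(
    split_out=split_out,  # files per column for the unique-value table
    on_host=True,         # set False to keep intermediate data on the GPU
)
```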

##### Categorical Encoding (Categorify)

During the categorical-encoding transformation stage, the column-specific unique values must be read into GPU memory for the operation. Since each NVTabular process will only operate on a single partition at a time, the same unique-value statistics need to be re-read (for every categorical column) for each partition that is transformed. Unsurprisingly, the performance of categorical encoding can be dramatically improved by caching the unique values on each worker between transformation operations.
- The user can specify caching location for low- and high-cardinality columns separately. Recall that high-cardinality columns can be specified with `—-high_cards` (and all remaining categorical columns will be treated as low-cardinality”). The user can specify the caching location of low-cardinality columns with the `--cat-cache-low` flag, and high-cardinality columns with the `--cat-cache-low` flag. For both cases, the options are “device”, “host”, or “disk”.
+ The user can specify the caching location for low- and high-cardinality columns separately. Recall that high-cardinality columns can be specified with `--high-cards` (and all remaining categorical columns will be treated as low-cardinality). The user can specify the caching location of low-cardinality columns with the `--cat-cache-low` flag, and of high-cardinality columns with the `--cat-cache-high` flag. For both cases, the options are “device”, “host”, or “disk”.

@@ -141,12 +141,12 @@ e.g. `--cat-cache-low device --cat-cache-high host`
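
The corresponding Categorify argument is `cat_cache`, which accepts a single string or a per-column dict. A sketch with illustrative placements:

```python
from nvtabular import ops

# Keep large unique-value tables in host memory, small ones on the GPU.
cat_cache = {"C20": "host", "C1": "host", "C22": "host", "C10": "host", "C2": "device"}
cat_features = ["C1", "C2", "C10", "C20", "C22"] >> ops.Categorify(cat_cache=cat_cache)
```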

##### Dashboard

- A wonderful advantage of the Dask-Distributed ecosystem is the convenient set of diagnostics utilities. By default (if Bokeh is installed on your system), the distributed scheduler will host a diagnostics dashboard at `http://localhost:8787/status` (where localhost may need to be changed to the the IP address where the scheduler is running). If port 8787 is already in use, a different (random) port will be used. However, the user can specify a specific port using the `—dashboard-port` flag.
+ A wonderful advantage of the Dask-Distributed ecosystem is the convenient set of diagnostics utilities. By default (if Bokeh is installed on your system), the distributed scheduler will host a diagnostics dashboard at `http://localhost:8787/status` (where localhost may need to be changed to the IP address where the scheduler is running). If port 8787 is already in use, a different (random) port will be used. However, the user can specify a specific port using the `--dashboard-port` flag.

- e.g. `—dashboard-port 3787`
+ e.g. `--dashboard-port 3787`
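
For reference, a rough Python sketch of approximately equivalent cluster-side settings (all values illustrative; `dashboard_address` is the underlying `LocalCluster` parameter that `--dashboard-port` feeds):

```python
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster(
    n_workers=8,                 # one worker per GPU, like `-d 0,1,...,7`
    device_memory_limit="26GB",  # cf. --device-limit-frac
    rmm_pool_size="28GB",        # cf. --device-pool-frac
    dashboard_address=":3787",   # cf. --dashboard-port
)
client = Client(cluster)
```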

##### Profile

- In addition to hosting a diagnostics dashboard, the distributed cluster can also collect and export profiling data on all scheduler and worker processes. To export an interactive profile report, the user can specify a file path with the `—profile` flag. If this flag is not used, no profile will be collected/exported.
+ In addition to hosting a diagnostics dashboard, the distributed cluster can also collect and export profiling data on all scheduler and worker processes. To export an interactive profile report, the user can specify a file path with the `--profile` flag. If this flag is not used, no profile will be collected/exported.

- e.g. `—profile my-profile.html`
+ e.g. `--profile my-profile.html`
18 changes: 9 additions & 9 deletions bench/examples/dask-nvtabular-criteo-benchmark.py
@@ -141,14 +141,14 @@ def main(args):
label_name = ["label"]

# Specify Categorify/GroupbyStatistics options
- tree_width = {}
+ split_out = {}
cat_cache = {}
for col in cat_names:
if col in high_card_columns:
- tree_width[col] = args.tree_width
+ split_out[col] = args.split_out
cat_cache[col] = args.cat_cache_high
else:
- tree_width[col] = 1
+ split_out[col] = 1
cat_cache[col] = args.cat_cache_low

# Use total device size to calculate args.device_limit_frac
@@ -205,7 +205,7 @@

cat_features = cat_names >> ops.Categorify(
out_path=stats_path,
- tree_width=tree_width,
+ split_out=split_out,
cat_cache=cat_cache,
freq_threshold=freq_limit,
search_sorted=not freq_limit,
@@ -360,16 +360,16 @@ def parse_args():
"--high-cards",
default="C20,C1,C22,C10",
type=str,
help="Specify a list of high-cardinality columns. The tree-width "
help="Specify a list of high-cardinality columns. The split-out "
"and cat-cache options will apply to these columns only."
'(Default "C20,C1,C22,C10")',
)
parser.add_argument(
"--tree-width",
default=8,
"--split-out",
default=1,
type=int,
help="Tree width for GroupbyStatistics operations on high-cardinality "
"columns (Default 8)",
help="Number of files needed to store unique values for high-cardinality "
"columns (Default 1)",
)
parser.add_argument(
"--cat-cache-high",
26 changes: 20 additions & 6 deletions cpp/nvtabular/inference/categorify.cc
@@ -43,7 +43,7 @@ namespace nvtabular

if ((dtype.kind() == 'O') || (dtype.kind() == 'U'))
{
- int64_t i = 0;
+ int64_t i = UNIQUE_OFFSET;
for (auto &value : values)
{
if (!py::cast<bool>(isnull(value)))
@@ -138,20 +138,29 @@
size_t size = values.size();
for (size_t i = 0; i < size; ++i)
{
- mapping_int[static_cast<int64_t>(data[i])] = i;
+ mapping_int[static_cast<int64_t>(data[i])] = i + UNIQUE_OFFSET;
}
}

template <typename T>
py::array transform_int(py::array_t<T> input) const
{
+ py::object pandas = py::module_::import("pandas");
+ py::object isnull = pandas.attr("isnull");
py::array_t<int64_t> output(input.size());
const T *input_data = input.data();
int64_t *output_data = output.mutable_data();
for (int64_t i = 0; i < input.size(); ++i)
{
auto it = mapping_int.find(static_cast<int64_t>(input_data[i]));
- output_data[i] = it == mapping_int.end() ? 0 : it->second;
+ if (it == mapping_int.end())
+ {
+ output_data[i] = py::cast<bool>(isnull(input_data[i])) ? NULL_INDEX : OOV_INDEX;
+ }
+ else
+ {
+ output_data[i] = it->second;
+ }
}
return output;
}
@@ -169,18 +178,18 @@
{
if (value.is_none())
{
- data[i] = 0;
+ data[i] = NULL_INDEX;
}
else if (PyUnicode_Check(value.ptr()) || PyBytes_Check(value.ptr()))
{
std::string key = py::cast<std::string>(value);
auto it = mapping_str.find(key);
- data[i] = it == mapping_str.end() ? 0 : it->second;
+ data[i] = it == mapping_str.end() ? OOV_INDEX : it->second;
}
else if (PyBool_Check(value.ptr()))
{
auto it = mapping_int.find(value.ptr() == Py_True);
- data[i] = it == mapping_int.end() ? 0 : it->second;
+ data[i] = it == mapping_int.end() ? OOV_INDEX : it->second;
}
else
{
@@ -247,6 +256,11 @@

std::unordered_map<std::string, int64_t> mapping_str;
std::unordered_map<int64_t, int64_t> mapping_int;

+ // TODO: Handle multiple OOV buckets?
+ const int64_t NULL_INDEX = 1;
+ const int64_t OOV_INDEX = 2;
+ const int64_t UNIQUE_OFFSET = 3;
};

// Reads in a parquet category mapping file in cpu memory using pandas
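
For readers tracing the constants introduced above (`NULL_INDEX = 1`, `OOV_INDEX = 2`, `UNIQUE_OFFSET = 3`), the lookup rule they imply can be sketched in plain Python. This is purely illustrative; index 0 is left unused, presumably reserved (e.g. for padding):

```python
NULL_INDEX, OOV_INDEX, UNIQUE_OFFSET = 1, 2, 3

uniques = ["apple", "banana", "cherry"]  # hypothetical unique-value table
mapping = {value: i + UNIQUE_OFFSET for i, value in enumerate(uniques)}

def encode(value):
    """Encode one raw value: nulls -> 1, unseen values -> 2, uniques -> 3+."""
    if value is None:
        return NULL_INDEX
    return mapping.get(value, OOV_INDEX)

assert [encode(v) for v in ["apple", None, "durian"]] == [3, 1, 2]
```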