diff --git a/bench/examples/MultiGPUBench.md b/bench/examples/MultiGPUBench.md index 8b31756c36..71fa41f7f3 100644 --- a/bench/examples/MultiGPUBench.md +++ b/bench/examples/MultiGPUBench.md @@ -16,17 +16,17 @@ The script is designed with a parquet-formatted dataset in mind. Although csv fi ### General Notes on Parameter Tuning -The script was originally developed and tested on an NVIDIA DGX-1 machine (8x 32GB V100s, 1TB RAM). Users with limited device and/or host memory may experience memory errors with the default options. Depending on the system, these users may need to modify one or more of the “Algorithm Options” described below. For example, it may be necessary to expand the list of “high-cardinality” columns, increase the tree-width and/or use “disk” for the cat-cache options. +The script was originally developed and tested on an NVIDIA DGX-1 machine (8x 32GB V100s, 1TB RAM). Users with limited device and/or host memory may experience memory errors with the default options. Depending on the system, these users may need to modify one or more of the “Algorithm Options” described below. For example, it may be necessary to expand the list of “high-cardinality” columns, increase the split-out and/or use “disk” for the cat-cache options. In addition to adjusting the algorithm details, users with limited device memory may also find it useful to adjust the `--device-pool-frac` and/or `--device-limit-frac` options (reduce both fractions). For all users, the most important benchmark options include the following. - **Device list**: `-d` (easiest way to set the number of workers) -- **Partition size**: `—part-mem-frac` (bigger partitions = better device efficiency) -- **Intermediate-category storage**: `—cats-on-device` (always use this option when total device memory is sufficiently large) +- **Partition size**: `--part-mem-frac` (bigger partitions = better device efficiency) +- **Intermediate-category storage**: `--cats-on-device` (always use this option when total device memory is sufficiently large) - **Communication protocol**: `-p` (use ucx whenever possible) -- **Output-file count**: `—out-files-per-proc` (fewer output files is faster) +- **Output-file count**: `--out-files-per-proc` (fewer output files is faster) See option descriptions below for more information. @@ -78,13 +78,13 @@ NVTabular uses the file structure of the output dataset to shuffle data as it is e.g. `--out-files-per-proc 24` -Note that a large number of output files may be required to perform the “PER_WORKER” shuffle option (see description of the `—shuffle` flag below). This is because each file will be fully shuffled in device memory. +Note that a large number of output files may be required to perform the “PER_WORKER” shuffle option (see description of the `--shuffle` flag below). This is because each file will be fully shuffled in device memory. ##### Shuffling -NVTabular currently offers two options for shuffling output data to disk. The `“PER_PARTITION”` option means that each partition will be independently shuffled after transformation, and then appended to some number of distinct output files. The number of output files is specified by the `--out-files-per-proc` flag (described above), and the files are uniquely mapped to each worker. The `“PER_WORKER”` option follows the same process, but the “files” are initially written to in-host-memory, and then reshuffled and persisted to disk after the full dataset is processed. The user can specify the specific shuffling algorithm to use with the `—shuffle` flag.
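For reference, the same knobs can also be expressed directly through NVTabular's Python API. The sketch below is illustrative only; the paths, column names, and numeric values are placeholders, and only the keyword names come from the library:

```python
# Illustrative sketch only: how the benchmark flags above map onto NVTabular's
# Python API. Paths, column names, and numeric values are placeholders.
import nvtabular as nvt
from nvtabular import ops
from nvtabular.io import Shuffle

# --part-mem-frac roughly corresponds to the partition size of the input Dataset
dataset = nvt.Dataset("/path/to/criteo/*.parquet", engine="parquet", part_mem_fraction=0.12)

# --cats-on-device corresponds to keeping intermediate categories on the GPU (on_host=False)
cats = ["C1", "C2"] >> ops.Categorify(on_host=False)
workflow = nvt.Workflow(cats)
workflow.fit(dataset)

# --shuffle and --out-files-per-proc correspond to the to_parquet arguments below
workflow.transform(dataset).to_parquet(
    output_path="/path/to/output",
    shuffle=Shuffle.PER_WORKER,  # or Shuffle.PER_PARTITION
    out_files_per_proc=8,        # fewer output files is generally faster
)
```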
+NVTabular currently offers two options for shuffling output data to disk. The `“PER_PARTITION”` option means that each partition will be independently shuffled after transformation, and then appended to some number of distinct output files. The number of output files is specified by the `--out-files-per-proc` flag (described above), and the files are uniquely mapped to each worker. The `“PER_WORKER”` option follows the same process, but the “files” are initially written to host memory, and then reshuffled and persisted to disk after the full dataset is processed. The user can specify the specific shuffling algorithm to use with the `--shuffle` flag. -e.g. `—shuffle PER_WORKER` +e.g. `--shuffle PER_WORKER` Note that the `“PER_WORKER”` option may require a larger-than default output-file count (See description of the `--out-files-per-proc` flag above). @@ -97,15 +97,15 @@ By default this script will assume the following categorical and continuous colu - Categorical: “C1”, ”C2”, … , ”C26” - Continuous: “I1”, ”I2”, … , ”I13” -The user may specify different column names, or a subset of these names, by passing a column-separated list to the `--cat-names` and/or `—cont_names` flags. +The user may specify different column names, or a subset of these names, by passing a comma-separated list to the `--cat-names` and/or `--cont_names` flags. -e.g. `—cat-names C01,C02 --cont_names I01,I02 —high_cards C01` +e.g. `--cat-names C01,C02 --cont_names I01,I02 --high-cards C01` -Note that, if your dataset includes non-default column names, you should also use the `—high-cards` flag (described below), to specify the names of high-cardinality columns. +Note that, if your dataset includes non-default column names, you should also use the `--high-cards` flag (described below), to specify the names of high-cardinality columns. ##### Categorical Encoding -By default, all categorical-column groups will be used for the final encoding transformation. The user can also specify a frequency threshold for groups to be included in the encoding with the `—freq-limit` (or `-f`) flag. +By default, all categorical-column groups will be used for the final encoding transformation. The user can also specify a frequency threshold for groups to be included in the encoding with the `--freq-limit` (or `-f`) flag. e.g `-f 15` (groups with fewer than 15 instances in the dataset will not be used for encoding) @@ -115,23 +115,23 @@ e.g `-f 15` (groups with fewer than 15 instances in the dataset will not be used As described below, the specific algorithm used for categorical encoding can be column dependent. In this script, we use special options for a subset of “high-cardinality” columns. By default, these columns are "C20,C1,C22,C10”. However, the user can specify different column names with the `--high-cards` flag. -e.g. `—high_cards C01,C10` +e.g. `--high-cards C01,C10` -Note that only the columns specified with this flag (or the default columns) will be targetedby the `--tree-width` and/or `--cat-cache-high` flags (described below). +Note that only the columns specified with this flag (or the default columns) will be targeted by the `--split-out` and/or `--cat-cache-high` flags (described below). ##### Global-Categories Calculation (GroupbyStatistics) -In order encode categorical columns, NVTabular needs to calculate a global list of unique categories for each categorical column. This is accomplished with a global groupby-aggregation-based tree reduction on each column.
In order to avoid memory pressure on the device, intermediate groupby data is moved to host memory between tasks in the global-aggregation tree. For users with a sufficient amount of total GPU memory, this device-to-host transfer can be avoided with the by adding the `—cats-on-device` flag to the execution command. +In order to encode categorical columns, NVTabular needs to calculate a global list of unique categories for each categorical column. This is accomplished with a global groupby-aggregation-based tree reduction on each column. In order to avoid memory pressure on the device, intermediate groupby data is moved to host memory between tasks in the global-aggregation tree. For users with a sufficient amount of total GPU memory, this device-to-host transfer can be avoided by adding the `--cats-on-device` flag to the execution command. -e.g. `—cats-on-device` +e.g. `--cats-on-device` -In addition to controlling device-to-host data movement, the user can also use the `--tree-width` flag to specify the width of the groupby-aggregation tree for high-cardinality columns. Although NVTabular allows the user to specify the tree-width for each column independently, this option will target all columns specified with `—high-cards`. +In addition to controlling device-to-host data movement, the user can also use the `--split-out` flag to specify the number of files needed to store unique values for high-cardinality columns. Although NVTabular allows the user to specify the split-out for each column independently, this option will target all columns specified with `--high-cards`. -e.g. `—tree_width 4` +e.g. `--split-out 4` ##### Categorical Encoding (Categorify) -During the categorical-encoding transformation stage, the column-specific unique values must be read into GPU memory for the operation. Since each NVTabular process will only operate on a single partition at a time, the same unique-value statistics need to be re-read (for every categorical column) for each partition that is transformed. Unsurprisingly, the performance of categorical encoding can be dramatically improved by caching the unique values on each worker between transformation operations. +The user can specify caching location for low- and high-cardinality columns separately. Recall that high-cardinality columns can be specified with `--high-cards` (and all remaining categorical columns will be treated as low-cardinality). The user can specify the caching location of low-cardinality columns with the `--cat-cache-low` flag, and high-cardinality columns with the `--cat-cache-high` flag. For both cases, the options are “device”, “host”, or “disk”. The user can specify caching location for low- and high-cardinality columns separately. Recall that high-cardinality columns can be specified with `—high_cards` (and all remaining categorical columns will be treated as low-cardinality”). The user can specify the caching location of low-cardinality columns with the `--cat-cache-low` flag, and high-cardinality columns with the `--cat-cache-low` flag. For both cases, the options are “device”, “host”, or “disk”. @@ -141,12 +141,12 @@ e.g. `--cat-cache-low device --cat-cache-high host` ##### Dashboard -A wonderful advantage of the Dask-Distributed ecosystem is the convenient set of diagnostics utilities.
By default (if Bokeh is installed on your system), the distributed scheduler will host a diagnostics dashboard at  `http://localhost:8787/status` (where localhost may need to be changed to the the IP address where the scheduler is running). If port 8787 is already in use, a different (random) port will be used. However, the user can specify a specific port using the `—dashboard-port` flag. +A wonderful advantage of the Dask-Distributed ecosystem is the convenient set of diagnostics utilities. By default (if Bokeh is installed on your system), the distributed scheduler will host a diagnostics dashboard at  `http://localhost:8787/status` (where localhost may need to be changed to the IP address where the scheduler is running). If port 8787 is already in use, a different (random) port will be used. However, the user can specify a specific port using the `--dashboard-port` flag. -e.g. `—dashboard-port 3787` +e.g. `--dashboard-port 3787` ##### Profile -In addition to hosting a diagnostics dashboard, the distributed cluster can also collect and export profiling data on all scheduler and worker processes. To export an interactive profile report, the user can specify a file path with the `—profile` flag. If this flag is not used, no profile will be collected/exported. +In addition to hosting a diagnostics dashboard, the distributed cluster can also collect and export profiling data on all scheduler and worker processes. To export an interactive profile report, the user can specify a file path with the `--profile` flag. If this flag is not used, no profile will be collected/exported. -e.g. `—profile my-profile.html` +e.g. `--profile my-profile.html` diff --git a/bench/examples/dask-nvtabular-criteo-benchmark.py b/bench/examples/dask-nvtabular-criteo-benchmark.py index c2d5721a47..a08c20342d 100644 --- a/bench/examples/dask-nvtabular-criteo-benchmark.py +++ b/bench/examples/dask-nvtabular-criteo-benchmark.py @@ -141,14 +141,14 @@ def main(args): label_name = ["label"] # Specify Categorify/GroupbyStatistics options - tree_width = {} + split_out = {} cat_cache = {} for col in cat_names: if col in high_card_columns: - tree_width[col] = args.tree_width + split_out[col] = args.split_out cat_cache[col] = args.cat_cache_high else: - tree_width[col] = 1 + split_out[col] = 1 cat_cache[col] = args.cat_cache_low # Use total device size to calculate args.device_limit_frac @@ -205,7 +205,7 @@ def main(args): cat_features = cat_names >> ops.Categorify( out_path=stats_path, - tree_width=tree_width, + split_out=split_out, cat_cache=cat_cache, freq_threshold=freq_limit, search_sorted=not freq_limit, @@ -360,16 +360,16 @@ def parse_args(): "--high-cards", default="C20,C1,C22,C10", type=str, - help="Specify a list of high-cardinality columns. The tree-width " + help="Specify a list of high-cardinality columns. The split-out " "and cat-cache options will apply to these columns only."
'(Default "C20,C1,C22,C10")', ) parser.add_argument( - "--tree-width", - default=8, + "--split-out", + default=1, type=int, - help="Tree width for GroupbyStatistics operations on high-cardinality " - "columns (Default 8)", + help="Number of files needed to store unique values for high-cardinality " + "columns (Default 1)", ) parser.add_argument( "--cat-cache-high", diff --git a/cpp/nvtabular/inference/categorify.cc b/cpp/nvtabular/inference/categorify.cc index d0e1f6496d..e9b50c0cdd 100644 --- a/cpp/nvtabular/inference/categorify.cc +++ b/cpp/nvtabular/inference/categorify.cc @@ -43,7 +43,7 @@ namespace nvtabular if ((dtype.kind() == 'O') || (dtype.kind() == 'U')) { - int64_t i = 0; + int64_t i = UNIQUE_OFFSET; for (auto &value : values) { if (!py::cast(isnull(value))) @@ -138,20 +138,29 @@ namespace nvtabular size_t size = values.size(); for (size_t i = 0; i < size; ++i) { - mapping_int[static_cast(data[i])] = i; + mapping_int[static_cast(data[i])] = i + UNIQUE_OFFSET; } } template py::array transform_int(py::array_t input) const { + py::object pandas = py::module_::import("pandas"); + py::object isnull = pandas.attr("isnull"); py::array_t output(input.size()); const T *input_data = input.data(); int64_t *output_data = output.mutable_data(); for (int64_t i = 0; i < input.size(); ++i) { auto it = mapping_int.find(static_cast(input_data[i])); - output_data[i] = it == mapping_int.end() ? 0 : it->second; + if (it == mapping_int.end()) + { + output_data[i] = py::cast(isnull(input_data[i])) ? NULL_INDEX : OOV_INDEX; + } + else + { + output_data[i] = it->second; + } } return output; } @@ -169,18 +178,18 @@ namespace nvtabular { if (value.is_none()) { - data[i] = 0; + data[i] = NULL_INDEX; } else if (PyUnicode_Check(value.ptr()) || PyBytes_Check(value.ptr())) { std::string key = py::cast(value); auto it = mapping_str.find(key); - data[i] = it == mapping_str.end() ? 0 : it->second; + data[i] = it == mapping_str.end() ? OOV_INDEX : it->second; } else if (PyBool_Check(value.ptr())) { auto it = mapping_int.find(value.ptr() == Py_True); - data[i] = it == mapping_int.end() ? 0 : it->second; + data[i] = it == mapping_int.end() ? OOV_INDEX : it->second; } else { @@ -247,6 +256,11 @@ namespace nvtabular std::unordered_map mapping_str; std::unordered_map mapping_int; + + // TODO: Handle multiple OOV buckets? + const int64_t NULL_INDEX = 1; + const int64_t OOV_INDEX = 2; + const int64_t UNIQUE_OFFSET = 3; }; // Reads in a parquet category mapping file in cpu memory using pandas diff --git a/nvtabular/ops/categorify.py b/nvtabular/ops/categorify.py index af2ea026f5..2f4285738f 100644 --- a/nvtabular/ops/categorify.py +++ b/nvtabular/ops/categorify.py @@ -13,8 +13,10 @@ # limitations under the License. 
# +import math import os import warnings +from collections import defaultdict from copy import deepcopy from dataclasses import dataclass from operator import getitem @@ -25,15 +27,17 @@ import numpy as np import pandas as pd import pyarrow as pa +import pyarrow.dataset as pa_ds from dask.base import tokenize +from dask.blockwise import BlockIndex from dask.core import flatten -from dask.dataframe.core import _concat +from dask.dataframe.core import DataFrame as DaskDataFrame +from dask.dataframe.core import _concat, new_dd_object from dask.dataframe.shuffle import shuffle_group from dask.delayed import Delayed from dask.highlevelgraph import HighLevelGraph from dask.utils import parse_bytes from fsspec.core import get_fs_token_paths -from pyarrow import parquet as pq from merlin.core import dispatch from merlin.core.dispatch import DataFrameType, annotate, is_cpu_object, nullable_series @@ -43,6 +47,12 @@ from merlin.schema import Schema, Tags from nvtabular.ops.operator import ColumnSelector, Operator +# Constants +# (NVTabular will reserve `0` for padding and `1` for nulls) +PAD_OFFSET = 0 +NULL_OFFSET = 1 +OOV_OFFSET = 2 + class Categorify(StatOperator): """ @@ -52,6 +62,13 @@ class Categorify(StatOperator): Categorify operation can be added to the workflow to transform categorical features into unique integer values. + Encoding Convention:: + + - `0`: Not used by `Categorify` (reserved for padding). + - `1`: Null and NaN values. + - `[2, 2 + num_buckets)`: OOV values (including hash buckets). + - `[2 + num_buckets, max_size)`: Unique vocabulary. + Example usage:: # Define pipeline @@ -121,7 +138,7 @@ class Categorify(StatOperator): freq_threshold : int or dictionary:{column: freq_limit_value}, default 0 Categories with a count/frequency below this threshold will be omitted from the encoding and corresponding data will be mapped - to the "null" category. Can be represented as both an integer or + to the OOV indices. Can be represented as both an integer or a dictionary with column names as keys and frequency limit as value. If dictionary is used, all columns targeted must be included in the dictionary. @@ -131,12 +148,15 @@ class Categorify(StatOperator): encoded as a new column. Note that replacement is not allowed for "combo", because the same column name can be included in multiple groups. - tree_width : dict or int, optional - Tree width of the hash-based groupby reduction for each categorical - column. High-cardinality columns may require a large `tree_width`, - while low-cardinality columns can likely use `tree_width=1`. + split_out : dict or int, optional + Number of files needed to store the unique values of each categorical + column. High-cardinality columns may require `split_out>1`, while + low-cardinality columns should be fine with the `split_out=1` default. If passing a dict, each key and value should correspond to the column - name and width, respectively. The default value is 8 for all columns. + name and value, respectively. The default value is 1 for all columns. + split_every : dict or int, optional + Number of adjacent partitions to aggregate in each tree-reduction + node. The default value is 8 for all columns. out_path : str, optional Root directory where groupby statistics will be written out in parquet format. @@ -145,8 +165,6 @@ class Categorify(StatOperator): groupby reduction. The extra host <-> device data movement can reduce performance. However, using `on_host=True` typically improves stability (by avoiding device-level memory pressure). 
- na_sentinel : default 0 - Label to use for null-category mapping cat_cache : {"device", "host", "disk"} or dict Location to cache the list of unique categories for each categorical column. If passing a dict, each key and value @@ -160,32 +178,24 @@ class Categorify(StatOperator): for multi-column groups. search_sorted : bool, default False. Set it True to apply searchsorted algorithm in encoding. - num_buckets : int, or dictionary:{column: num_hash_buckets} - Column-wise modulo to apply after hash function. Note that this - means that the corresponding value will be the categorical cardinality - of the transformed categorical feature. If given as an int, that value - will be used as the number of "hash buckets" for every feature. If a dictionary is passed, - it will be used to specify explicit mappings from a column name to a number of buckets. - In this case, only the columns specified in the keys of `num_buckets` - will be transformed. - max_size : int or dictionary:{column: max_size_value}, default 0 - This parameter allows you to set the maximum size for an embedding table for each column. - For example, if max_size is set to 1000 only the first 999 most frequent values for each - column will be be encoded, and the rest will be mapped to a single value (0). To map the - rest to a number of buckets, you can set the num_buckets parameter > 1. In that case, topK - value will be `max_size - num_buckets -1`. Setting the max_size param means that - freq_threshold should not be given. If the num_buckets parameter is set, it must be - smaller than the max_size value. - start_index: int, default 0 - The start index where Categorify will begin to translate dataframe entries - into integer values, including an initial out-of-vocabulary encoding value. - For instance, if our original translated dataframe entries appear - as [[1], [1, 4], [3, 2], [2]], with an out-of-vocabulary value of 0, then with a - start_index of 16, Categorify will reserve 16 as the out-of-vocabulary encoding value, - and our new translated dataframe entry will now be [[17], [17, 20], [19, 18], [18]]. - This parameter is useful to reserve an initial segment of non-negative translated integers - for special user-defined values. - cardinality_memory_limit: int or str, default None + num_buckets : int, or dictionary:{column: num_oov_indices}, optional + Number of indices to reserve for out-of-vocabulary (OOV) encoding at + transformation time. By default, all OOV values will be mapped to + the same index (`2`). If `num_buckets` is set to an integer greater + than one, a column-wise hash and modulo will be used to map each OOV + value to an index in the range `[2, 2 + num_buckets)`. A dictionary + may be used if the desired `num_buckets` behavior varies by column. + max_size : int or dictionary:{column: max_size_value}, optional + Set the maximum size of the expected embedding table for each column. + For example, if `max_size` is set to 1000, only the first 997 most- + frequent values will be included in the unique-value vocabulary, and + all remaining non-null values will be mapped to the OOV indices + (indices `0` and `1` will still be reserved for padding and nulls). + To use multiple OOV indices for infrequent values, set the `num_buckets` + parameter accordingly. Note that `max_size` cannot be combined with + `freq_threshold`, and it cannot be less than `num_buckets + 2`. By + default, the total number of encoding indices will be unconstrained. 
+ cardinality_memory_limit: int or str, optional Upper limit on the "allowed" memory usage of the internal DataFrame and Table objects used to store unique categories. By default, this limit is 12.5% of the total memory. Note that this argument is meant as a guide for internal optimizations and UserWarnings @@ -196,8 +206,6 @@ def __init__( self, freq_threshold=0, out_path=None, - tree_width=None, - na_sentinel=None, cat_cache="host", dtype=None, on_host=True, @@ -207,10 +215,40 @@ def __init__( num_buckets=None, vocabs=None, max_size=0, - start_index=0, single_table=False, cardinality_memory_limit=None, + tree_width=None, + split_out=1, + split_every=8, + **kwargs, # Deprecated/unsupported arguments ): + # Handle deprecations and unsupported kwargs + if "start_index" in kwargs: + raise ValueError( + "start_index is now deprecated. `Categorify` will always " + "reserve index `0` for user-specific purposes, and will " + "use index `1` for null values." + ) + if "na_sentinel" in kwargs: + raise ValueError( + "na_sentinel is now deprecated. `Categorify` will always " + "reserve index `1` for null values, and the following " + "`num_buckets` indices for out-of-vocabulary values " + "(or just index `2` if `num_buckets is None`)." + ) + if kwargs: + raise ValueError(f"Unrecognized key-word arguments: {kwargs}") + + # Warn user if they set num_buckets without setting max_size or + # freq_threshold - This setting used to hash everything, but will + # now just use multiple indices for OOV encodings at transform time + if num_buckets and not (max_size or freq_threshold): + warnings.warn( + "You are setting num_buckets without using max_size or " + "freq_threshold to restrict the number of distinct " + "categories. Are you sure this is what you want?" + ) + # We need to handle three types of encoding here: # # (1) Conventional encoding. There are no multi-column groups. So, @@ -255,15 +293,15 @@ def __init__( self.single_table = single_table self.freq_threshold = freq_threshold or 0 self.out_path = out_path or "./" - self.tree_width = tree_width - self.na_sentinel = na_sentinel or 0 self.dtype = dtype self.on_host = on_host self.cat_cache = cat_cache self.encode_type = encode_type self.search_sorted = search_sorted - self.start_index = start_index self.cardinality_memory_limit = cardinality_memory_limit + self.split_every = split_every + self.split_out = split_out + _deprecate_tree_width(tree_width) if self.search_sorted and self.freq_threshold: raise ValueError( @@ -344,8 +382,12 @@ def fit(self, col_selector: ColumnSelector, ddf: dd.DataFrame): columns = [ list(c) if isinstance(c, tuple) else c for c in col_selector.grouped_names - if c not in cols_with_vocabs + if (_make_name(*c, sep=self.name_sep) if isinstance(c, tuple) else c) + not in cols_with_vocabs ] + if not columns: + return Delayed("no-op", {"no-op": {}}) + # Define a rough row-count at which we are likely to # start hitting memory-pressure issues that cannot # be accommodated with smaller partition sizes. 
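To make the reworked options concrete, the sketch below combines `split_out`, `split_every`, `num_buckets`, and `max_size` in a single workflow and spells out the index layout implied by the encoding convention documented above. The column names, dataset path, and specific values are assumptions chosen for illustration:

```python
# Illustrative combination of the new Categorify options; names and values are placeholders.
import nvtabular as nvt
from nvtabular import ops

cat_features = ["user_id", "item_id", "category"] >> ops.Categorify(
    split_out={"user_id": 4, "item_id": 4},  # store uniques for high-cardinality columns in 4 files
    split_every=8,                           # reduce 8 partitions per tree-reduction node
    num_buckets=16,                          # hash OOV values into indices [2, 18)
    max_size=1_000_000,                      # cap the embedding table (includes pad/null/OOV indices)
)

workflow = nvt.Workflow(cat_features)
workflow.fit(nvt.Dataset("train/*.parquet", engine="parquet"))

# Index layout for each encoded column:
#   0                         -> reserved for padding (never produced by Categorify)
#   1                         -> nulls / NaNs
#   2 ... 2 + num_buckets - 1 -> OOV hash buckets
#   2 + num_buckets ...       -> most-frequent unique values (up to max_size - 1)
```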
@@ -381,19 +423,26 @@ def process_vocabs(self, vocabs): if isinstance(vocabs, dict) and all(dispatch.is_series_object(v) for v in vocabs.values()): fit_options = self._create_fit_options_from_columns(list(vocabs.keys())) base_path = os.path.join(self.out_path, fit_options.stat_name) + num_buckets = fit_options.num_buckets os.makedirs(base_path, exist_ok=True) for col, vocab in vocabs.items(): - vals = {col: vocab} - if vocab.iloc[0] is not None: - with_empty = dispatch.add_to_series(vocab, [None]).reset_index()[0] - vals = {col: with_empty} - - save_path = os.path.join(base_path, f"unique.{col}.parquet") - col_df = dispatch.make_df(vals) - col_df.to_parquet(save_path) - categories[col] = save_path + col_name = _make_name(*col, sep=self.name_sep) if isinstance(col, tuple) else col + vals = {col_name: vocab} + oov_count = 1 + if num_buckets: + oov_count = ( + num_buckets if isinstance(num_buckets, int) else num_buckets[col_name] + ) or 1 + col_df = dispatch.make_df(vals).dropna() + col_df.index += NULL_OFFSET + oov_count + save_path = _save_encodings(col_df, base_path, col_name) + categories[col_name] = save_path elif isinstance(vocabs, dict) and all(isinstance(v, str) for v in vocabs.values()): - categories = vocabs + # TODO: How to deal with the fact that this file may be missing null and oov rows?? + categories = { + (_make_name(*col, sep=self.name_sep) if isinstance(col, tuple) else col): path + for col, path in vocabs.items() + } else: error = """Unrecognized vocab type, please provide either a dictionary with paths to parquet files @@ -410,13 +459,14 @@ def _create_fit_options_from_columns(self, columns) -> "FitOptions": [], self.out_path, self.freq_threshold, - self.tree_width, + self.split_out, self.on_host, concat_groups=self.encode_type == "joint", name_sep=self.name_sep, max_size=self.max_size, num_buckets=self.num_buckets, cardinality_memory_limit=self.cardinality_memory_limit, + split_every=self.split_every, ) def set_storage_path(self, new_path, copy=False): @@ -463,7 +513,6 @@ def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFram path, df, self.cat_cache, - na_sentinel=self.na_sentinel, freq_threshold=self.freq_threshold[name] if isinstance(self.freq_threshold, dict) else self.freq_threshold, @@ -473,7 +522,12 @@ def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFram cat_names=column_names, max_size=self.max_size, dtype=self.output_dtype, - start_index=self.start_index, + split_out=( + self.split_out.get(storage_name, 1) + if isinstance(self.split_out, dict) + else self.split_out + ), + single_table=self.single_table, ) new_df[name] = encoded except Exception as e: @@ -516,7 +570,6 @@ def _compute_properties(self, col_schema, input_schema): "max_size": self.max_size[col_name] if isinstance(self.max_size, dict) else self.max_size, - "start_index": self.start_index, "cat_path": target_category_path, "domain": {"min": 0, "max": cardinality - 1, "name": category_name}, "embedding_sizes": {"cardinality": cardinality, "dimension": dimensions}, @@ -543,14 +596,7 @@ def compute_selector( return parents_selector def get_embedding_sizes(self, columns): - return _get_embeddings_dask( - self.categories, - columns, - self.num_buckets, - self.freq_threshold, - self.max_size, - self.start_index, - ) + return _get_embeddings_dask(self.categories, columns, self.num_buckets) def inference_initialize(self, columns, inference_config): # we don't currently support 'combo' @@ -616,34 +662,23 @@ def get_embedding_sizes(source, output_dtypes=None): 
return single_hots, multi_hots -def _get_embeddings_dask(paths, cat_names, buckets=0, freq_limit=0, max_size=0, start_index=0): +def _get_embeddings_dask(paths, cat_names, buckets=0): embeddings = {} - if isinstance(freq_limit, int): - freq_limit = {name: freq_limit for name in cat_names} if isinstance(buckets, int): buckets = {name: buckets for name in cat_names} - if isinstance(max_size, int): - max_size = {name: max_size for name in cat_names} for col in cat_names: path = paths.get(col) - num_rows = pq.ParquetFile(path).metadata.num_rows if path else 0 + num_rows = OOV_OFFSET + if path: + for file_frag in pa_ds.dataset(path, format="parquet").get_fragments(): + num_rows += file_frag.metadata.num_rows if isinstance(buckets, dict): bucket_size = buckets.get(col, 0) elif isinstance(buckets, int): bucket_size = buckets else: - bucket_size = 0 - - _has_frequency_limit = col in freq_limit and freq_limit[col] > 0 - _has_max_size = col in max_size and max_size[col] > 0 - - if bucket_size and not _has_frequency_limit and not _has_max_size: - # pure hashing (no categorical lookup) - num_rows = bucket_size - else: - num_rows += bucket_size - - num_rows += start_index + bucket_size = 1 + num_rows += bucket_size embeddings[col] = _emb_sz_rule(num_rows) return embeddings @@ -656,6 +691,136 @@ def _make_name(*args, sep="_"): return sep.join(args) +def _to_parquet_dask_lazy(df, path, write_index=False): + # Write DataFrame data to parquet (lazily) with dask + + # Check if we already have a dask collection + is_collection = isinstance(df, DaskDataFrame) + + # Use `ddf.to_parquet` method + kwargs = { + "overwrite": True, + "compute": False, + "write_index": write_index, + "schema": None, + } + return ( + df + if is_collection + else dispatch.convert_data( + df, + cpu=isinstance(df, pd.DataFrame), + to_collection=True, + ) + ).to_parquet(path, **kwargs) + + +def _save_encodings( + df, + base_path, + field_name, + preserve_index=False, + first_n=None, + freq_threshold=None, + oov_count=1, + null_size=None, +): + # Write DataFrame data to parquet (eagerly) with dask + + # Define paths + unique_path = "/".join([str(base_path), f"unique.{field_name}.parquet"]) + meta_path = "/".join([str(base_path), f"meta.{field_name}.parquet"]) + + # Check if we already have a dask collection + is_collection = isinstance(df, DaskDataFrame) + + # Create empty directory if it doesn't already exist + use_directory = is_collection and df.npartitions > 1 + fs = get_fs_token_paths(unique_path, mode="wb")[0] + _path = fs._strip_protocol(unique_path) + if fs.isdir(_path) or fs.exists(_path): + fs.rm(_path, recursive=True) + if use_directory: + fs.mkdir(_path, exists_ok=True) + + # Start tracking embedding metadata + record_size_meta = True + oov_size = 0 + unique_count = 0 + unique_size = 0 + + # Iterate over partitions and write to disk + size = oov_count + OOV_OFFSET # Reserve null and oov buckets + for p, part in enumerate(df.partitions if is_collection else [df]): + local_path = "/".join([unique_path, f"part.{p}.parquet"]) if use_directory else unique_path + _df = _compute_sync(part) if is_collection else part + _len = len(_df) + if _len == 0: + continue + + size_col = f"{field_name}_size" + if size_col not in _df.columns: + record_size_meta = False + + if record_size_meta: + # Set number of rows allowed from this part + if first_n is not None: + first_n_local = first_n - size + else: + first_n_local = _len + + # Update oov size + if first_n or freq_threshold: + removed = None + if freq_threshold: + sizes = _df[size_col] + 
removed = df[(sizes < freq_threshold) & (sizes > 0)] + _df = _df[(sizes >= freq_threshold) | (sizes == 0)] + if first_n and _len > first_n_local: + removed = _df.iloc[first_n_local:] + _df = _df.iloc[:first_n_local] + if removed is not None: + oov_size += removed[size_col].sum() + _len = len(_df) + + # Record unique-value metadata + unique_size += _df[size_col].sum() + + if not preserve_index: + # If we are NOT writing the index of df, + # then make sure we are writing a "correct" + # index. Note that we avoid using ddf.to_parquet + # so that we can make sure the index is correct + _df.set_index( + pd.RangeIndex( + start=size, + stop=size + _len, + step=1, + ), + drop=True, + inplace=True, + ) + + size += _len + unique_count += _len + _df.to_parquet(local_path, compression=None) + if first_n and size >= first_n: + break # Ignore any remaining files + + # Write encoding metadata + meta = { + "kind": ["pad", "null", "oov", "unique"], + "offset": [PAD_OFFSET, NULL_OFFSET, OOV_OFFSET, OOV_OFFSET + oov_count], + "num_indices": [1, 1, oov_count, unique_count], + } + if record_size_meta: + meta["num_observed"] = [0, null_size, oov_size, unique_size] + type(_df)(meta).to_parquet(meta_path) + + # Return path to uniques + return unique_path + + @dataclass class FitOptions: """Contains options on how to fit statistics. @@ -675,8 +840,8 @@ class FitOptions: Categories with a count/frequency below this threshold will be omitted from the encoding and corresponding data will be mapped to the "null" category. - tree_width: - Tree width of the hash-based groupby reduction for each categorical column. + split_out: + Number of output partitions to use for each category in ``fit``. on_host: Whether to convert cudf data to pandas between tasks in the groupby reduction. stat_name: @@ -690,10 +855,10 @@ class FitOptions: num_buckets: If specified will also do hashing operation for values that would otherwise be mapped to as unknown (by freq_limit or max_size parameters) - start_index: int - The index to start mapping our output categorical values to. cardinality_memory_limit: int Suggested upper limit on categorical data containers. + split_every: + Number of adjacent partitions to reduce in each tree node. """ col_groups: list @@ -701,15 +866,15 @@ class FitOptions: agg_list: list out_path: str freq_limit: Union[int, dict] - tree_width: Union[int, dict] + split_out: Union[int, dict] on_host: bool stat_name: str = "categories" concat_groups: bool = False name_sep: str = "-" max_size: Optional[Union[int, dict]] = None num_buckets: Optional[Union[int, dict]] = None - start_index: int = 0 cardinality_memory_limit: Optional[int] = None + split_every: Optional[Union[int, dict]] = 8 def __post_init__(self): if not isinstance(self.col_groups, ColumnSelector): @@ -733,8 +898,62 @@ def __post_init__(self): self.col_groups = col_selectors +def _general_concat( + frames, + cardinality_memory_limit=False, + col_selector=None, + **kwargs, +): + # Concatenate DataFrame or pa.Table objects + if isinstance(frames[0], pa.Table): + df = pa.concat_tables(frames, promote=True) + if ( + cardinality_memory_limit + and col_selector is not None + and df.nbytes > cardinality_memory_limit + ): + # Before fully converting this pyarrow Table + # to a cudf DatFrame, we can reduce the memory + # footprint of `df`. Since the size of `df` + # depends on the cardinality of the features, + # and NOT on the partition size, the remaining + # logic in this function has an OOM-error risk + # (even with tiny partitions). 
+ size_columns = [] + for col in col_selector.names: + name = col + "_size" + if name in df.schema.names: + # Convert this column alone to cudf, + # and drop the field from df. Note that + # we are only converting this column to + # cudf to take advantage of fast `max` + # performance. + size_columns.append(dispatch.from_host(df.select([name]))) + df = df.drop([name]) + # Use numpy to calculate the "minimum" + # dtype needed to capture the "size" column, + # and cast the type + typ = np.min_scalar_type(size_columns[-1][name].max() * 2) + size_columns[-1][name] = size_columns[-1][name].astype(typ) + # Convert the remaining columns in df to cudf, + # and append the type-casted "size" columns + df = dispatch.concat_columns([dispatch.from_host(df)] + size_columns) + else: + # Empty DataFrame - No need for type-casting + df = dispatch.from_host(df) + return df + else: + # For now, if we are not concatenating in host memory, + # we will assume that reducing the memory footprint of + # "size" columns is not a priority. However, the same + # type-casting optimization can also be done for both + # pandas and cudf-backed data here. + return _concat(frames, **kwargs) + + @annotate("top_level_groupby", color="green", domain="nvt_python") -def _top_level_groupby(df, options: FitOptions): +def _top_level_groupby(df, options: FitOptions = None, spill=True): + assert options is not None sum_sq = "std" in options.agg_list or "var" in options.agg_list calculate_min = "min" in options.agg_list calculate_max = "max" in options.agg_list @@ -804,12 +1023,25 @@ def _top_level_groupby(df, options: FitOptions): ] gb.reset_index(inplace=True, drop=False) del df_gb + + # Extract null groups into gb_null + isnull = gb.isnull().any(1) + gb_null = gb[~isnull] + gb = gb[isnull] + if not len(gb_null): + gb_null = None + del isnull + # Split the result by the hash value of the categorical column - nsplits = options.tree_width[cat_col_selector_str] + nsplits = options.split_out[cat_col_selector_str] for j, split in shuffle_group( gb, cat_col_selector.names, 0, nsplits, nsplits, True, nsplits ).items(): - if options.on_host and not is_cpu_object(split): + if gb_null is not None: + # Guarantee that the first split will contain null groups + split = _concat([gb_null, split], ignore_index=True) + gb_null = None + if spill and options.on_host and not is_cpu_object(split): output[k] = split.to_arrow(preserve_index=False) else: output[k] = split @@ -819,28 +1051,32 @@ def _top_level_groupby(df, options: FitOptions): @annotate("mid_level_groupby", color="green", domain="nvt_python") -def _mid_level_groupby(dfs, col_selector: ColumnSelector, freq_limit_val, options: FitOptions): +def _mid_level_groupby(dfs, col_selector: ColumnSelector, options: FitOptions, spill=True): if options.concat_groups and len(col_selector.names) > 1: col_selector = ColumnSelector([_make_name(*col_selector.names, sep=options.name_sep)]) - if options.on_host and not is_cpu_object(dfs[0]): - # Construct gpu DataFrame from pyarrow data. - # `on_host=True` implies gpu-backed data. 
- df = pa.concat_tables(dfs, promote=True) - df = dispatch.from_host(df) - else: - df = _concat(dfs, ignore_index=True) - + df = _general_concat(dfs, ignore_index=True) groups = df.groupby(col_selector.names, dropna=False) gb = groups.agg( {col: _get_aggregation_type(col) for col in df.columns if col not in col_selector.names} ) gb.reset_index(drop=False, inplace=True) + if spill and options.on_host and not is_cpu_object(gb): + gb_pd = gb.to_arrow(preserve_index=False) + del gb + return gb_pd + return gb + + +@annotate("bottom_level_groupby", color="green", domain="nvt_python") +def _bottom_level_groupby(dfs, col_selector: ColumnSelector, options: FitOptions, spill=True): + gb = _mid_level_groupby(dfs, col_selector, options, spill=False) + if options.concat_groups and len(col_selector.names) > 1: + col_selector = ColumnSelector([_make_name(*col_selector.names, sep=options.name_sep)]) + name_count = _make_name(*(col_selector.names + ["count"]), sep=options.name_sep) name_size = _make_name(*(col_selector.names + ["size"]), sep=options.name_sep) - if options.freq_limit and not options.max_size: - gb = gb[gb[name_size] >= freq_limit_val] required = col_selector.names.copy() if "count" in options.agg_list: @@ -893,7 +1129,7 @@ def _mid_level_groupby(dfs, col_selector: ColumnSelector, freq_limit_val, option required.append(name_std) gb[name_std] = np.sqrt(result) - if options.on_host and not is_cpu_object(gb[required]): + if spill and options.on_host and not is_cpu_object(gb[required]): gb_pd = gb[required].to_arrow(preserve_index=False) del gb return gb_pd @@ -909,50 +1145,15 @@ def _get_aggregation_type(col): return "sum" -@annotate("write_gb_stats", color="green", domain="nvt_python") -def _write_gb_stats(dfs, base_path, col_selector: ColumnSelector, options: FitOptions): - if options.concat_groups and len(col_selector) > 1: - col_selector = ColumnSelector([_make_name(*col_selector.names, sep=options.name_sep)]) - - rel_path = "cat_stats.%s.parquet" % (_make_name(*col_selector.names, sep=options.name_sep)) - path = os.path.join(base_path, rel_path) - pwriter = None - if (not options.on_host or is_cpu_object(dfs[0])) and len(dfs): - # Want first non-empty df for schema (if there are any) - _d = next((df for df in dfs if len(df)), dfs[0]) - pwriter = dispatch.parquet_writer_dispatch(_d, path=path, compression=None) - - # Loop over dfs and append to file - # TODO: For high-cardinality columns, should support - # Dask-based to_parquet call here (but would need to - # support directory reading within dependent ops) - n_writes = 0 - for df in dfs: - if len(df): - if options.on_host and not is_cpu_object(df): - # Use pyarrow - df is already a pyarrow table - if pwriter is None: - pwriter = pq.ParquetWriter(path, df.schema, compression=None) - pwriter.write_table(df) - else: - # df is a cudf or pandas DataFrame - df.reset_index(drop=True, inplace=True) - pwriter.write_table(df) - n_writes += 1 - - # No data to write - if n_writes == 0: - raise RuntimeError("GroupbyStatistics result is empty.") - - # Close writer and return path - if pwriter is not None: - pwriter.close() - - return path - - @annotate("write_uniques", color="green", domain="nvt_python") -def _write_uniques(dfs, base_path, col_selector: ColumnSelector, options: FitOptions): +def _write_uniques( + dfs, + base_path, + col_selector: ColumnSelector, + options: FitOptions, + cpu: bool, + path: str = None, +): """Writes out a dataframe to a parquet file. 
Parameters @@ -976,53 +1177,114 @@ def _write_uniques(dfs, base_path, col_selector: ColumnSelector, options: FitOpt if options.concat_groups and len(col_selector.names) > 1: col_selector = ColumnSelector([_make_name(*col_selector.names, sep=options.name_sep)]) - if options.on_host: - # Construct gpu DataFrame from pyarrow data. - # `on_host=True` implies gpu-backed data, - # because CPU-backed data would have never - # been converted from pandas to pyarrow. - df = pa.concat_tables(dfs, promote=True) - if ( - df.nbytes > options.cardinality_memory_limit - if options.cardinality_memory_limit - else False - ): - # Before fully converting this pyarrow Table - # to a cudf DatFrame, we can reduce the memory - # footprint of `df`. Since the size of `df` - # depends on the cardinality of the features, - # and NOT on the partition size, the remaining - # logic in this function has an OOM-error risk - # (even with tiny partitions). - size_columns = [] - for col in col_selector.names: - name = col + "_size" - if name in df.schema.names: - # Convert this column alone to cudf, - # and drop the field from df. Note that - # we are only converting this column to - # cudf to take advantage of fast `max` - # performance. - size_columns.append(dispatch.from_host(df.select([name]))) - df = df.drop([name]) - # Use numpy to calculate the "minimum" - # dtype needed to capture the "size" column, - # and cast the type - typ = np.min_scalar_type(size_columns[-1][name].max() * 2) - size_columns[-1][name] = size_columns[-1][name].astype(typ) - # Convert the remaining columns in df to cudf, - # and append the type-casted "size" columns - df = dispatch.concat_columns([dispatch.from_host(df)] + size_columns) - else: - # Empty DataFrame - No need for type-casting - df = dispatch.from_host(df) + # Set max_emb_size + # This is the maximum number of rows we will write to + # the unique-value parquet files + col_name = col_selector.names[0] + max_emb_size = options.max_size + if max_emb_size: + max_emb_size = max_emb_size[col_name] if isinstance(max_emb_size, dict) else max_emb_size + + # Set num_buckets + # This is the maximum number of indices + num_buckets = options.num_buckets + if num_buckets: + num_buckets = num_buckets if isinstance(num_buckets, int) else num_buckets[col_name] + oov_count = num_buckets or 1 + + # Set freq_threshold + # This is the minimum unique count for a distinct + # category to be included in the unique-value files + freq_threshold = options.freq_limit + if freq_threshold: + freq_threshold = ( + freq_threshold if isinstance(freq_threshold, int) else freq_threshold[col_name] + ) + + # Sanity check + if max_emb_size and max_emb_size < oov_count + 2: + raise ValueError( + "`max_size` can never be less than the maximum of " + "`num_buckets + 2` and `3`, because we must always " + "reserve pad, null and at least 1 oov-bucket index." + ) + + null_size = None + if path: + # We have a parquet path to construct uniques from + # (rather than a list of DataFrame objects) + df = dispatch.read_dispatch(cpu=cpu, collection=True)( + path, + split_row_groups=False, + ).reset_index(drop=True) + + # Check if we need to compute the DataFrame collection + # of unique values. 
For now, we can avoid doing this when + # we are not jointly encoding multiple columns + if simple := (len(col_selector.names) == 1 and df.npartitions > 1): + col_name = col_selector.names[0] + name_size = col_name + "_size" + has_size = name_size in df + try: + # Sort by col_name + df = df.sort_values(col_name, na_position="first") + except (NotImplementedError, TypeError): + # Dask-based sort failed - Need to compute first + simple = False + + # At this point, `simple` may have changed from True to False + # if the backend library failed to sort by the target column. + if simple: + # Define the null row + def _drop_first_row(part, index): + return part.iloc[1:] if index == (0,) else part + + null_row = df.head(1) + if null_row[col_name].iloc[:1].isnull().any(): + df = df.map_partitions(_drop_first_row, BlockIndex((df.npartitions,))) + if has_size: + null_size = null_row[name_size].iloc[0] + else: + null_size = 0 + + # Sort by size (without null and oov rows) + if has_size: + # Avoid using dask_cudf to calculate divisions + # (since it may produce too-few partitions) + df = df.sort_values( + name_size, + ascending=False, + divisions=dd.shuffle._calculate_divisions( + df, df[name_size], False, df.npartitions + )[0][::-1], + ) + + unique_path = _save_encodings( + df, + base_path, + _make_name(*col_selector.names, sep=options.name_sep), + first_n=max_emb_size, + freq_threshold=freq_threshold, + oov_count=oov_count, + null_size=null_size, + ) + + # TODO: Delete temporary parquet file(s) now thet the final + # uniques are written to disk? (May not want to wait on deletion) + return unique_path + + # If we have reached this point, we have a dask collection + # that must be computed before continuing + df = _compute_sync(df) else: - # For now, if we are not concatenating in host memory, - # we will assume that reducing the memory footprint of - # "size" columns is not a priority. However, the same - # type-casting optimization can also be done for both - # pandas and cudf-backed data here. - df = _concat(dfs, ignore_index=True) + # We have a list of DataFrame objects. + # Collect aggregation results into single frame + df = _general_concat( + dfs, + cardinality_memory_limit=options.cardinality_memory_limit, + col_selector=col_selector, + ignore_index=True, + ) # Check if we should warn user that this Column is likely # to cause memory-pressure issues @@ -1035,146 +1297,48 @@ def _write_uniques(dfs, base_path, col_selector: ColumnSelector, options: FitOpt f"(12.5% of the total memory by default)" ) - rel_path = "unique.%s.parquet" % (_make_name(*col_selector.names, sep=options.name_sep)) - path = "/".join([base_path, rel_path]) if len(df): # Make sure first category is Null. 
# Use ignore_index=True to avoid allocating memory for # an index we don't even need df = df.sort_values(col_selector.names, na_position="first", ignore_index=True) - name_size_multi = "_".join(col_selector.names + ["size"]) - if len(col_selector.names) > 1 and name_size_multi in df: - # Using "combo" encoding - df = _combo_encode(df, name_size_multi, col_selector, options) - else: - # Using (default) "joint" encoding - df = _joint_encode(df, col_selector, options) + has_size = name_size_multi in df + + # Check if we already have a null row + has_nans = df[col_selector.names].iloc[0].transpose().isnull().all() + if hasattr(has_nans, "iloc"): + has_nans = has_nans[0] - df.to_parquet(path, index=False, compression=None) + if has_nans: + if has_size: + null_size = df[name_size_multi].iloc[0] + df = df.iloc[1:] + else: + null_size = 0 + if has_size: + df = df.sort_values(name_size_multi, ascending=False, ignore_index=True) + df_write = df else: if hasattr(df, "convert_dtypes"): df = df.convert_dtypes() df_null = type(df)({c: [None] for c in col_selector.names}) for c in col_selector.names: df_null[c] = df_null[c].astype(df[c].dtype) - df_null.to_parquet(path, index=False, compression=None) - + df_write = df_null + + unique_path = _save_encodings( + df_write, + base_path, + _make_name(*col_selector.names, sep=options.name_sep), + first_n=max_emb_size, + freq_threshold=freq_threshold, + oov_count=oov_count, + null_size=null_size, + ) del df - return path - - -@annotate("_combo_encode", color="green", domain="nvt_python") -def _combo_encode(df, name_size_multi: str, col_selector: ColumnSelector, options: FitOptions): - # Combo-encoding utility (used by _write_uniques) - - # Account for max_size and num_buckets - if options.max_size: - max_emb_size = options.max_size - if isinstance(options.max_size, dict): - raise NotImplementedError( - "Cannot specify max_size as a dictionary for 'combo' encoding." - ) - if options.num_buckets: - if isinstance(options.num_buckets, dict): - raise NotImplementedError( - "Cannot specify num_buckets as a dictionary for 'combo' encoding." 
- ) - nlargest = max_emb_size - options.num_buckets - 1 - else: - nlargest = max_emb_size - 1 - - if nlargest <= 0: - raise ValueError("`nlargest` cannot be 0 or negative") - - if nlargest < len(df): - # sort based on count (name_size_multi column) - df = df.nlargest(n=nlargest, columns=name_size_multi) - - # Deal with nulls - has_nans = df[col_selector.names].iloc[0].transpose().isnull().all() - if hasattr(has_nans, "iloc"): - has_nans = has_nans[0] - if not has_nans: - null_data = {col: nullable_series([None], df, df[col].dtype) for col in col_selector.names} - null_data[name_size_multi] = [0] - null_df = type(df)(null_data) - df = _concat([null_df, df], ignore_index=True) - - return df - - -@annotate("_joint_encode", color="green", domain="nvt_python") -def _joint_encode(df, col_selector: ColumnSelector, options: FitOptions): - # Joint-encoding utility (used by _write_uniques) - - new_cols = {} - nulls_missing = False - for col in col_selector.names: - name_size = col + "_size" - null_size = 0 - # Set null size if first element in `col` is - # null, and the `size` aggregation is known - if name_size in df and df[col].iloc[:1].isnull().any(): - null_size = df[name_size].iloc[0] - if options.max_size: - max_emb_size = options.max_size - if isinstance(options.max_size, dict): - max_emb_size = max_emb_size[col] - if options.num_buckets: - if isinstance(options.num_buckets, int): - nlargest = max_emb_size - options.num_buckets - 1 - else: - nlargest = max_emb_size - options.num_buckets[col] - 1 - else: - nlargest = max_emb_size - 1 - - if nlargest <= 0: - raise ValueError("`nlargest` cannot be 0 or negative") - - if nlargest < len(df) and name_size in df: - # remove NAs from column, we have na count from above. - df = df.dropna() # TODO: This seems dangerous - Check this - # sort based on count (name_size column) - df = df.nlargest(n=nlargest, columns=name_size) - new_cols[col] = _concat( - [nullable_series([None], df, df[col].dtype), df[col]], - ignore_index=True, - ) - new_cols[name_size] = _concat( - [nullable_series([null_size], df, df[name_size].dtype), df[name_size]], - ignore_index=True, - ) - # recreate newly "count" ordered df - df = type(df)(new_cols) - if not dispatch.series_has_nulls(df[col]): - if name_size in df: - df = df.sort_values(name_size, ascending=False, ignore_index=True) - - nulls_missing = True - new_cols[col] = _concat( - [nullable_series([None], df, df[col].dtype), df[col]], - ignore_index=True, - ) - if name_size in df: - new_cols[name_size] = _concat( - [nullable_series([null_size], df, df[name_size].dtype), df[name_size]], - ignore_index=True, - ) - - else: - # ensure None aka "unknown" stays at index 0 - if name_size in df: - df_0 = df.iloc[0:1] - df_1 = df.iloc[1:].sort_values(name_size, ascending=False, ignore_index=True) - df = _concat([df_0, df_1]) - new_cols[col] = df[col].copy(deep=False) - - if name_size in df: - new_cols[name_size] = df[name_size].copy(deep=False) - if nulls_missing: - return type(df)(new_cols) - return df + del df_write + return unique_path def _finish_labels(paths, cols): @@ -1183,7 +1347,7 @@ def _finish_labels(paths, cols): def _groupby_to_disk(ddf, write_func, options: FitOptions): if not options.col_groups: - return {} + raise ValueError("no column groups to aggregate") if options.concat_groups: if options.agg_list and not set(options.agg_list).issubset({"count", "size"}): @@ -1193,21 +1357,27 @@ def _groupby_to_disk(ddf, write_func, options: FitOptions): if options.agg_cols: raise ValueError("Cannot aggregate continuous-column 
stats with concat_groups=True") - # Update tree_width - tw = {} + # Update split_out and split_every + so, se = {}, {} for col in options.col_groups: col = [col] if isinstance(col, str) else col if isinstance(col, tuple): col = list(col) - col_str = _make_name(*col.names, sep=options.name_sep) - if options.tree_width is None: - tw[col_str] = 8 - elif isinstance(options.tree_width, int): - tw[col_str] = options.tree_width - else: - tw[col_str] = options.tree_width.get(col_str, None) or 8 - options.tree_width = tw + + for _d, _opt, _default in [ + (so, options.split_out, 1), + (se, options.split_every, 8), + ]: + if _opt is None: + _d[col_str] = _default + elif isinstance(_opt, int): + _d[col_str] = _opt + else: + _d[col_str] = _opt.get(col_str, _default) + + options.split_out = so + options.split_every = se # Make dedicated output directory for the categories fs = get_fs_token_paths(options.out_path)[0] @@ -1220,59 +1390,157 @@ def _groupby_to_disk(ddf, write_func, options: FitOptions): options.col_groups, options.out_path, options.freq_limit, - options.tree_width, + options.split_out, + options.split_every, options.on_host, ) - level_1_name = "level_1-" + token split_name = "split-" + token - level_2_name = "level_2-" + token - level_3_name = "level_3-" + token + reduce_1_name = "reduce_1-" + token + reduce_3_name = "reduce_3-" + token finalize_labels_name = options.stat_name + "-" + token + + # Use map_partitions to improve task fusion + grouped = ddf.to_bag(format="frame").map_partitions( + _top_level_groupby, options=options, token="level_1" + ) + _grouped_meta = _top_level_groupby(ddf._meta, options=options) + _grouped_meta_col = {} + + dsk_split = defaultdict(dict) for p in range(ddf.npartitions): - dsk[(level_1_name, p)] = (_top_level_groupby, (ddf._name, p), options) k = 0 for c, col in enumerate(options.col_groups): col = [col] if isinstance(col, str) else col col_str = _make_name(*col.names, sep=options.name_sep) - for s in range(options.tree_width[col_str]): - dsk[(split_name, p, c, s)] = (getitem, (level_1_name, p), k) + _grouped_meta_col[c] = _grouped_meta[k] + for s in range(options.split_out[col_str]): + dsk_split[c][(split_name, p, c, s)] = (getitem, (grouped.name, p), k) k += 1 col_groups_str = [] + col_group_frames = [] for c, col in enumerate(options.col_groups): col = [col] if isinstance(col, str) else col col_str = _make_name(*col.names, sep=options.name_sep) col_groups_str.append(col_str) - freq_limit_val = None - if options.freq_limit: - freq_limit_val = ( - options.freq_limit[col_str] - if isinstance(options.freq_limit, dict) - else options.freq_limit - ) - for s in range(options.tree_width[col_str]): - dsk[(level_2_name, c, s)] = ( - _mid_level_groupby, - [(split_name, p, c, s) for p in range(ddf.npartitions)], - col, - freq_limit_val, - options, - ) + reduce_2_name = f"reduce_2-{c}-" + token + for s in range(options.split_out[col_str]): + split_every = options.split_every[col_str] + parts = ddf.npartitions + widths = [parts] + while parts > 1: + parts = math.ceil(parts / split_every) + widths.append(int(parts)) + height = len(widths) + if height >= 2: + # Loop over reduction levels + for depth in range(1, height): + # Loop over reduction groups + for group in range(widths[depth]): + # Calculate inputs for the current group + p_max = widths[depth - 1] + lstart = split_every * group + lstop = min(lstart + split_every, p_max) + if depth == 1: + # Input nodes are from input layer + input_keys = [(split_name, p, c, s) for p in range(lstart, lstop)] + else: + # Input 
nodes are tree-reduction nodes + input_keys = [ + (reduce_1_name, p, c, s, depth - 1) for p in range(lstart, lstop) + ] + + # Define task + if depth == height - 1: + # Final Node + assert ( + group == 0 + ), f"group = {group}, not 0 for final tree reduction task" + dsk_split[c][(reduce_2_name, s)] = ( + _bottom_level_groupby, + input_keys, + col, + options, + False, + ) + else: + # Intermediate Node + dsk_split[c][(reduce_1_name, group, c, s, depth)] = ( + _mid_level_groupby, + input_keys, + col, + options, + ) + else: + # Deal with single-partition case + dsk_split[c][(reduce_2_name, s)] = ( + _bottom_level_groupby, + [(split_name, 0, c, s)], + col, + options, + False, + ) - dsk[(level_3_name, c)] = ( - write_func, - [(level_2_name, c, s) for s in range(options.tree_width[col_str])], - out_path, + # Make DataFrame collection for column-group result + _meta = _bottom_level_groupby( + [_grouped_meta_col[c]], col, options, + spill=False, ) + _divisions = (None,) * (options.split_out[col_str] + 1) + graph = HighLevelGraph.from_collections(reduce_2_name, dsk_split[c], dependencies=[grouped]) + col_group_frames.append(new_dd_object(graph, reduce_2_name, _meta, _divisions)) + + # Write data to (possibly temporary) parquet files + cpu = isinstance(col_group_frames[-1]._meta, pd.DataFrame) + if write_func is None: + # Write results directly to disk, and use + # a final "barrier" task + if options.concat_groups and len(col) > 1: + col_selector = ColumnSelector([_make_name(*col.names, sep=options.name_sep)]) + else: + col_selector = col + rel_path = "cat_stats.%s.parquet" % ( + _make_name(*col_selector.names, sep=options.name_sep) + ) + path = os.path.join(out_path, rel_path) + col_group_frames[-1] = _to_parquet_dask_lazy(col_group_frames[-1], path) + # Barrier-only task + dsk[(reduce_3_name, c)] = ( + lambda keys, path: path, + col_group_frames[-1].__dask_keys__(), + path, + ) + else: + # Possibly write data to temporary parquet files, + # and perform write operation(s) in final `write_func` task + assert callable(write_func) + if col_group_frames[-1].npartitions > 1 and write_func.__name__ == "_write_uniques": + path = os.path.join(out_path, f"tmp.uniques.{col_str}") + col_group_frames[-1] = _to_parquet_dask_lazy(col_group_frames[-1], path) + else: + path = None + # Write + barrier task + dsk[(reduce_3_name, c)] = ( + write_func, + col_group_frames[-1].__dask_keys__(), + out_path, + col, + options, + cpu, + path, + ) + # Tie everything together into a graph with a single output key dsk[finalize_labels_name] = ( _finish_labels, - [(level_3_name, c) for c, col in enumerate(options.col_groups)], + [(reduce_3_name, c) for c, col in enumerate(options.col_groups)], col_groups_str, ) - graph = HighLevelGraph.from_collections(finalize_labels_name, dsk, dependencies=[ddf]) + graph = HighLevelGraph.from_collections( + finalize_labels_name, dsk, dependencies=col_group_frames + ) return graph, finalize_labels_name @@ -1288,7 +1556,7 @@ def _category_stats(ddf, options: FitOptions): if options.agg_list == []: options.agg_list = ["count"] - return _groupby_to_disk(ddf, _write_gb_stats, options) + return _groupby_to_disk(ddf, None, options) def _encode( @@ -1297,7 +1565,6 @@ def _encode( path, df, cat_cache, - na_sentinel=-1, freq_threshold=0, search_sorted=False, buckets=None, @@ -1305,7 +1572,8 @@ def _encode( cat_names=None, max_size=0, dtype=None, - start_index=0, + split_out=1, + single_table=False, ): """The _encode method is responsible for transforming a dataframe by taking the written out vocabulary 
file and looking up values to translate inputs to numeric @@ -1318,8 +1586,6 @@ def _encode( path : str df : DataFrame cat_cache : - na_sentinel : int - Sentinel for NA value. Defaults to -1. freq_threshold : int Categories with a count or frequency below this threshold will be omitted from the encoding and corresponding data will be @@ -1336,11 +1602,6 @@ def _encode( Defaults to 0. dtype : Defaults to None. - start_index : int - The index to start outputting categorical values to. This is useful - to, for instance, reserve an initial segment of non-negative - integers for out-of-vocabulary or other special values. Defaults - to 1. Returns ------- @@ -1349,16 +1610,25 @@ def _encode( """ if isinstance(buckets, int): buckets = {name: buckets for name in cat_names} - # this is to apply freq_hashing logic - if max_size: - freq_threshold = 1 value = None selection_l = ColumnSelector(name if isinstance(name, list) else [name]) selection_r = ColumnSelector(name if isinstance(name, list) else [storage_name]) list_col = is_list_col(selection_l, df) + + # Find number of oov buckets + if buckets and storage_name in buckets: + num_oov_buckets = buckets[storage_name] + search_sorted = False + else: + num_oov_buckets = 1 + if path: - read_pq_func = dispatch.read_parquet_dispatch(df) - if cat_cache is not None: + read_pq_func = dispatch.read_dispatch( + df, + fmt="parquet", + collection=split_out > 1, + ) + if cat_cache is not None and split_out == 1: cat_cache = ( cat_cache if isinstance(cat_cache, str) else cat_cache.get(storage_name, "disk") ) @@ -1372,22 +1642,60 @@ def _encode( cats_only=True, reader=read_pq_func, ) + if len(value) and value["labels"].iloc[0] < OOV_OFFSET + num_oov_buckets: + # See: https://github.com/rapidsai/cudf/issues/12837 + value["labels"] += OOV_OFFSET + num_oov_buckets else: value = read_pq_func( # pylint: disable=unexpected-keyword-arg - path, columns=selection_r.names + path, + columns=selection_r.names, + **({"split_row_groups": False} if split_out > 1 else {}), ) - value.index.name = "labels" - value.reset_index(drop=False, inplace=True) + + value.index = value.index.rename("labels") + if split_out > 1: + value = value.reset_index(drop=False) + if type(df).__module__.split(".")[0] == "cudf": + # `cudf.read_parquet` may drop the RangeIndex, so we need + # to use the parquet metadata to set a proper RangeIndex. 
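The workaround above rebuilds the label column from parquet file metadata when the unique values span multiple files (`split_out > 1`). A minimal standalone sketch of that bookkeeping follows; the helper name `label_ranges`, its arguments, and the default `oov_offset=2` are stand-ins of mine for the `path`, `OOV_OFFSET`, and `num_oov_buckets` values used in `_encode`, not part of the patch.

```python
import pandas as pd
import pyarrow.dataset as pa_ds


def label_ranges(path, num_oov_buckets=1, oov_offset=2):
    """Assign each unique-value parquet file a contiguous block of labels.

    Distinct categories start after the reserved pad/null/OOV slots,
    i.e. at ``oov_offset + num_oov_buckets``; each file then owns a
    range sized by its row count, read from the parquet footer.
    """
    ranges, start = [], oov_offset + num_oov_buckets
    for frag in pa_ds.dataset(path, format="parquet").get_fragments():
        n = frag.metadata.num_rows  # row count from this fragment's metadata
        ranges.append(pd.RangeIndex(start, start + n))
        start += n
    return ranges
```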
+ # We can avoid this workaround for cudf>=23.04 + # (See: https://github.com/rapidsai/cudf/issues/12837) + ranges, size = [], OOV_OFFSET + num_oov_buckets + for file_frag in pa_ds.dataset(path, format="parquet").get_fragments(): + part_size = file_frag.metadata.num_rows + ranges.append((size, size + part_size)) + size += part_size + value["labels"] = dd.from_map(lambda r: pd.RangeIndex(*r), ranges) + else: + value.reset_index(drop=False, inplace=True) if value is None: value = type(df)() for c in selection_r.names: typ = df[selection_l.names[0]].dtype if len(selection_l.names) == 1 else df[c].dtype value[c] = nullable_series([None], df, typ) - value.index.name = "labels" + value.index = value.index.rename("labels") value.reset_index(drop=False, inplace=True) - if not search_sorted: + use_collection = isinstance(value, DaskDataFrame) + if use_collection and value.npartitions == 1: + # Use simple merge for single-partition case + value = _compute_sync(value) + use_collection = False + + # Determine encoding offsets + null_encoding_offset = value["labels"].head(1).iloc[0] if single_table else NULL_OFFSET + bucket_encoding_offset = null_encoding_offset + 1 # 2 (if not single_table) + distinct_encoding_offset = bucket_encoding_offset + num_oov_buckets + + # Determine indices of "real" null values + # (these will always be encoded to `1`) + expr = df[selection_l.names[0]].isna() + for _name in selection_l.names[1:]: + expr = expr & df[_name].isna() + nulls = df[expr].index + + if use_collection or not search_sorted: if list_col: codes = dispatch.flatten_list_column(df[selection_l.names[0]]) codes["order"] = dispatch.arange(len(codes), like_df=df) @@ -1402,44 +1710,98 @@ def _encode( else: codes[cl] = df[cl].copy().astype(value[cr].dtype) + indistinct = bucket_encoding_offset if buckets and storage_name in buckets: - na_sentinel = _hash_bucket(df, buckets, selection_l.names, encode_type=encode_type) + # apply hashing for "infrequent" categories + indistinct = ( + _hash_bucket(df, buckets, selection_l.names, encode_type=encode_type) + + bucket_encoding_offset + ) + + if use_collection: + # Manual broadcast merge + merged_df = _concat( + [ + codes.merge( + _compute_sync(part), + left_on=selection_l.names, + right_on=selection_r.names, + how="left", + ).dropna(subset=["labels"]) + for part in value.partitions + ], + ignore_index=False, + ).sort_values("order") + else: + merged_df = codes.merge( + value, left_on=selection_l.names, right_on=selection_r.names, how="left" + ).sort_values("order") - # apply frequency hashing - if freq_threshold and buckets and storage_name in buckets: - merged_df = codes.merge( - value, left_on=selection_l.names, right_on=selection_r.names, how="left" - ).sort_values("order") merged_df.reset_index(drop=True, inplace=True) - max_id = merged_df["labels"].max() - merged_df["labels"].fillna( - df._constructor_sliced(na_sentinel + max_id + 1), inplace=True - ) - labels = merged_df["labels"].values - # only do hashing - elif buckets and storage_name in buckets: - labels = na_sentinel - # no hashing + if len(merged_df) < len(codes): + # Missing nulls + labels = df._constructor_sliced(indistinct) + labels.iloc[merged_df["order"]] = merged_df["labels"] + labels = labels.values + else: + merged_df["labels"].fillna(df._constructor_sliced(indistinct), inplace=True) + labels = merged_df["labels"].values else: - na_sentinel = 0 - labels = codes.merge( - value, left_on=selection_l.names, right_on=selection_r.names, how="left" - ).sort_values("order")["labels"] - 
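To make the new label layout concrete: as I read the offsets introduced above (and the shifted expectations in the tests further down), index 0 is left for padding, real nulls always map to 1, the next `num_oov_buckets` slots absorb out-of-vocabulary or hashed values, and the distinct categories follow. A small illustration of that arithmetic, not library code; the constant values are inferred from this patch.

```python
# Inferred from this patch: NULL_OFFSET appears to be 1 and OOV_OFFSET 2,
# consistent with the updated tests, where the smallest distinct code is 3
# whenever there is a single OOV bucket.
NULL_OFFSET = 1
OOV_OFFSET = 2


def encoding_offsets(num_oov_buckets=1):
    null = NULL_OFFSET                        # "real" nulls always land here
    oov_start = null + 1                      # == OOV_OFFSET
    distinct_start = oov_start + num_oov_buckets
    return null, oov_start, distinct_start


# One OOV bucket: nulls -> 1, OOV -> 2, first distinct category -> 3.
assert encoding_offsets(1) == (1, 2, 3)
# Ten OOV buckets: OOV hashes land in 2..11, distinct values start at 12.
assert encoding_offsets(10) == (1, 2, 12)
```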
labels.fillna(na_sentinel, inplace=True) + # no hashing + if use_collection: + # Manual broadcast merge + merged_df = _concat( + [ + codes.merge( + _compute_sync(part), + left_on=selection_l.names, + right_on=selection_r.names, + how="left", + ).dropna(subset=["labels"]) + for part in value.partitions + ], + ignore_index=True, + ) + if len(merged_df) < len(codes): + # Missing nulls + labels = codes._constructor_sliced( + np.full( + len(codes), + indistinct, + like=merged_df["labels"].values, + ), + ) + labels.iloc[merged_df["order"]] = merged_df["labels"] + else: + labels = merged_df.sort_values("order")["labels"].reset_index(drop=True) + else: + labels = codes.merge( + value, left_on=selection_l.names, right_on=selection_r.names, how="left" + ).sort_values("order")["labels"] + labels.fillna(indistinct, inplace=True) labels = labels.values else: # Use `searchsorted` if we are using a "full" encoding if list_col: - labels = value[selection_r.names].searchsorted( - df[selection_l.names[0]].list.leaves, side="left", na_position="first" + labels = ( + value[selection_r.names].searchsorted( + df[selection_l.names[0]].list.leaves, side="left", na_position="first" + ) + + distinct_encoding_offset ) else: - labels = value[selection_r.names].searchsorted( - df[selection_l.names], side="left", na_position="first" + labels = ( + value[selection_r.names].searchsorted( + df[selection_l.names], side="left", na_position="first" + ) + + distinct_encoding_offset ) - labels[labels >= len(value[selection_r.names])] = na_sentinel + labels[labels >= len(value[selection_r.names])] = bucket_encoding_offset - labels = labels + start_index + # Make sure nulls are encoded to `null_encoding_offset` + # (This should be `1` in most casese) + if len(nulls): + labels[nulls] = null_encoding_offset if list_col: labels = dispatch.encode_list_column(df[selection_l.names[0]], labels, dtype=dtype) @@ -1512,12 +1874,35 @@ def _copy_storage(existing_stats, existing_path, new_path, copy): def _reset_df_index(col_name, cat_file_path, idx_count): - cat_df = dispatch.read_parquet_dispatch(None)(cat_file_path) + cat_df = _compute_sync(dispatch.read_dispatch(collection=True)(cat_file_path)) # change indexes for category - cat_df.index += idx_count + cat_df.index = cat_df.index + idx_count # update count idx_count += cat_df.shape[0] # save the new indexes in file - new_cat_file_path = Path(cat_file_path).parent / f"unique.{col_name}.all.parquet" - cat_df.to_parquet(new_cat_file_path) + new_cat_file_path = _save_encodings( + cat_df, + Path(cat_file_path).parent, + col_name, + preserve_index=True, + ) return idx_count, new_cat_file_path + + +def _deprecate_tree_width(tree_width): + # Warn user if tree_width is specified + if tree_width is not None: + warnings.warn( + "The tree_width argument is now deprecated, and will be ignored. 
" + "Please use split_out and split_every.", + FutureWarning, + ) + + +def _compute_sync(collection): + # Simple utility to compute a dask collection with + # a synchronous scheduler (and to catch warnings + # that are intended for users doing this by accident) + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", message="Running on a single-machine scheduler.*") + return collection.compute(scheduler="synchronous") diff --git a/nvtabular/ops/drop_low_cardinality.py b/nvtabular/ops/drop_low_cardinality.py index 5b2340e944..a4e88f70f0 100644 --- a/nvtabular/ops/drop_low_cardinality.py +++ b/nvtabular/ops/drop_low_cardinality.py @@ -25,7 +25,7 @@ class DropLowCardinality(Operator): first encoding these columns using Categorify. """ - def __init__(self, min_cardinality=2): + def __init__(self, min_cardinality=4): super().__init__() self.min_cardinality = min_cardinality diff --git a/nvtabular/ops/join_groupby.py b/nvtabular/ops/join_groupby.py index d6cb4b0e9a..ed168e3e00 100644 --- a/nvtabular/ops/join_groupby.py +++ b/nvtabular/ops/join_groupby.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # - import dask.dataframe as dd import numpy as np import pandas as pd @@ -63,12 +62,15 @@ class JoinGroupby(StatOperator): that "count" corresponds to the group itself, while all other statistics correspond to a specific continuous column. Supported statistics include ["count", "sum", "mean", "std", "var"]. - tree_width : dict or int, optional - Tree width of the hash-based groupby reduction for each categorical - column. High-cardinality columns may require a large `tree_width`, - while low-cardinality columns can likely use `tree_width=1`. + split_out : dict or int, optional + Number of files needed to store the final result of each groupby + reduction. High-cardinality groups may require a large `split_out`, + while low-cardinality columns can likely use `split_out=1` (default). If passing a dict, each key and value should correspond to the column - name and width, respectively. The default value is 8 for all columns. + name and value, respectively. The default value is 1 for all columns. + split_every : dict or int, optional + Number of adjacent partitions to aggregate in each tree-reduction + node. The default value is 8 for all columns. cat_cache: ToDo Describe TEXT out_path : str, optional @@ -88,22 +90,26 @@ def __init__( self, cont_cols=None, stats=("count",), - tree_width=None, + split_out=None, + split_every=None, cat_cache="host", out_path=None, on_host=True, name_sep="_", + tree_width=None, ): super().__init__() self.storage_name = {} self.name_sep = name_sep self.stats = stats - self.tree_width = tree_width + self.split_out = split_out + self.split_every = split_every self.out_path = out_path or "./" self.on_host = on_host self.cat_cache = cat_cache self.categories = {} + nvt_cat._deprecate_tree_width(tree_width) self._cont_names = None @@ -153,10 +159,11 @@ def fit(self, col_selector: ColumnSelector, ddf: dd.DataFrame): self.stats, self.out_path, 0, - self.tree_width, + self.split_out, self.on_host, concat_groups=False, name_sep=self.name_sep, + split_every=self.split_every, ), ) return Delayed(key, dsk) diff --git a/nvtabular/ops/target_encoding.py b/nvtabular/ops/target_encoding.py index 6b71b0892f..eb76dc5931 100644 --- a/nvtabular/ops/target_encoding.py +++ b/nvtabular/ops/target_encoding.py @@ -97,12 +97,15 @@ class TargetEncoding(StatOperator): elements must be unique). 
out_dtype : str, default is problem-specific dtype of output target-encoding columns. - tree_width : dict or int, optional - Tree width of the hash-based groupby reduction for each categorical - column. High-cardinality columns may require a large `tree_width`, - while low-cardinality columns can likely use `tree_width=1`. + split_out : dict or int, optional + Number of files needed to store the final result of each groupby + reduction. High-cardinality groups may require a large `split_out`, + while low-cardinality columns can likely use `split_out=1` (default). If passing a dict, each key and value should correspond to the column - name and width, respectively. The default value is 8 for all columns. + name and value, respectively. The default value is 1 for all columns. + split_every : dict or int, optional + Number of adjacent partitions to aggregate in each tree-reduction + node. The default value is 8 for all columns. cat_cache : {"device", "host", "disk"} or dict Location to cache the list of unique categories for each categorical column. If passing a dict, each key and value @@ -132,12 +135,14 @@ def __init__( p_smooth=20, out_col=None, out_dtype=None, - tree_width=None, + split_out=None, + split_every=None, cat_cache="host", out_path=None, on_host=True, name_sep="_", drop_folds=True, + tree_width=None, ): super().__init__() @@ -151,7 +156,8 @@ def __init__( self.p_smooth = p_smooth self.out_col = [out_col] if isinstance(out_col, str) else out_col self.out_dtype = out_dtype - self.tree_width = tree_width + self.split_out = split_out + self.split_every = split_every self.out_path = out_path or "./" self.on_host = on_host self.cat_cache = cat_cache @@ -160,6 +166,7 @@ def __init__( self.fold_name = "__fold__" self.stats = {} self.means = {} # TODO: just update target_mean? 
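The `nvt_cat._deprecate_tree_width(tree_width)` call added just below is the same backward-compatibility guard used by `JoinGroupby` above and exercised for `Categorify` in the updated tests: `tree_width` is still accepted but is ignored and only triggers a `FutureWarning`. A pytest-style check mirroring the one added to the Categorify tests later in this patch:

```python
import pytest
import nvtabular as nvt
from nvtabular import ops


def test_tree_width_is_deprecated():
    # tree_width is kept for backward compatibility, but it is ignored and
    # should raise a FutureWarning pointing users at split_out / split_every.
    with pytest.warns(FutureWarning):
        nvt.Workflow(["user_id"] >> ops.Categorify(tree_width=8))
```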
+ nvt_cat._deprecate_tree_width(tree_width) def fit(self, col_selector: ColumnSelector, ddf: dd.DataFrame): moments = None @@ -197,10 +204,11 @@ def fit(self, col_selector: ColumnSelector, ddf: dd.DataFrame): ["count", "sum"], self.out_path, 0, - self.tree_width, + self.split_out, self.on_host, concat_groups=False, name_sep=self.name_sep, + split_every=self.split_every, ), ) return Delayed(key, dsk), moments diff --git a/tests/unit/ops/test_categorify.py b/tests/unit/ops/test_categorify.py index 80a52c06a3..41a69ef346 100644 --- a/tests/unit/ops/test_categorify.py +++ b/tests/unit/ops/test_categorify.py @@ -26,6 +26,7 @@ from merlin.core.dispatch import make_df from nvtabular import ColumnSelector, ops from nvtabular.ops.categorify import get_embedding_sizes +from tests.conftest import assert_eq if cudf: _CPU = [True, False] @@ -77,9 +78,9 @@ def test_categorify_size(tmpdir, cpu, include_nulls, cardinality_memory_limit): } else: # Ignore first element if it is NaN - if vocab["session_id"].iloc[:1].isna().any(): - session_id = vocab["session_id"].iloc[1:] - session_id_size = vocab["session_id_size"].iloc[1:] + if vocab["session_id"].iloc[:2].isna().any(): + session_id = vocab["session_id"].iloc[2:] + session_id_size = vocab["session_id_size"].iloc[2:] else: session_id = vocab["session_id"] session_id_size = vocab["session_id_size"] @@ -112,14 +113,12 @@ def test_na_value_count(tmpdir): workflow.fit(train_dataset) workflow.transform(train_dataset).to_ddf().compute() - single_cat = dispatch.read_dispatch("./categories/unique.brand.parquet")( - "./categories/unique.brand.parquet" - ) - second_cat = dispatch.read_dispatch("./categories/unique.productID.parquet")( - "./categories/unique.productID.parquet" - ) - assert single_cat["brand_size"][0] == 5 - assert second_cat["productID_size"][0] == 3 + single_meta = dispatch.read_dispatch(fmt="parquet")("./categories/meta.brand.parquet") + second_meta = dispatch.read_dispatch(fmt="parquet")("./categories/meta.productID.parquet") + assert single_meta["kind"].iloc[1] == "null" + assert single_meta["num_observed"].iloc[1] == 5 + assert second_meta["kind"].iloc[1] == "null" + assert second_meta["num_observed"].iloc[1] == 3 @pytest.mark.parametrize("freq_threshold", [0, 1, 2]) @@ -153,51 +152,9 @@ def test_categorify_lists(tmpdir, freq_threshold, cpu, dtype, vocabs): compare = df_out["Authors"].to_arrow().to_pylist() if freq_threshold < 2 or vocabs is not None: - assert compare == [[1], [1, 4], [3, 2], [2]] - else: - assert compare == [[1], [1, 0], [0, 2], [2]] - - -@pytest.mark.parametrize("cpu", _CPU) -@pytest.mark.parametrize("start_index", [1, 2, 16]) -def test_categorify_lists_with_start_index(tmpdir, cpu, start_index): - df = dispatch.make_df( - { - "Authors": [["User_A"], ["User_A", "User_E"], ["User_B", "User_C"], ["User_C"]], - "Engaging User": ["User_B", "User_B", "User_A", "User_D"], - "Post": [1, 2, 3, 4], - } - ) - cat_names = ["Authors", "Engaging User"] - label_name = ["Post"] - dataset = nvt.Dataset(df, cpu=cpu) - cat_features = cat_names >> ops.Categorify(out_path=str(tmpdir), start_index=start_index) - processor = nvt.Workflow(cat_features + label_name) - processor.fit(dataset) - df_out = processor.transform(dataset).to_ddf().compute() - - if cpu: - compare = [list(row) for row in df_out["Authors"].tolist()] + assert compare == [[3], [3, 6], [5, 4], [4]] else: - compare = df_out["Authors"].to_arrow().to_pylist() - - # Note that start_index is the start_index of the range of encoding, which - # includes both an initial value for the 
encoding for out-of-vocabulary items, - # as well as the values for the rest of the in-vocabulary items. - # In this group of tests below, there are no out-of-vocabulary items, so our start index - # value does not appear in the expected comparison object. - if start_index == 0: - assert compare == [[1], [1, 4], [3, 2], [2]] - elif start_index == 1: - assert compare == [[2], [2, 5], [4, 3], [3]] - elif start_index == 16: - assert compare == [[17], [17, 20], [19, 18], [18]] - - # We expect five entries in the embedding size, one for each author, - # plus start_index many additional entries for our offset start_index. - embeddings = nvt.ops.get_embedding_sizes(processor) - - assert embeddings[1]["Authors"][0] == (5 + start_index) + assert compare == [[3], [3, 2], [2, 4], [4]] @pytest.mark.parametrize("cat_names", [[["Author", "Engaging User"]], ["Author", "Engaging User"]]) @@ -234,8 +191,8 @@ def test_categorify_multi(tmpdir, cat_names, kind, cpu): else df_out["Engaging User"].to_arrow().to_pylist() ) # again userB has highest frequency given lowest encoding - assert compare_authors == [2, 5, 1, 3] - assert compare_engaging == [1, 1, 2, 4] + assert compare_authors == [4, 7, 3, 5] + assert compare_engaging == [3, 3, 4, 6] else: # Column combinations are encoded compare_engaging = ( @@ -243,7 +200,7 @@ def test_categorify_multi(tmpdir, cat_names, kind, cpu): if cpu else df_out["Author_Engaging User"].to_arrow().to_pylist() ) - assert compare_engaging == [1, 4, 2, 3] + assert compare_engaging == [3, 6, 4, 5] else: # Columns are encoded independently compare_authors = ( @@ -254,9 +211,9 @@ def test_categorify_multi(tmpdir, cat_names, kind, cpu): if cpu else df_out["Engaging User"].to_arrow().to_pylist() ) - assert compare_authors == [1, 4, 2, 3] + assert compare_authors == [3, 6, 4, 5] # User B is first in frequency based ordering - assert compare_engaging == [1, 1, 2, 3] + assert compare_engaging == [3, 3, 4, 5] @pytest.mark.parametrize("cpu", _CPU) @@ -279,9 +236,9 @@ def test_categorify_multi(tmpdir, cat_names, kind, cpu): "Engaging User": ["User_C", "User_B", "User_A", "User_D"], "Post": [1, 2, 3, 4], }, - "expected_a": [1, 3, 1, 2], - "expected_e": [3, 2, 1, 4], - "expected_ae": [2, 4, 1, 3], + "expected_a": [3, 5, 3, 4], + "expected_e": [5, 4, 3, 6], + "expected_ae": [4, 6, 3, 5], }, # dupes in both Engaging user { @@ -290,9 +247,9 @@ def test_categorify_multi(tmpdir, cat_names, kind, cpu): "Engaging User": ["User_B", "User_B", "User_A", "User_D"], "Post": [1, 2, 3, 4], }, - "expected_a": [1, 4, 2, 3], - "expected_e": [1, 1, 2, 3], - "expected_ae": [1, 4, 2, 3], + "expected_a": [3, 6, 4, 5], + "expected_e": [3, 3, 4, 5], + "expected_ae": [3, 6, 4, 5], }, # dupes in both Author and Engaging User { @@ -301,9 +258,9 @@ def test_categorify_multi(tmpdir, cat_names, kind, cpu): "Engaging User": ["User_B", "User_B", "User_A", "User_D"], "Post": [1, 2, 3, 4], }, - "expected_a": [1, 3, 2, 1], - "expected_e": [1, 1, 2, 3], - "expected_ae": [2, 4, 1, 3], + "expected_a": [3, 5, 4, 3], + "expected_e": [3, 3, 4, 5], + "expected_ae": [4, 6, 3, 5], }, # dupes in both, lining up { @@ -312,9 +269,9 @@ def test_categorify_multi(tmpdir, cat_names, kind, cpu): "Engaging User": ["User_A", "User_B", "User_C", "User_C"], "Post": [1, 2, 3, 4], }, - "expected_a": [2, 3, 1, 1], - "expected_e": [2, 3, 1, 1], - "expected_ae": [1, 2, 3, 3], + "expected_a": [4, 5, 3, 3], + "expected_e": [4, 5, 3, 3], + "expected_ae": [4, 5, 3, 3], }, # no dupes { @@ -323,9 +280,9 @@ def test_categorify_multi(tmpdir, cat_names, kind, 
cpu): "Engaging User": ["User_C", "User_B", "User_A", "User_D"], "Post": [1, 2, 3, 4], }, - "expected_a": [3, 4, 2, 1], - "expected_e": [3, 2, 1, 4], - "expected_ae": [3, 4, 2, 1], + "expected_a": [5, 6, 4, 3], + "expected_e": [5, 4, 3, 6], + "expected_ae": [5, 6, 4, 3], }, # Include null value { @@ -334,9 +291,9 @@ def test_categorify_multi(tmpdir, cat_names, kind, cpu): "Engaging User": ["User_C", "User_B", "User_A", "User_D"], "Post": [1, 2, 3, 4], }, - "expected_a": [0, 3, 2, 1], - "expected_e": [3, 2, 1, 4], - "expected_ae": [1, 4, 3, 2], + "expected_a": [1, 5, 4, 3], + "expected_e": [5, 4, 3, 6], + "expected_ae": [3, 6, 5, 4], }, ], ) @@ -423,39 +380,49 @@ def test_categorify_freq_limit(tmpdir, freq_limit, buckets, search_sort, cpu): .compute(scheduler="synchronous") ) + # Check size statistics add up to len(df) + for col in ["Author", "Engaging User"]: + check_meta = dispatch.read_dispatch(fmt="parquet")( + str(tmpdir) + f"/categories/meta.{col}.parquet" + ) + assert check_meta["num_observed"].sum() == len(df) + + null, oov = 1, 1 + unique = {"Author": 5, "Engaging User": 4} + freq_limited = {"Author": 2, "Engaging User": 1} if freq_limit and not buckets: # Column combinations are encoded if isinstance(freq_limit, dict): - assert df_out["Author"].max() == 2 - assert df_out["Engaging User"].max() == 1 + assert df_out["Author"].max() == null + oov + freq_limited["Author"] + assert df_out["Engaging User"].max() == null + oov + freq_limited["Engaging User"] else: assert len(df["Author"].unique()) == df_out["Author"].max() assert len(df["Engaging User"].unique()) == df_out["Engaging User"].max() elif not freq_limit and buckets: if isinstance(buckets, dict): - assert df_out["Author"].max() <= 9 - assert df_out["Engaging User"].max() <= 19 + assert df_out["Author"].max() <= null + buckets["Author"] + unique["Author"] + assert ( + df_out["Engaging User"].max() + <= null + buckets["Engaging User"] + unique["Engaging User"] + ) else: - assert df_out["Author"].max() <= 9 - assert df_out["Engaging User"].max() <= 9 + assert df_out["Author"].max() <= null + buckets + unique["Author"] + assert df_out["Engaging User"].max() <= null + buckets + unique["Engaging User"] elif freq_limit and buckets: if ( isinstance(buckets, dict) - and isinstance(buckets, dict) + and isinstance(freq_limit, dict) and not isinstance(df, pd.DataFrame) ): - assert ( - df_out["Author"].max() - <= (df["Author"].hash_values() % buckets["Author"]).max() + 2 + 1 - ) + assert df_out["Author"].max() <= null + freq_limited["Author"] + buckets["Author"] assert ( df_out["Engaging User"].max() - <= (df["Engaging User"].hash_values() % buckets["Engaging User"]).max() + 1 + 1 + <= null + freq_limited["Engaging User"] + buckets["Engaging User"] ) @pytest.mark.parametrize("cpu", _CPU) -def test_categorify_hash_bucket(cpu): +def test_categorify_hash_bucket_only(cpu): df = dispatch.make_df( { "Authors": ["User_A", "User_A", "User_E", "User_B", "User_C"], @@ -465,18 +432,19 @@ def test_categorify_hash_bucket(cpu): ) cat_names = ["Authors", "Engaging_User"] buckets = 10 + max_size = buckets + 2 # Must include pad and null indices dataset = nvt.Dataset(df, cpu=cpu) - hash_features = cat_names >> ops.Categorify(num_buckets=buckets) + hash_features = cat_names >> ops.Categorify(num_buckets=buckets, max_size=max_size) processor = nvt.Workflow(hash_features) processor.fit(dataset) new_gdf = processor.transform(dataset).to_ddf().compute() # check hashed values - assert new_gdf["Authors"].max() <= (buckets - 1) - assert 
new_gdf["Engaging_User"].max() <= (buckets - 1) + assert new_gdf["Authors"].max() <= max_size + assert new_gdf["Engaging_User"].max() <= max_size # check embedding size is equal to the num_buckets after hashing - assert nvt.ops.get_embedding_sizes(processor)["Authors"][0] == buckets - assert nvt.ops.get_embedding_sizes(processor)["Engaging_User"][0] == buckets + assert nvt.ops.get_embedding_sizes(processor)["Authors"][0] == max_size + assert nvt.ops.get_embedding_sizes(processor)["Engaging_User"][0] == max_size @pytest.mark.parametrize("max_emb_size", [6, {"Author": 8, "Engaging_User": 7}]) @@ -524,18 +492,18 @@ def test_categorify_max_size(max_emb_size): max_emb_size = {name: max_emb_size for name in cat_names} # check encoded values after freq_hashing with fix emb size - assert new_gdf["Author"].max() <= max_emb_size["Author"] - assert new_gdf["Engaging_User"].max() <= max_emb_size["Engaging_User"] + assert new_gdf["Author"].max() <= max_emb_size["Author"] + 1 + assert new_gdf["Engaging_User"].max() <= max_emb_size["Engaging_User"] + 1 # check embedding size is less than max_size after hashing with fix emb size. embedding_sizes = nvt.ops.get_embedding_sizes(processor) - assert embedding_sizes["Author"][0] <= max_emb_size["Author"] - assert embedding_sizes["Engaging_User"][0] <= max_emb_size["Engaging_User"] + assert embedding_sizes["Author"][0] <= max_emb_size["Author"] + 1 + assert embedding_sizes["Engaging_User"][0] <= max_emb_size["Engaging_User"] + 1 # make sure we can also get embedding sizes from the workflow_node embedding_sizes = nvt.ops.get_embedding_sizes(cat_features) - assert embedding_sizes["Author"][0] <= max_emb_size["Author"] - assert embedding_sizes["Engaging_User"][0] <= max_emb_size["Engaging_User"] + assert embedding_sizes["Author"][0] <= max_emb_size["Author"] + 1 + assert embedding_sizes["Engaging_User"][0] <= max_emb_size["Engaging_User"] + 1 def test_categorify_single_table(): @@ -553,7 +521,7 @@ def test_categorify_single_table(): processor.fit(dataset) new_gdf = processor.transform(dataset).to_ddf().compute() - old_max = 0 + old_max = 1 for name in cat_names: curr_min = new_gdf[name].min() assert old_max <= curr_min @@ -569,7 +537,7 @@ def test_categorify_embedding_sizes(dataset, engine): workflow = nvt.Workflow(cat_1 + cat_2) workflow.fit_transform(dataset) - assert get_embedding_sizes(workflow) == {"name-cat": (27, 16), "name-string_test": (27, 16)} + assert get_embedding_sizes(workflow) == {"name-cat": (29, 16), "name-string_test": (29, 16)} def test_categorify_no_nulls(): @@ -583,9 +551,9 @@ def test_categorify_no_nulls(): workflow = nvt.Workflow(["user_id", "item_id"] >> ops.Categorify()) workflow.fit(nvt.Dataset(df)) - df = pd.read_parquet("./categories/unique.user_id.parquet") - assert df["user_id"].iloc[:1].isnull().any() - assert df["user_id_size"][0] == 0 + df = pd.read_parquet("./categories/meta.user_id.parquet") + assert df["kind"].iloc[1] == "null" + assert df["num_observed"].iloc[1] == 0 @pytest.mark.parametrize("cat_names", [[["Author", "Engaging User"]], ["Author", "Engaging User"]]) @@ -654,15 +622,15 @@ def test_categorify_max_size_null_iloc_check(): workflow = nvt.Workflow(cat_features) workflow.fit(train_dataset) workflow.transform(train_dataset) - # read back the unique categories - unique_C1 = pd.read_parquet("./categories/unique.C1.parquet") - assert str(unique_C1["C1"].iloc[0]) in ["", "nan"] - assert unique_C1["C1_size"].iloc[0] == 5 + # read back the C1 encoding metadata + meta_C1 = pd.read_parquet("./categories/meta.C1.parquet") + 
assert meta_C1["kind"].iloc[1] == "null" + assert meta_C1["num_observed"].iloc[1] == 5 - # read back the unique categories - unique_C2 = pd.read_parquet("./categories/unique.C2.parquet") - assert str(unique_C2["C2"].iloc[0]) in ["", "nan"] - assert unique_C2["C2_size"].iloc[0] == 0 + # read back the C2 encoding metadata + meta_C2 = pd.read_parquet("./categories/meta.C2.parquet") + assert meta_C2["kind"].iloc[1] == "null" + assert meta_C2["num_observed"].iloc[1] == 0 @pytest.mark.parametrize("cpu", _CPU) @@ -693,8 +661,51 @@ def test_categorify_joint_list(cpu): else df_out["Engaging User"].explode().dropna().to_arrow().to_pylist() ) - assert compare_a == [1, 5, 2, 3] - assert compare_e == [2, 3, 1, 4, 1] + assert compare_a == [3, 7, 4, 5] + assert compare_e == [4, 5, 3, 6, 3] + + +@pytest.mark.parametrize("cpu", _CPU) +@pytest.mark.parametrize("split_out", [2, 3]) +@pytest.mark.parametrize("max_size", [0, 6]) +@pytest.mark.parametrize("buckets", [None, 3]) +def test_categorify_split_out(tmpdir, cpu, split_out, max_size, buckets): + # Test that the result of split_out>1 is + # equivalent to that of split_out=1 + df = make_df({"user_id": [1, 2, 3, 4, 6, 8, 5, 3] * 10}) + dataset = nvt.Dataset(df, cpu=cpu) + + kwargs = dict( + max_size=max_size, + num_buckets=buckets, + out_path=str(tmpdir), + ) + check_path = "/".join([str(tmpdir), "categories/unique.user_id.parquet"]) + + workflow_1 = nvt.Workflow(["user_id"] >> ops.Categorify(split_out=1, **kwargs)) + workflow_1.fit(dataset) + cats_1 = dispatch.read_dispatch(fmt="parquet")(check_path) + result_1 = workflow_1.transform(dataset).compute() + + workflow_n = nvt.Workflow(["user_id"] >> ops.Categorify(split_out=split_out, **kwargs)) + workflow_n.fit(dataset) + cats_n = dispatch.read_dispatch(fmt="parquet", collection=True)(check_path).compute( + scheduler="synchronous" + ) + result_n = workflow_n.transform(dataset).compute() + + # Make sure categories are the same + # (Note that pandas may convert int64 to float64, + # instead of nullable Int64) + cats_n["user_id"] = cats_n["user_id"].astype(cats_1["user_id"].dtype) + assert_eq(cats_n, cats_1) + + # Check that transform works + assert_eq(result_n, result_1) + + # Check for tree_width FutureWarning + with pytest.warns(FutureWarning): + nvt.Workflow(["user_id"] >> ops.Categorify(tree_width=8)) def test_categorify_inference(): diff --git a/tests/unit/ops/test_drop_low_cardinality.py b/tests/unit/ops/test_drop_low_cardinality.py index 5da70f754b..94b2c25489 100644 --- a/tests/unit/ops/test_drop_low_cardinality.py +++ b/tests/unit/ops/test_drop_low_cardinality.py @@ -44,6 +44,6 @@ def test_drop_low_cardinality(tmpdir, cpu): assert workflow.output_schema.column_names == ["col2", "col3"] expected = df.drop(["col1"], axis=1) - expected["col2"] = [1, 1, 1, 1, 2] - expected["col3"] = [1, 1, 2, 2, 3] + expected["col2"] = [3, 3, 3, 3, 4] + expected["col3"] = [3, 3, 4, 4, 5] assert_eq(transformed, expected) diff --git a/tests/unit/workflow/test_cpu_workflow.py b/tests/unit/workflow/test_cpu_workflow.py index 8086873111..cb6bc9afa0 100644 --- a/tests/unit/workflow/test_cpu_workflow.py +++ b/tests/unit/workflow/test_cpu_workflow.py @@ -56,13 +56,13 @@ def get_norms(tar: pd.Series): cats_expected0 = df["name-cat"].unique() cats0 = get_cats(workflow, "name-cat", cpu=True) # adding the None entry as a string because of move from gpu - assert all(cat in [None] + sorted(cats_expected0.tolist()) for cat in cats0.tolist()) - assert len(cats0.tolist()) == len(cats_expected0.tolist() + [None]) + assert all(cat in 
sorted(cats_expected0.tolist()) for cat in cats0.tolist()) + assert len(cats0.tolist()) == len(cats_expected0.tolist()) cats_expected1 = df["name-string"].unique() cats1 = get_cats(workflow, "name-string", cpu=True) # adding the None entry as a string because of move from gpu - assert all(cat in [None] + sorted(cats_expected1.tolist()) for cat in cats1.tolist()) - assert len(cats1.tolist()) == len(cats_expected1.tolist() + [None]) + assert all(cat in sorted(cats_expected1.tolist()) for cat in cats1.tolist()) + assert len(cats1.tolist()) == len(cats_expected1.tolist()) # Write to new "shuffled" and "processed" dataset workflow.transform(dataset).to_parquet( diff --git a/tests/unit/workflow/test_workflow.py b/tests/unit/workflow/test_workflow.py index 621719f737..ff6b57a410 100755 --- a/tests/unit/workflow/test_workflow.py +++ b/tests/unit/workflow/test_workflow.py @@ -138,8 +138,8 @@ def get_norms(tar): cats_expected0 = df["name-cat"].unique().values_host if HAS_GPU else df["name-cat"].unique() cats0 = get_cats(workflow, "name-cat") # adding the None entry as a string because of move from gpu - assert all(cat in [None] + sorted(cats_expected0.tolist()) for cat in cats0.tolist()) - assert len(cats0.tolist()) == len(cats_expected0.tolist() + [None]) + assert all(cat in sorted(cats_expected0.tolist()) for cat in cats0.tolist()) + assert len(cats0.tolist()) == len(cats_expected0.tolist()) if HAS_GPU: cats_expected1 = ( df["name-string"].unique().values_host if HAS_GPU else df["name-string"].unique() @@ -148,8 +148,8 @@ def get_norms(tar): cats_expected1 = df["name-string"].unique() cats1 = get_cats(workflow, "name-string") # adding the None entry as a string because of move from gpu - assert all(cat in [None] + sorted(cats_expected1.tolist()) for cat in cats1.tolist()) - assert len(cats1.tolist()) == len(cats_expected1.tolist() + [None]) + assert all(cat in sorted(cats_expected1.tolist()) for cat in cats1.tolist()) + assert len(cats1.tolist()) == len(cats_expected1.tolist()) # Write to new "shuffled" and "processed" dataset workflow.transform(dataset).to_parquet( @@ -233,15 +233,15 @@ def get_norms(tar): cats_expected0 = df["name-cat"].unique().values_host if HAS_GPU else df["name-cat"].unique() cats0 = get_cats(workflow, "name-cat") # adding the None entry as a string because of move from gpu - assert all(cat in [None] + sorted(cats_expected0.tolist()) for cat in cats0.tolist()) - assert len(cats0.tolist()) == len(cats_expected0.tolist() + [None]) + assert all(cat in sorted(cats_expected0.tolist()) for cat in cats0.tolist()) + assert len(cats0.tolist()) == len(cats_expected0.tolist()) cats_expected1 = ( df["name-string"].unique().values_host if HAS_GPU else df["name-string"].unique() ) cats1 = get_cats(workflow, "name-string") # adding the None entry as a string because of move from gpu - assert all(cat in [None] + sorted(cats_expected1.tolist()) for cat in cats1.tolist()) - assert len(cats1.tolist()) == len(cats_expected1.tolist() + [None]) + assert all(cat in sorted(cats_expected1.tolist()) for cat in cats1.tolist()) + assert len(cats1.tolist()) == len(cats_expected1.tolist()) # Write to new "shuffled" and "processed" dataset workflow.transform(dataset).to_parquet( @@ -314,15 +314,15 @@ def get_norms(tar): cats_expected0 = df["name-cat"].unique().values_host if HAS_GPU else df["name-cat"].unique() cats0 = get_cats(workflow, "name-cat") # adding the None entry as a string because of move from gpu - assert all(cat in [None] + sorted(cats_expected0.tolist()) for cat in cats0.tolist()) - assert 
len(cats0.tolist()) == len(cats_expected0.tolist() + [None]) + assert all(cat in sorted(cats_expected0.tolist()) for cat in cats0.tolist()) + assert len(cats0.tolist()) == len(cats_expected0.tolist()) cats_expected1 = ( df["name-string"].unique().values_host if HAS_GPU else df["name-string"].unique() ) cats1 = get_cats(workflow, "name-string") # adding the None entry as a string because of move from gpu - assert all(cat in [None] + sorted(cats_expected1.tolist()) for cat in cats1.tolist()) - assert len(cats1.tolist()) == len(cats_expected1.tolist() + [None]) + assert all(cat in sorted(cats_expected1.tolist()) for cat in cats1.tolist()) + assert len(cats1.tolist()) == len(cats_expected1.tolist()) # Write to new "shuffled" and "processed" dataset workflow.transform(dataset).to_parquet(