diff --git a/nvtabular/ops/categorify.py b/nvtabular/ops/categorify.py
index 7fbfa1de774..556e2a005a9 100644
--- a/nvtabular/ops/categorify.py
+++ b/nvtabular/ops/categorify.py
@@ -1693,7 +1693,7 @@ def _encode(
         expr = df[selection_l.names[0]].isna()
         for _name in selection_l.names[1:]:
             expr = expr & df[_name].isna()
-        nulls = df[expr].index
+        nulls = df[expr].index.values
 
     if use_collection or not search_sorted:
         if list_col:
@@ -1861,12 +1861,24 @@ def _copy_storage(existing_stats, existing_path, new_path, copy):
     existing_fs = get_fs_token_paths(existing_path)[0]
     new_fs = get_fs_token_paths(new_path)[0]
     new_locations = {}
+
     for column, existing_file in existing_stats.items():
         new_file = existing_file.replace(str(existing_path), str(new_path))
         if copy and new_file != existing_file:
             new_fs.makedirs(os.path.dirname(new_file), exist_ok=True)
-            with new_fs.open(new_file, "wb") as output:
-                output.write(existing_fs.open(existing_file, "rb").read())
+
+            # For some ops, the existing "file" is a directory containing `part.N.parquet` files.
+            # In that case, new_file is actually a directory and we will iterate through the "part"
+            # files and copy them individually
+            if os.path.isdir(existing_file):
+                new_fs.makedirs(new_file, exist_ok=True)
+                for existing_file_part in existing_fs.ls(existing_file):
+                    new_file_part = os.path.join(new_file, os.path.basename(existing_file_part))
+                    with new_fs.open(new_file_part, "wb") as output:
+                        output.write(existing_fs.open(existing_file_part, "rb").read())
+            else:
+                with new_fs.open(new_file, "wb") as output:
+                    output.write(existing_fs.open(existing_file, "rb").read())
 
         new_locations[column] = new_file
 
diff --git a/nvtabular/workflow/workflow.py b/nvtabular/workflow/workflow.py
index 41f8b104778..9c5e944a37c 100755
--- a/nvtabular/workflow/workflow.py
+++ b/nvtabular/workflow/workflow.py
@@ -17,12 +17,13 @@
 import inspect
 import json
 import logging
+import os
 import sys
 import time
 import types
 import warnings
 from functools import singledispatchmethod
-from typing import TYPE_CHECKING, Optional
+from typing import TYPE_CHECKING, Optional, Union
 
 import cloudpickle
 import fsspec
@@ -141,6 +142,10 @@ def fit_schema(self, input_schema: Schema):
         self.graph.construct_schema(input_schema)
         return self
 
+    @property
+    def subworkflows(self):
+        return list(self.graph.subgraphs.keys())
+
     @property
     def input_dtypes(self):
         return self.graph.input_dtypes
@@ -164,6 +169,10 @@ def output_node(self):
     def _input_columns(self):
         return self.graph._input_columns()
 
+    def get_subworkflow(self, subgraph_name):
+        subgraph = self.graph.subgraph(subgraph_name)
+        return Workflow(subgraph.output_node)
+
     def remove_inputs(self, input_cols) -> "Workflow":
         """Removes input columns from the workflow.
 
@@ -295,12 +304,12 @@ def _getmodules(cls, fs):
 
         return [mod for mod in result if mod.__name__ not in exclusions]
 
-    def save(self, path, modules_byvalue=None):
+    def save(self, path: Union[str, os.PathLike], modules_byvalue=None):
         """Save this workflow to disk
 
         Parameters
         ----------
-        path: str
+        path: Union[str, os.PathLike]
             The path to save the workflow to
         modules_byvalue:
             A list of modules that should be serialized by value. This
@@ -314,6 +323,8 @@ def save(self, path, modules_byvalue=None):
         # avoid a circular import getting the version
         from nvtabular import __version__ as nvt_version
 
+        path = str(path)
+
         fs = fsspec.get_fs_token_paths(path)[0]
 
         fs.makedirs(path, exist_ok=True)
@@ -385,12 +396,12 @@ def save(self, path, modules_byvalue=None):
                    cloudpickle.unregister_pickle_by_value(sys.modules[m])
 
     @classmethod
-    def load(cls, path, client=None) -> "Workflow":
+    def load(cls, path: Union[str, os.PathLike], client=None) -> "Workflow":
         """Load up a saved workflow object from disk
 
         Parameters
         ----------
-        path: str
+        path: Union[str, os.PathLike]
             The path to load the workflow from
         client: distributed.Client, optional
             The Dask distributed client to use for multi-gpu processing and multi-node processing
@@ -403,6 +414,8 @@ def load(cls, path, client=None) -> "Workflow":
         # avoid a circular import getting the version
         from nvtabular import __version__ as nvt_version
 
+        path = str(path)
+
         fs = fsspec.get_fs_token_paths(path)[0]
 
         # check version information from the metadata blob, and warn if we have a mismatch
diff --git a/tests/unit/framework_utils/test_tf_layers.py b/tests/unit/framework_utils/test_tf_layers.py
index 106be0fa457..38e2778cab7 100644
--- a/tests/unit/framework_utils/test_tf_layers.py
+++ b/tests/unit/framework_utils/test_tf_layers.py
@@ -318,4 +318,4 @@ def test_multihot_empty_rows():
     )
     y_hat = model(x).numpy()
 
-    np.testing.assert_allclose(y_hat, multi_hot_embedding_rows, rtol=1e-06)
+    np.testing.assert_allclose(y_hat, multi_hot_embedding_rows, rtol=1e-05)
diff --git a/tests/unit/workflow/test_workflow.py b/tests/unit/workflow/test_workflow.py
index ff6b57a4103..77009da1354 100755
--- a/tests/unit/workflow/test_workflow.py
+++ b/tests/unit/workflow/test_workflow.py
@@ -27,9 +27,11 @@
 import nvtabular as nvt
 from merlin.core import dispatch
 from merlin.core.compat import cudf, dask_cudf
-from merlin.core.dispatch import HAS_GPU, make_df
+from merlin.core.dispatch import HAS_GPU, create_multihot_col, make_df, make_series
 from merlin.core.utils import set_dask_client
 from merlin.dag import ColumnSelector, postorder_iter_nodes
+from merlin.dataloader.loader_base import LoaderBase as Loader
+from merlin.dataloader.ops.embeddings import EmbeddingOperator
 from merlin.schema import Tags
 from nvtabular import Dataset, Workflow, ops
 from tests.conftest import assert_eq, get_cats, mycols_csv
@@ -671,6 +673,43 @@ def test_workflow_saved_schema(tmpdir):
         assert node.output_schema is not None
 
 
+def test_stat_op_workflow_roundtrip(tmpdir):
+    """
+    Categorify and TargetEncoding produce intermediate stats files that must be properly
+    saved and re-loaded.
+    """
+    N = 100
+
+    df = Dataset(
+        make_df(
+            {
+                "a": np.random.randint(0, 100000, N),
+                "item_id": np.random.randint(0, 100, N),
+                "user_id": np.random.randint(0, 100, N),
+                "click": np.random.randint(0, 2, N),
+            }
+        ),
+    )
+
+    outputs = ["a"] >> nvt.ops.Categorify()
+
+    continuous = (
+        ["user_id", "item_id"]
+        >> nvt.ops.TargetEncoding(["click"], kfold=1, p_smooth=20)
+        >> nvt.ops.Normalize()
+    )
+    outputs += continuous
+    wf = nvt.Workflow(outputs)
+
+    wf.fit(df)
+    expected = wf.transform(df).compute()
+    wf.save(tmpdir)
+
+    wf2 = nvt.Workflow.load(tmpdir)
+    transformed = wf2.transform(df).compute()
+    assert_eq(transformed, expected)
+
+
 def test_workflow_infer_modules_byvalue(tmp_path):
     module_fn = tmp_path / "not_a_real_module.py"
     sys.path.append(str(tmp_path))
@@ -737,3 +776,57 @@ def test_workflow_auto_infer_modules_byvalue(tmp_path):
     os.unlink(str(tmp_path / "not_a_real_module.py"))
 
     Workflow.load(str(tmp_path / "identity-workflow"))
+
+
+@pytest.mark.parametrize("cpu", [None, "cpu"] if HAS_GPU else ["cpu"])
+def test_embedding_cat_export_import(tmpdir, cpu):
+    string_ids = ["alpha", "bravo", "charlie", "delta", "foxtrot"]
+    training_data = make_df(
+        {
+            "string_id": string_ids,
+        }
+    )
+    training_data["embeddings"] = create_multihot_col(
+        [0, 5, 10, 15, 20, 25], make_series(np.random.rand(25))
+    )
+
+    cat_op = nvt.ops.Categorify()
+
+    # first workflow that categorifies all data
+    graph1 = ["string_id"] >> cat_op
+    emb_res = Workflow(graph1 + ["embeddings"]).fit_transform(
+        Dataset(training_data, cpu=(cpu is not None))
+    )
+    npy_path = str(tmpdir / "embeddings.npy")
+    emb_res.to_npy(npy_path)
+
+    embeddings = np.load(npy_path)
+    # second workflow that categorifies the embedding table data
+    df = make_df({"string_id": np.random.choice(string_ids, 30)})
+    graph2 = ["string_id"] >> cat_op
+    train_res = Workflow(graph2).transform(Dataset(df, cpu=(cpu is not None)))
+
+    data_loader = Loader(
+        train_res,
+        batch_size=1,
+        transforms=[
+            EmbeddingOperator(
+                embeddings[:, 1:],
+                id_lookup_table=embeddings[:, 0].astype(int),
+                lookup_key="string_id",
+            )
+        ],
+        shuffle=False,
+        device=cpu,
+    )
+    origin_df = train_res.to_ddf().merge(emb_res.to_ddf(), on="string_id", how="left").compute()
+    for idx, batch in enumerate(data_loader):
+        batch
+        b_df = batch[0].to_df()
+        org_df = origin_df.iloc[idx]
+        if not cpu:
+            assert (b_df["string_id"].to_numpy() == org_df["string_id"].to_numpy()).all()
+            assert (b_df["embeddings"].list.leaves == org_df["embeddings"].list.leaves).all()
+        else:
+            assert (b_df["string_id"].values == org_df["string_id"]).all()
+            assert b_df["embeddings"].values[0] == org_df["embeddings"].tolist()
diff --git a/tests/unit/workflow/test_workflow_subgraphs.py b/tests/unit/workflow/test_workflow_subgraphs.py
new file mode 100644
index 00000000000..047edac2462
--- /dev/null
+++ b/tests/unit/workflow/test_workflow_subgraphs.py
@@ -0,0 +1,99 @@
+#
+# Copyright (c) 2023, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+
+import numpy as np
+import pytest
+from pandas.api.types import is_integer_dtype
+
+from merlin.core.utils import set_dask_client
+from merlin.dag.ops.subgraph import Subgraph
+from nvtabular import Workflow, ops
+from tests.conftest import assert_eq
+
+
+@pytest.mark.parametrize("gpu_memory_frac", [0.01, 0.1])
+@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
+@pytest.mark.parametrize("dump", [True, False])
+@pytest.mark.parametrize("replace", [True, False])
+def test_workflow_subgraphs(tmpdir, client, df, dataset, gpu_memory_frac, engine, dump, replace):
+    cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"]
+    cont_names = ["x", "y", "id"]
+    label_name = ["label"]
+
+    norms = ops.Normalize()
+    cat_features = cat_names >> ops.Categorify()
+    if replace:
+        cont_features = cont_names >> ops.FillMissing() >> ops.LogOp >> norms
+    else:
+        fillmissing_logop = (
+            cont_names
+            >> ops.FillMissing()
+            >> ops.LogOp
+            >> ops.Rename(postfix="_FillMissing_1_LogOp_1")
+        )
+        cont_features = cont_names + fillmissing_logop >> norms
+
+    set_dask_client(client=client)
+    wkflow_ops = Subgraph("cat_graph", cat_features) + Subgraph("cont_graph", cont_features)
+    workflow = Workflow(wkflow_ops + label_name)
+
+    workflow.fit(dataset)
+
+    if dump:
+        workflow_dir = os.path.join(tmpdir, "workflow")
+        workflow.save(workflow_dir)
+        workflow = None
+
+        workflow = Workflow.load(workflow_dir)
+
+    def get_norms(tar):
+        ser_median = tar.dropna().quantile(0.5, interpolation="linear")
+        gdf = tar.fillna(ser_median)
+        gdf = np.log(gdf + 1)
+        return gdf
+
+    concat_ops = "_FillMissing_1_LogOp_1"
+    if replace:
+        concat_ops = ""
+
+    df_pp = workflow.transform(dataset).to_ddf().compute()
+
+    if engine == "parquet":
+        assert is_integer_dtype(df_pp["name-cat"].dtype)
+    assert is_integer_dtype(df_pp["name-string"].dtype)
+
+    subgraph_cat = workflow.get_subworkflow("cat_graph")
+    subgraph_cont = workflow.get_subworkflow("cont_graph")
+    assert isinstance(subgraph_cat, Workflow)
+    assert isinstance(subgraph_cont, Workflow)
+    # will not be the same nodes of saved out and loaded back
+    if not dump:
+        assert subgraph_cat.output_node == cat_features
+        assert subgraph_cont.output_node == cont_features
+    # check failure path works as expected
+    with pytest.raises(ValueError) as exc:
+        workflow.get_subworkflow("not_exist")
+    assert "No subgraph named" in str(exc.value)
+
+    # test transform results from subgraph
+    sub_cat_df = subgraph_cat.transform(dataset).to_ddf().compute()
+    assert_eq(sub_cat_df, df_pp[cat_names])
+
+    cont_names = [name + concat_ops for name in cont_names]
+    sub_cont_df = subgraph_cont.transform(dataset).to_ddf().compute()
+    assert_eq(sub_cont_df[cont_names], df_pp[cont_names])
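
For context, a minimal end-to-end sketch of the API surface this change introduces: named `Subgraph` branches, the `Workflow.subworkflows` property, `Workflow.get_subworkflow`, and `os.PathLike` support in `save`/`load`. The toy data, column names, and temporary output path below are illustrative assumptions, not taken from the changeset.

import pathlib
import tempfile

import numpy as np

import nvtabular as nvt
from merlin.core.dispatch import make_df
from merlin.dag.ops.subgraph import Subgraph
from nvtabular import Dataset, Workflow

# Toy data; the column names here are illustrative only.
df = make_df(
    {
        "item_id": np.random.randint(0, 100, 1000),
        "price": np.random.rand(1000),
    }
)

# Wrap each branch in a named Subgraph so it can be retrieved later by name.
cats = Subgraph("cat_graph", ["item_id"] >> nvt.ops.Categorify())
conts = Subgraph("cont_graph", ["price"] >> nvt.ops.Normalize())

workflow = Workflow(cats + conts)
workflow.fit(Dataset(df))

# save()/load() now accept os.PathLike objects as well as plain strings.
save_dir = pathlib.Path(tempfile.mkdtemp()) / "example_workflow"
workflow.save(save_dir)
reloaded = Workflow.load(save_dir)

# List the named subgraphs and pull one back out as a standalone Workflow.
print(reloaded.subworkflows)  # e.g. ["cat_graph", "cont_graph"]
cat_workflow = reloaded.get_subworkflow("cat_graph")
print(cat_workflow.transform(Dataset(df)).compute().head())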