From 0d26d977746b22157458c151f75e587b2a1df622 Mon Sep 17 00:00:00 2001
From: Julio
Date: Fri, 9 Jun 2023 02:46:06 -0400
Subject: [PATCH 1/4] working subgraphs in workflow

---
 nvtabular/workflow/workflow.py                |   8 ++
 .../unit/workflow/test_workflow_subgraphs.py  | 129 ++++++++++++++++++
 2 files changed, 137 insertions(+)
 create mode 100644 tests/unit/workflow/test_workflow_subgraphs.py

diff --git a/nvtabular/workflow/workflow.py b/nvtabular/workflow/workflow.py
index 909c6bad05..38c4815952 100755
--- a/nvtabular/workflow/workflow.py
+++ b/nvtabular/workflow/workflow.py
@@ -142,6 +142,10 @@ def fit_schema(self, input_schema: Schema):
         self.graph.construct_schema(input_schema)
         return self
 
+    @property
+    def subgraphs(self):
+        return self.graph.subgraphs.keys()
+
     @property
     def input_dtypes(self):
         return self.graph.input_dtypes
@@ -165,6 +169,10 @@ def output_node(self):
     def _input_columns(self):
         return self.graph._input_columns()
 
+    def get_subgraph(self, subgraph_name):
+        subgraph = self.graph.subgraph(subgraph_name)
+        return Workflow(subgraph.output_node)
+
     def remove_inputs(self, input_cols) -> "Workflow":
         """Removes input columns from the workflow.
 
diff --git a/tests/unit/workflow/test_workflow_subgraphs.py b/tests/unit/workflow/test_workflow_subgraphs.py
new file mode 100644
index 0000000000..d1521dc211
--- /dev/null
+++ b/tests/unit/workflow/test_workflow_subgraphs.py
@@ -0,0 +1,129 @@
+#
+# Copyright (c) 2023, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import glob
+import math
+import os
+
+import numpy as np
+import pytest
+from pandas.api.types import is_integer_dtype
+
+import nvtabular as nvt
+from merlin.core import dispatch
+from merlin.core.dispatch import HAS_GPU
+from merlin.core.utils import set_dask_client
+from merlin.dag.ops.subgraph import Subgraph
+from nvtabular import Dataset, Workflow, ops
+from tests.conftest import get_cats
+
+
+@pytest.mark.parametrize("gpu_memory_frac", [0.01, 0.1])
+@pytest.mark.parametrize("engine", ["parquet", "csv", "csv-no-header"])
+@pytest.mark.parametrize("dump", [True, False])
+@pytest.mark.parametrize("replace", [True, False])
+def test_workflow_subgraphs(tmpdir, client, df, dataset, gpu_memory_frac, engine, dump, replace):
+    cat_names = ["name-cat", "name-string"] if engine == "parquet" else ["name-string"]
+    cont_names = ["x", "y", "id"]
+    label_name = ["label"]
+
+    norms = ops.Normalize()
+    cat_features = cat_names >> ops.Categorify()
+    if replace:
+        cont_features = cont_names >> ops.FillMissing() >> ops.LogOp >> norms
+    else:
+        fillmissing_logop = (
+            cont_names
+            >> ops.FillMissing()
+            >> ops.LogOp
+            >> ops.Rename(postfix="_FillMissing_1_LogOp_1")
+        )
+        cont_features = cont_names + fillmissing_logop >> norms
+
+    set_dask_client(client=client)
+    wkflow_ops = Subgraph("cat_graph", cat_features) + Subgraph("cont_graph", cont_features)
+    workflow = Workflow(wkflow_ops + label_name)
+
+    workflow.fit(dataset)
+
+    if dump:
+        workflow_dir = os.path.join(tmpdir, "workflow")
+        workflow.save(workflow_dir)
+        workflow = None
+
+        workflow = Workflow.load(workflow_dir)
+
+    def get_norms(tar):
+        ser_median = tar.dropna().quantile(0.5, interpolation="linear")
+        gdf = tar.fillna(ser_median)
+        gdf = np.log(gdf + 1)
+        return gdf
+
+    # Check mean and std - No good right now we have to add all other changes; Clip, Log
+
+    concat_ops = "_FillMissing_1_LogOp_1"
+    if replace:
+        concat_ops = ""
+    assert math.isclose(get_norms(df.x).mean(), norms.means["x" + concat_ops], rel_tol=1e-1)
+    assert math.isclose(get_norms(df.y).mean(), norms.means["y" + concat_ops], rel_tol=1e-1)
+
+    assert math.isclose(get_norms(df.x).std(), norms.stds["x" + concat_ops], rel_tol=1e-1)
+    assert math.isclose(get_norms(df.y).std(), norms.stds["y" + concat_ops], rel_tol=1e-1)
+    # Check that categories match
+    if engine == "parquet":
+        cats_expected0 = df["name-cat"].unique().values_host if HAS_GPU else df["name-cat"].unique()
+        cats0 = get_cats(workflow, "name-cat")
+        # adding the None entry as a string because of move from gpu
+        assert all(cat in sorted(cats_expected0.tolist()) for cat in cats0.tolist())
+        assert len(cats0.tolist()) == len(cats_expected0.tolist())
+    cats_expected1 = (
+        df["name-string"].unique().values_host if HAS_GPU else df["name-string"].unique()
+    )
+    cats1 = get_cats(workflow, "name-string")
+    # adding the None entry as a string because of move from gpu
+    assert all(cat in sorted(cats_expected1.tolist()) for cat in cats1.tolist())
+    assert len(cats1.tolist()) == len(cats_expected1.tolist())
+
+    # Write to new "shuffled" and "processed" dataset
+    workflow.transform(dataset).to_parquet(
+        tmpdir,
+        out_files_per_proc=10,
+        shuffle=nvt.io.Shuffle.PER_PARTITION,
+    )
+
+    dataset_2 = Dataset(glob.glob(str(tmpdir) + "/*.parquet"), part_mem_fraction=gpu_memory_frac)
+
+    df_pp = dispatch.concat(list(dataset_2.to_iter()), axis=0)
+
+    if engine == "parquet":
+        assert is_integer_dtype(df_pp["name-cat"].dtype)
+        assert is_integer_dtype(df_pp["name-string"].dtype)
+
+    num_rows, num_row_groups, col_names = dispatch.read_parquet_metadata(str(tmpdir) + "/_metadata")
+    assert num_rows == len(df_pp)
+
+    subgraph_cat = workflow.get_subgraph("cat_graph")
+    subgraph_cont = workflow.get_subgraph("cont_graph")
+    assert isinstance(subgraph_cat, Workflow)
+    assert isinstance(subgraph_cont, Workflow)
+    # will not be the same nodes of saved out and loaded back
+    if not dump:
+        assert subgraph_cat.output_node == cat_features
+        assert subgraph_cont.output_node == cont_features
+    # check failure path works as expected
+    with pytest.raises(ValueError) as exc:
+        workflow.get_subgraph("not_exist")
+    assert "No subgraph named" in str(exc.value)
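
A minimal usage sketch of the subgraph API this first patch introduces. It is
illustrative only: the column names and dataset path are hypothetical, while
the Subgraph constructor, Workflow.subgraphs, and Workflow.get_subgraph calls
mirror the diff above.

    # Name each feature branch by wrapping it in a Subgraph.
    from merlin.dag.ops.subgraph import Subgraph
    from nvtabular import Dataset, Workflow, ops

    cat_graph = Subgraph("cat_graph", ["user_id", "item_id"] >> ops.Categorify())
    cont_graph = Subgraph("cont_graph", ["price"] >> ops.FillMissing() >> ops.Normalize())

    workflow = Workflow(cat_graph + cont_graph)
    workflow.fit(Dataset("train/*.parquet"))  # hypothetical input files

    print(workflow.subgraphs)  # names of the registered subgraphs
    cat_workflow = workflow.get_subgraph("cat_graph")  # a standalone Workflow
    # An unknown name raises ValueError ("No subgraph named ..."), per the test.

Naming each branch at construction time is what lets the fitted workflow hand
a branch back later as its own Workflow.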
From 96cd7c04203d06216b22d5e4bd9e20df56b2803f Mon Sep 17 00:00:00 2001
From: Julio
Date: Fri, 9 Jun 2023 11:04:42 -0400
Subject: [PATCH 2/4] add in subgraph transform to compare

---
 .../unit/workflow/test_workflow_subgraphs.py | 29 ++++++++-----------
 1 file changed, 12 insertions(+), 17 deletions(-)

diff --git a/tests/unit/workflow/test_workflow_subgraphs.py b/tests/unit/workflow/test_workflow_subgraphs.py
index d1521dc211..72fae8ef47 100644
--- a/tests/unit/workflow/test_workflow_subgraphs.py
+++ b/tests/unit/workflow/test_workflow_subgraphs.py
@@ -14,7 +14,6 @@
 # limitations under the License.
 #
 
-import glob
 import math
 import os
 
@@ -22,13 +21,11 @@
 import pytest
 from pandas.api.types import is_integer_dtype
 
-import nvtabular as nvt
-from merlin.core import dispatch
 from merlin.core.dispatch import HAS_GPU
 from merlin.core.utils import set_dask_client
 from merlin.dag.ops.subgraph import Subgraph
-from nvtabular import Dataset, Workflow, ops
-from tests.conftest import get_cats
+from nvtabular import Workflow, ops
+from tests.conftest import assert_eq, get_cats
 
 
 @pytest.mark.parametrize("gpu_memory_frac", [0.01, 0.1])
@@ -98,23 +95,12 @@ def get_norms(tar):
     assert len(cats1.tolist()) == len(cats_expected1.tolist())
 
     # Write to new "shuffled" and "processed" dataset
-    workflow.transform(dataset).to_parquet(
-        tmpdir,
-        out_files_per_proc=10,
-        shuffle=nvt.io.Shuffle.PER_PARTITION,
-    )
-
-    dataset_2 = Dataset(glob.glob(str(tmpdir) + "/*.parquet"), part_mem_fraction=gpu_memory_frac)
-
-    df_pp = dispatch.concat(list(dataset_2.to_iter()), axis=0)
+    df_pp = workflow.transform(dataset).to_ddf().compute()
 
     if engine == "parquet":
         assert is_integer_dtype(df_pp["name-cat"].dtype)
         assert is_integer_dtype(df_pp["name-string"].dtype)
 
-    num_rows, num_row_groups, col_names = dispatch.read_parquet_metadata(str(tmpdir) + "/_metadata")
-    assert num_rows == len(df_pp)
-
     subgraph_cat = workflow.get_subgraph("cat_graph")
     subgraph_cont = workflow.get_subgraph("cont_graph")
     assert isinstance(subgraph_cat, Workflow)
@@ -127,3 +113,12 @@ def get_norms(tar):
     with pytest.raises(ValueError) as exc:
         workflow.get_subgraph("not_exist")
     assert "No subgraph named" in str(exc.value)
+
+    sub_cat_df = subgraph_cat.transform(dataset).to_ddf().compute()
+    assert assert_eq(sub_cat_df.reset_index(drop=True), df_pp[cat_names].reset_index(drop=True))
+
+    cont_names = [name + concat_ops for name in cont_names]
+    sub_cont_df = subgraph_cont.transform(dataset).to_ddf().compute()
+    assert assert_eq(
+        sub_cont_df[cont_names].reset_index(drop=True), df_pp[cont_names].reset_index(drop=True)
+    )

From dcc59ba0bfe80229a054cb01177ebe0340ae84fe Mon Sep 17 00:00:00 2001
From: Julio
Date: Fri, 9 Jun 2023 12:04:07 -0400
Subject: [PATCH 3/4] change to subworkflows in workflow instead of subgraphs

---
 nvtabular/workflow/workflow.py               |  6 ++--
 .../unit/workflow/test_workflow_subgraphs.py | 29 +++-----------------
 2 files changed, 7 insertions(+), 28 deletions(-)

diff --git a/nvtabular/workflow/workflow.py b/nvtabular/workflow/workflow.py
index 38c4815952..9c5e944a37 100755
--- a/nvtabular/workflow/workflow.py
+++ b/nvtabular/workflow/workflow.py
@@ -143,8 +143,8 @@ def fit_schema(self, input_schema: Schema):
         return self
 
     @property
-    def subgraphs(self):
-        return self.graph.subgraphs.keys()
+    def subworkflows(self):
+        return list(self.graph.subgraphs.keys())
 
     @property
     def input_dtypes(self):
@@ -169,7 +169,7 @@ def output_node(self):
     def _input_columns(self):
         return self.graph._input_columns()
 
-    def get_subgraph(self, subgraph_name):
+    def get_subworkflow(self, subgraph_name):
         subgraph = self.graph.subgraph(subgraph_name)
         return Workflow(subgraph.output_node)
 
diff --git a/tests/unit/workflow/test_workflow_subgraphs.py b/tests/unit/workflow/test_workflow_subgraphs.py
index 72fae8ef47..40e169e997 100644
--- a/tests/unit/workflow/test_workflow_subgraphs.py
+++ b/tests/unit/workflow/test_workflow_subgraphs.py
@@ -14,18 +14,16 @@
 # limitations under the License.
 #
 
-import math
 import os
 
 import numpy as np
 import pytest
 from pandas.api.types import is_integer_dtype
 
-from merlin.core.dispatch import HAS_GPU
 from merlin.core.utils import set_dask_client
 from merlin.dag.ops.subgraph import Subgraph
 from nvtabular import Workflow, ops
-from tests.conftest import assert_eq, get_cats
+from tests.conftest import assert_eq
 
 
 @pytest.mark.parametrize("gpu_memory_frac", [0.01, 0.1])
@@ -74,25 +72,6 @@ def get_norms(tar):
     concat_ops = "_FillMissing_1_LogOp_1"
     if replace:
         concat_ops = ""
-    assert math.isclose(get_norms(df.x).mean(), norms.means["x" + concat_ops], rel_tol=1e-1)
-    assert math.isclose(get_norms(df.y).mean(), norms.means["y" + concat_ops], rel_tol=1e-1)
-
-    assert math.isclose(get_norms(df.x).std(), norms.stds["x" + concat_ops], rel_tol=1e-1)
-    assert math.isclose(get_norms(df.y).std(), norms.stds["y" + concat_ops], rel_tol=1e-1)
-    # Check that categories match
-    if engine == "parquet":
-        cats_expected0 = df["name-cat"].unique().values_host if HAS_GPU else df["name-cat"].unique()
-        cats0 = get_cats(workflow, "name-cat")
-        # adding the None entry as a string because of move from gpu
-        assert all(cat in sorted(cats_expected0.tolist()) for cat in cats0.tolist())
-        assert len(cats0.tolist()) == len(cats_expected0.tolist())
-    cats_expected1 = (
-        df["name-string"].unique().values_host if HAS_GPU else df["name-string"].unique()
-    )
-    cats1 = get_cats(workflow, "name-string")
-    # adding the None entry as a string because of move from gpu
-    assert all(cat in sorted(cats_expected1.tolist()) for cat in cats1.tolist())
-    assert len(cats1.tolist()) == len(cats_expected1.tolist())
 
     # Write to new "shuffled" and "processed" dataset
     df_pp = workflow.transform(dataset).to_ddf().compute()
@@ -101,8 +80,8 @@ def get_norms(tar):
     assert is_integer_dtype(df_pp["name-cat"].dtype)
     assert is_integer_dtype(df_pp["name-string"].dtype)
 
-    subgraph_cat = workflow.get_subgraph("cat_graph")
-    subgraph_cont = workflow.get_subgraph("cont_graph")
+    subgraph_cat = workflow.get_subworkflow("cat_graph")
+    subgraph_cont = workflow.get_subworkflow("cont_graph")
     assert isinstance(subgraph_cat, Workflow)
     assert isinstance(subgraph_cont, Workflow)
     # will not be the same nodes of saved out and loaded back
@@ -111,7 +90,7 @@ def get_norms(tar):
     # check failure path works as expected
     with pytest.raises(ValueError) as exc:
-        workflow.get_subgraph("not_exist")
+        workflow.get_subworkflow("not_exist")
     assert "No subgraph named" in str(exc.value)
 
     sub_cat_df = subgraph_cat.transform(dataset).to_ddf().compute()
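
With the rename in place, the earlier sketch uses the subworkflow spelling,
and the extracted Workflow can run a single branch on its own. Again the
column names are hypothetical; the calls match the renamed API and the test
above.

    workflow.fit(dataset)
    print(workflow.subworkflows)  # now a plain list of subgraph names

    # Pull out one named branch and run it independently, e.g. to encode
    # only the categorical columns at serving time.
    cat_workflow = workflow.get_subworkflow("cat_graph")
    cat_df = cat_workflow.transform(dataset).to_ddf().compute()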
From aa2dafcdb3561de594613e24f858baf122434c69 Mon Sep 17 00:00:00 2001
From: Julio
Date: Fri, 9 Jun 2023 12:33:25 -0400
Subject: [PATCH 4/4] remove reset_index calls in asserts of test

---
 tests/unit/workflow/test_workflow_subgraphs.py | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/tests/unit/workflow/test_workflow_subgraphs.py b/tests/unit/workflow/test_workflow_subgraphs.py
index 40e169e997..047edac246 100644
--- a/tests/unit/workflow/test_workflow_subgraphs.py
+++ b/tests/unit/workflow/test_workflow_subgraphs.py
@@ -67,13 +67,10 @@ def get_norms(tar):
         gdf = np.log(gdf + 1)
         return gdf
 
-    # Check mean and std - No good right now we have to add all other changes; Clip, Log
-
     concat_ops = "_FillMissing_1_LogOp_1"
     if replace:
         concat_ops = ""
 
-    # Write to new "shuffled" and "processed" dataset
     df_pp = workflow.transform(dataset).to_ddf().compute()
 
     if engine == "parquet":
@@ -93,11 +90,10 @@ def get_norms(tar):
         workflow.get_subworkflow("not_exist")
     assert "No subgraph named" in str(exc.value)
 
+    # test transform results from subgraph
     sub_cat_df = subgraph_cat.transform(dataset).to_ddf().compute()
-    assert assert_eq(sub_cat_df.reset_index(drop=True), df_pp[cat_names].reset_index(drop=True))
+    assert_eq(sub_cat_df, df_pp[cat_names])
 
     cont_names = [name + concat_ops for name in cont_names]
     sub_cont_df = subgraph_cont.transform(dataset).to_ddf().compute()
-    assert assert_eq(
-        sub_cont_df[cont_names].reset_index(drop=True), df_pp[cont_names].reset_index(drop=True)
-    )
+    assert_eq(sub_cont_df[cont_names], df_pp[cont_names])