From 25151f73f1f388f596a086dc9e271008486c0c88 Mon Sep 17 00:00:00 2001
From: Julio Perez <37191411+jperez999@users.noreply.github.com>
Date: Thu, 8 Jun 2023 15:23:22 -0400
Subject: [PATCH] How to for embedding op with categorify from scratch (#1827)

* embedding op test from start

* setup asserts to verify logic

* use make series to handle cpu-gpu env

---------

Co-authored-by: Karl Higley
Co-authored-by: rnyak <16246900+rnyak@users.noreply.github.com>
---
 tests/unit/workflow/test_workflow.py | 58 +++++++++++++++++++++++++++-
 1 file changed, 57 insertions(+), 1 deletion(-)

diff --git a/tests/unit/workflow/test_workflow.py b/tests/unit/workflow/test_workflow.py
index a5e2688ba7..77009da135 100755
--- a/tests/unit/workflow/test_workflow.py
+++ b/tests/unit/workflow/test_workflow.py
@@ -27,9 +27,11 @@
 import nvtabular as nvt
 from merlin.core import dispatch
 from merlin.core.compat import cudf, dask_cudf
-from merlin.core.dispatch import HAS_GPU, make_df
+from merlin.core.dispatch import HAS_GPU, create_multihot_col, make_df, make_series
 from merlin.core.utils import set_dask_client
 from merlin.dag import ColumnSelector, postorder_iter_nodes
+from merlin.dataloader.loader_base import LoaderBase as Loader
+from merlin.dataloader.ops.embeddings import EmbeddingOperator
 from merlin.schema import Tags
 from nvtabular import Dataset, Workflow, ops
 from tests.conftest import assert_eq, get_cats, mycols_csv
@@ -774,3 +776,57 @@ def test_workflow_auto_infer_modules_byvalue(tmp_path):
         os.unlink(str(tmp_path / "not_a_real_module.py"))
 
     Workflow.load(str(tmp_path / "identity-workflow"))
+
+
+@pytest.mark.parametrize("cpu", [None, "cpu"] if HAS_GPU else ["cpu"])
+def test_embedding_cat_export_import(tmpdir, cpu):
+    string_ids = ["alpha", "bravo", "charlie", "delta", "foxtrot"]
+    training_data = make_df(
+        {
+            "string_id": string_ids,
+        }
+    )
+    training_data["embeddings"] = create_multihot_col(
+        [0, 5, 10, 15, 20, 25], make_series(np.random.rand(25))
+    )
+
+    cat_op = nvt.ops.Categorify()
+
+    # first workflow that categorifies all data
+    graph1 = ["string_id"] >> cat_op
+    emb_res = Workflow(graph1 + ["embeddings"]).fit_transform(
+        Dataset(training_data, cpu=(cpu is not None))
+    )
+    npy_path = str(tmpdir / "embeddings.npy")
+    emb_res.to_npy(npy_path)
+
+    embeddings = np.load(npy_path)
+    # second workflow that categorifies the embedding table data
+    df = make_df({"string_id": np.random.choice(string_ids, 30)})
+    graph2 = ["string_id"] >> cat_op
+    train_res = Workflow(graph2).transform(Dataset(df, cpu=(cpu is not None)))
+
+    data_loader = Loader(
+        train_res,
+        batch_size=1,
+        transforms=[
+            EmbeddingOperator(
+                embeddings[:, 1:],
+                id_lookup_table=embeddings[:, 0].astype(int),
+                lookup_key="string_id",
+            )
+        ],
+        shuffle=False,
+        device=cpu,
+    )
+    origin_df = train_res.to_ddf().merge(emb_res.to_ddf(), on="string_id", how="left").compute()
+    for idx, batch in enumerate(data_loader):
+        batch
+        b_df = batch[0].to_df()
+        org_df = origin_df.iloc[idx]
+        if not cpu:
+            assert (b_df["string_id"].to_numpy() == org_df["string_id"].to_numpy()).all()
+            assert (b_df["embeddings"].list.leaves == org_df["embeddings"].list.leaves).all()
+        else:
+            assert (b_df["string_id"].values == org_df["string_id"]).all()
+            assert b_df["embeddings"].values[0] == org_df["embeddings"].tolist()
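
A note on the multihot column the test builds: create_multihot_col takes flat values plus row offsets, so offsets [0, 5, 10, 15, 20, 25] slice the 25 random floats into five 5-element embedding lists, one per string id. Below is a minimal plain-NumPy sketch of that same slicing; the names offsets, values, and multihot are illustrative, not from the patch.

import numpy as np

offsets = [0, 5, 10, 15, 20, 25]  # row i owns values[offsets[i]:offsets[i + 1]]
values = np.random.rand(25)
multihot = [values[start:stop].tolist() for start, stop in zip(offsets, offsets[1:])]
assert len(multihot) == 5 and all(len(row) == 5 for row in multihot)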
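
The round trip the test verifies hinges on the layout of embeddings.npy: as the slicing embeddings[:, 1:] and embeddings[:, 0] implies, to_npy writes one row per category with the Categorify-encoded id in column 0 and the embedding vector in the remaining columns, and EmbeddingOperator then gathers rows from that table by the ids in each batch. A minimal sketch of the same gather in plain NumPy follows; export, lookup_ids, table, and batch_ids are illustrative names, and it assumes the exported ids come out sorted, as Categorify's contiguous integer encoding suggests.

import numpy as np

ids = np.arange(5).astype(float)  # Categorify-encoded ids, one per category
vectors = np.random.rand(5, 8)    # one 8-dim embedding per category
export = np.hstack([ids[:, None], vectors])  # same [id, embedding...] row layout as embeddings.npy

lookup_ids = export[:, 0].astype(int)  # mirrors id_lookup_table=embeddings[:, 0].astype(int)
table = export[:, 1:]                  # mirrors EmbeddingOperator(embeddings[:, 1:], ...)

batch_ids = np.array([3, 1, 4])        # encoded ids arriving in one dataloader batch
batch_embeddings = table[np.searchsorted(lookup_ids, batch_ids)]
assert batch_embeddings.shape == (3, 8)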