How-to for embedding op with Categorify from scratch #1827

Merged · 7 commits · Jun 8, 2023
Changes from 3 commits
56 changes: 55 additions & 1 deletion tests/unit/workflow/test_workflow.py
@@ -27,9 +27,11 @@
import nvtabular as nvt
from merlin.core import dispatch
from merlin.core.compat import cudf, dask_cudf
from merlin.core.dispatch import HAS_GPU, make_df
from merlin.core.dispatch import HAS_GPU, create_multihot_col, make_df
from merlin.core.utils import set_dask_client
from merlin.dag import ColumnSelector, postorder_iter_nodes
from merlin.dataloader.loader_base import LoaderBase as Loader
from merlin.dataloader.ops.embeddings import EmbeddingOperator
from merlin.schema import Tags
from nvtabular import Dataset, Workflow, ops
from tests.conftest import assert_eq, get_cats, mycols_csv
@@ -737,3 +739,55 @@ def test_workflow_auto_infer_modules_byvalue(tmp_path):
    os.unlink(str(tmp_path / "not_a_real_module.py"))

    Workflow.load(str(tmp_path / "identity-workflow"))


@pytest.mark.parametrize("cpu", [None, "cpu"] if HAS_GPU else ["cpu"])
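# exercise both devices when a GPU is available; otherwise run on CPU only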
def test_embedding_cat_export_import(tmpdir, cpu):
    string_ids = ["alpha", "bravo", "charlie", "delta", "foxtrot"]
    training_data = make_df(
        {
            "string_id": string_ids,
        }
    )
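    # offsets [0, 5, ..., 25] slice the 25 random values into five 5-dim vectors, one per id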
training_data["embeddings"] = create_multihot_col([0, 5, 10, 15, 20, 25], np.random.rand(25))

    cat_op = nvt.ops.Categorify()

    # first workflow that categorifies all data
    graph1 = ["string_id"] >> cat_op
    emb_res = Workflow(graph1 + ["embeddings"]).fit_transform(
        Dataset(training_data, cpu=(cpu is not None))
    )
    npy_path = str(tmpdir / "embeddings.npy")
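    # the exported table holds one row per category: column 0 is the encoded id,
    # the remaining columns are that id's embedding vector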
    emb_res.to_npy(npy_path)

    embeddings = np.load(npy_path)
    # second workflow that categorifies new lookup data with the vocabulary fitted above
    df = make_df({"string_id": np.random.choice(string_ids, 30)})
    graph2 = ["string_id"] >> cat_op
    train_res = Workflow(graph2).transform(Dataset(df, cpu=(cpu is not None)))

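    # the dataloader looks up each categorified string_id in the exported table
    # and attaches the matching embedding row to every batch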
    data_loader = Loader(
        train_res,
        batch_size=1,
        transforms=[
            EmbeddingOperator(
                embeddings[:, 1:],
                id_lookup_table=embeddings[:, 0].astype(int),
                lookup_key="string_id",
            )
        ],
        shuffle=False,
        device=cpu,
    )
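    # rebuild the expected rows by joining the transformed ids back onto the fitted embeddings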
    origin_df = train_res.to_ddf().merge(emb_res.to_ddf(), on="string_id", how="left").compute()
    for idx, batch in enumerate(data_loader):
        b_df = batch[0].to_df()
        org_df = origin_df.iloc[idx]
        if not cpu:
            assert (b_df["string_id"].to_numpy() == org_df["string_id"].to_numpy()).all()
            assert (b_df["embeddings"].list.leaves == org_df["embeddings"].list.leaves).all()
        else:
            assert (b_df["string_id"].values == org_df["string_id"]).all()
            assert b_df["embeddings"].values[0] == org_df["embeddings"].tolist()