diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py
index e319c363a23..f503ea3f1d1 100644
--- a/python/cudf_polars/cudf_polars/dsl/ir.py
+++ b/python/cudf_polars/cudf_polars/dsl/ir.py
@@ -13,8 +13,8 @@
 from __future__ import annotations

-import dataclasses
 import itertools
+import json
 from functools import cache
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, ClassVar
@@ -27,10 +27,11 @@
 import cudf_polars.dsl.expr as expr
 from cudf_polars.containers import Column, DataFrame
-from cudf_polars.utils import dtypes, sorting
+from cudf_polars.dsl.nodebase import Node
+from cudf_polars.utils import dtypes

 if TYPE_CHECKING:
-    from collections.abc import Callable, MutableMapping
+    from collections.abc import Callable, Hashable, MutableMapping, Sequence
     from typing import Literal

     from cudf_polars.typing import Schema
@@ -121,16 +122,27 @@ def broadcast(*columns: Column, target_length: int | None = None) -> list[Column
     ]


-@dataclasses.dataclass
-class IR:
+class IR(Node):
     """Abstract plan node, representing an unevaluated dataframe."""

+    __slots__ = ("schema",)
+    _non_child: ClassVar[tuple[str, ...]] = ("schema",)
     schema: Schema
     """Mapping from column names to their data types."""
+    children: tuple[IR, ...] = ()

-    def __post_init__(self):
-        """Validate preconditions."""
-        pass  # noqa: PIE790
+    def get_hashable(self) -> Hashable:
+        """
+        Hashable representation of the node, with special handling for the schema.
+
+        Since the schema is a dictionary, even though it is morally
+        immutable, it is not hashable. We therefore convert it to
+        tuples for hashing purposes.
+        """
+        # Schema is the first constructor argument
+        args = self._ctor_arguments(self.children)[1:]
+        schema_hash = tuple(self.schema.items())
+        return (type(self), schema_hash, args)

     def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         """
@@ -159,24 +171,49 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         )  # pragma: no cover


-@dataclasses.dataclass
 class PythonScan(IR):
     """Representation of input from a python function."""

+    __slots__ = ("options", "predicate")
+    _non_child = ("schema", "options", "predicate")
     options: Any
     """Arbitrary options."""
     predicate: expr.NamedExpr | None
     """Filter to apply to the constructed dataframe before returning it."""

-    def __post_init__(self):
-        """Validate preconditions."""
+    def __init__(self, schema: Schema, options: Any, predicate: expr.NamedExpr | None):
+        self.schema = schema
+        self.options = options
+        self.predicate = predicate
         raise NotImplementedError("PythonScan not implemented")


-@dataclasses.dataclass
 class Scan(IR):
     """Input from files."""

+    __slots__ = (
+        "typ",
+        "reader_options",
+        "cloud_options",
+        "paths",
+        "with_columns",
+        "skip_rows",
+        "n_rows",
+        "row_index",
+        "predicate",
+    )
+    _non_child = (
+        "schema",
+        "typ",
+        "reader_options",
+        "cloud_options",
+        "paths",
+        "with_columns",
+        "skip_rows",
+        "n_rows",
+        "row_index",
+        "predicate",
+    )
     typ: str
     """What type of file are we reading? Parquet, CSV, etc..."""
     reader_options: dict[str, Any]
@@ -185,7 +222,7 @@ class Scan(IR):
     """Cloud-related authentication options, currently ignored."""
     paths: list[str]
     """List of paths to read from."""
-    with_columns: list[str]
+    with_columns: list[str] | None
     """Projected columns to return."""
     skip_rows: int
     """Rows to skip at the start when reading."""
@@ -196,9 +233,29 @@ class Scan(IR):
     predicate: expr.NamedExpr | None
     """Mask to apply to the read dataframe."""

-    def __post_init__(self) -> None:
-        """Validate preconditions."""
-        super().__post_init__()
+    def __init__(
+        self,
+        schema: Schema,
+        typ: str,
+        reader_options: dict[str, Any],
+        cloud_options: dict[str, Any] | None,
+        paths: list[str],
+        with_columns: list[str] | None,
+        skip_rows: int,
+        n_rows: int,
+        row_index: tuple[str, int] | None,
+        predicate: expr.NamedExpr | None,
+    ):
+        self.schema = schema
+        self.typ = typ
+        self.reader_options = reader_options
+        self.cloud_options = cloud_options
+        self.paths = paths
+        self.with_columns = with_columns
+        self.skip_rows = skip_rows
+        self.n_rows = n_rows
+        self.row_index = row_index
+        self.predicate = predicate
         if self.typ not in ("csv", "parquet", "ndjson"):  # pragma: no cover
             # This line is unhittable ATM since IPC/Anonymous scan raise
             # on the polars side
@@ -258,6 +315,28 @@ def __post_init__(self) -> None:
                 "Reading only parquet metadata to produce row index."
             )

+    def get_hashable(self) -> Hashable:
+        """
+        Hashable representation of the node.
+
+        The options dictionaries are serialised as json strings for
+        hashing purposes.
+        """
+        schema_hash = tuple(self.schema.items())
+        return (
+            type(self),
+            schema_hash,
+            self.typ,
+            json.dumps(self.reader_options),
+            json.dumps(self.cloud_options),
+            tuple(self.paths),
+            tuple(self.with_columns) if self.with_columns is not None else None,
+            self.skip_rows,
+            self.n_rows,
+            self.row_index,
+            self.predicate,
+        )
+
     def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         """Evaluate and return a dataframe."""
         with_columns = self.with_columns
@@ -401,7 +480,6 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         return df.filter(mask)


-@dataclasses.dataclass
 class Cache(IR):
     """
     Return a cached plan node.
@@ -409,20 +487,28 @@ class Cache(IR):
     Used for CSE at the plan level.
     """

+    __slots__ = ("key", "children")
+    _non_child = ("schema", "key")
+    children: tuple[IR]
     key: int
     """The cache key."""
     value: IR
     """The unevaluated node to cache."""

+    def __init__(self, schema: Schema, key: int, value: IR):
+        self.schema = schema
+        self.key = key
+        self.children = (value,)

     def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         """Evaluate and return a dataframe."""
         try:
             return cache[self.key]
         except KeyError:
-            return cache.setdefault(self.key, self.value.evaluate(cache=cache))
+            (value,) = self.children
+            return cache.setdefault(self.key, value.evaluate(cache=cache))


-@dataclasses.dataclass
 class DataFrameScan(IR):
     """
     Input from an existing polars DataFrame.
@@ -430,13 +516,37 @@ class DataFrameScan(IR):
     This typically arises from ``q.collect().lazy()``
     """

+    __slots__ = ("df", "projection", "predicate")
+    _non_child = ("schema", "df", "projection", "predicate")
     df: Any
     """Polars LazyFrame object."""
-    projection: list[str]
+    projection: tuple[str, ...] | None
     """List of columns to project out."""
     predicate: expr.NamedExpr | None
     """Mask to apply."""

+    def __init__(
+        self,
+        schema: Schema,
+        df: Any,
+        projection: Sequence[str] | None,
+        predicate: expr.NamedExpr | None,
+    ):
+        self.schema = schema
+        self.df = df
+        self.projection = tuple(projection) if projection is not None else None
+        self.predicate = predicate
+
+    def get_hashable(self) -> Hashable:
+        """
+        Hashable representation of the node.
+
+        The (heavy) dataframe object is hashed as its id, so the hash
+        is not stable across runs, nor across distinct but equal dataframes.
+        """
+        schema_hash = tuple(self.schema.items())
+        return (type(self), schema_hash, id(self.df), self.projection, self.predicate)
+
     def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         """Evaluate and return a dataframe."""
         pdf = pl.DataFrame._from_pydf(self.df)
@@ -454,28 +564,42 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         return df


-@dataclasses.dataclass
 class Select(IR):
     """Produce a new dataframe selecting given expressions from an input."""

+    __slots__ = ("exprs", "children", "should_broadcast")
+    _non_child = ("schema", "exprs", "should_broadcast")
+    children: tuple[IR]
     df: IR
     """Input dataframe."""
-    expr: list[expr.NamedExpr]
+    exprs: tuple[expr.NamedExpr, ...]
     """List of expressions to evaluate to form the new dataframe."""
     should_broadcast: bool
     """Should columns be broadcast?"""

+    def __init__(
+        self,
+        schema: Schema,
+        exprs: Sequence[expr.NamedExpr],
+        should_broadcast: bool,  # noqa: FBT001
+        df: IR,
+    ):
+        self.schema = schema
+        self.exprs = tuple(exprs)
+        self.should_broadcast = should_broadcast
+        self.children = (df,)
+
     def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         """Evaluate and return a dataframe."""
-        df = self.df.evaluate(cache=cache)
+        (child,) = self.children
+        df = child.evaluate(cache=cache)
         # Handle any broadcasting
-        columns = [e.evaluate(df) for e in self.expr]
+        columns = [e.evaluate(df) for e in self.exprs]
         if self.should_broadcast:
             columns = broadcast(*columns)
         return DataFrame(columns)


-@dataclasses.dataclass
 class Reduce(IR):
     """
     Produce a new dataframe selecting given expressions from an input.
@@ -483,36 +607,70 @@ class Reduce(IR):
     This is a special case of :class:`Select` where all outputs are a single row.
     """

+    __slots__ = ("exprs", "children")
+    _non_child = ("schema", "exprs")
+
     df: IR
     """Input dataframe."""
-    expr: list[expr.NamedExpr]
+    exprs: tuple[expr.NamedExpr, ...]
"""List of expressions to evaluate to form the new dataframe.""" + def __init__( + self, schema: Schema, exprs: Sequence[expr.NamedExpr], df: IR + ): # pragma: no cover; polars doesn't emit this node yet + self.schema = schema + self.exprs = tuple(exprs) + self.children = (df,) + def evaluate( self, *, cache: MutableMapping[int, DataFrame] ) -> DataFrame: # pragma: no cover; polars doesn't emit this node yet """Evaluate and return a dataframe.""" - df = self.df.evaluate(cache=cache) - columns = broadcast(*(e.evaluate(df) for e in self.expr)) + (child,) = self.children + df = child.evaluate(cache=cache) + columns = broadcast(*(e.evaluate(df) for e in self.exprs)) assert all(column.obj.size() == 1 for column in columns) return DataFrame(columns) -@dataclasses.dataclass class GroupBy(IR): """Perform a groupby.""" - df: IR - """Input dataframe.""" - agg_requests: list[expr.NamedExpr] - """List of expressions to evaluate groupwise.""" - keys: list[expr.NamedExpr] - """List of expressions forming the keys.""" - maintain_order: bool - """Should the order of the input dataframe be maintained?""" - options: Any - """Options controlling style of groupby.""" - agg_infos: list[expr.AggInfo] = dataclasses.field(init=False) + __slots__ = ( + "agg_requests", + "keys", + "maintain_order", + "options", + "agg_infos", + "children", + ) + _non_child = ("schema", "keys", "agg_requests", "maintain_order", "options") + children: tuple[IR] + + def __init__( + self, + schema: Schema, + keys: Sequence[expr.NamedExpr], + agg_requests: Sequence[expr.NamedExpr], + maintain_order: bool, # noqa: FBT001 + options: Any, + df: IR, + ): + self.schema = schema + self.keys = tuple(keys) + self.agg_requests = tuple(agg_requests) + self.maintain_order = maintain_order + self.options = options + self.children = (df,) + if self.options.rolling: + raise NotImplementedError( + "rolling window/groupby" + ) # pragma: no cover; rollingwindow constructor has already raised + if any(GroupBy.check_agg(a.value) > 1 for a in self.agg_requests): + raise NotImplementedError("Nested aggregations in groupby") + self.agg_infos = [req.collect_agg(depth=0) for req in self.agg_requests] + if len(self.keys) == 0: + raise NotImplementedError("dynamic groupby") @staticmethod def check_agg(agg: expr.Expr) -> int: @@ -542,22 +700,10 @@ def check_agg(agg: expr.Expr) -> int: else: raise NotImplementedError(f"No handler for {agg=}") - def __post_init__(self) -> None: - """Check whether all the aggregations are implemented.""" - super().__post_init__() - if self.options.rolling: - raise NotImplementedError( - "rolling window/groupby" - ) # pragma: no cover; rollingwindow constructor has already raised - if any(GroupBy.check_agg(a.value) > 1 for a in self.agg_requests): - raise NotImplementedError("Nested aggregations in groupby") - self.agg_infos = [req.collect_agg(depth=0) for req in self.agg_requests] - if len(self.keys) == 0: - raise NotImplementedError("dynamic groupby") - def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" - df = self.df.evaluate(cache=cache) + (child,) = self.children + df = child.evaluate(cache=cache) keys = broadcast( *(k.evaluate(df) for k in self.keys), target_length=df.num_rows ) @@ -646,17 +792,14 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: return DataFrame(broadcasted).slice(self.options.slice) -@dataclasses.dataclass class Join(IR): """A join of two dataframes.""" - left: IR - """Left frame.""" - right: IR - """Right frame.""" - 
left_on: list[expr.NamedExpr] + __slots__ = ("left_on", "right_on", "options", "children") + _non_child = ("schema", "left_on", "right_on", "options") + left_on: tuple[expr.NamedExpr, ...] """List of expressions used as keys in the left frame.""" - right_on: list[expr.NamedExpr] + right_on: tuple[expr.NamedExpr, ...] """List of expressions used as keys in the right frame.""" options: tuple[ Literal["inner", "left", "right", "full", "leftsemi", "leftanti", "cross"], @@ -674,9 +817,20 @@ class Join(IR): - coalesce: should key columns be coalesced (only makes sense for outer joins) """ - def __post_init__(self) -> None: - """Validate preconditions.""" - super().__post_init__() + def __init__( + self, + schema: Schema, + left_on: Sequence[expr.NamedExpr], + right_on: Sequence[expr.NamedExpr], + options: Any, + left: IR, + right: IR, + ): + self.schema = schema + self.left_on = tuple(left_on) + self.right_on = tuple(right_on) + self.options = options + self.children = (left, right) if any( isinstance(e.value, expr.Literal) for e in itertools.chain(self.left_on, self.right_on) @@ -777,8 +931,7 @@ def _reorder_maps( def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" - left = self.left.evaluate(cache=cache) - right = self.right.evaluate(cache=cache) + left, right = (c.evaluate(cache=cache) for c in self.children) how, join_nulls, zlice, suffix, coalesce = self.options suffix = "_right" if suffix is None else suffix if how == "cross": @@ -866,20 +1019,29 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: return result.slice(zlice) -@dataclasses.dataclass class HStack(IR): """Add new columns to a dataframe.""" - df: IR - """Input dataframe.""" - columns: list[expr.NamedExpr] - """List of expressions to produce new columns.""" - should_broadcast: bool - """Should columns be broadcast?""" + __slots__ = ("columns", "should_broadcast", "children") + _non_child = ("schema", "columns", "should_broadcast") + children: tuple[IR] + + def __init__( + self, + schema: Schema, + columns: Sequence[expr.NamedExpr], + should_broadcast: bool, # noqa: FBT001 + df: IR, + ): + self.schema = schema + self.columns = tuple(columns) + self.should_broadcast = should_broadcast + self.children = (df,) def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" - df = self.df.evaluate(cache=cache) + (child,) = self.children + df = child.evaluate(cache=cache) columns = [c.evaluate(df) for c in self.columns] if self.should_broadcast: columns = broadcast(*columns, target_length=df.num_rows) @@ -895,20 +1057,28 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: return df.with_columns(columns) -@dataclasses.dataclass class Distinct(IR): """Produce a new dataframe with distinct rows.""" - df: IR - """Input dataframe.""" - keep: plc.stream_compaction.DuplicateKeepOption - """Which rows to keep.""" - subset: set[str] | None - """Which columns to inspect when computing distinct rows.""" - zlice: tuple[int, int] | None - """Optional slice to perform after compaction.""" - stable: bool - """Should order be preserved?""" + __slots__ = ("keep", "subset", "zlice", "stable", "children") + _non_child = ("schema", "keep", "subset", "zlice", "stable") + children: tuple[IR] + + def __init__( + self, + schema: Schema, + keep: plc.stream_compaction.DuplicateKeepOption, + subset: frozenset[str] | None, + zlice: tuple[int, int] | None, + stable: bool, # noqa: FBT001 + df: IR, + ): 
+        self.schema = schema
+        self.keep = keep
+        self.subset = subset
+        self.zlice = zlice
+        self.stable = stable
+        self.children = (df,)

     _KEEP_MAP: ClassVar[dict[str, plc.stream_compaction.DuplicateKeepOption]] = {
         "first": plc.stream_compaction.DuplicateKeepOption.KEEP_FIRST,
@@ -917,18 +1087,10 @@ class Distinct(IR):
         "any": plc.stream_compaction.DuplicateKeepOption.KEEP_ANY,
     }

-    def __init__(self, schema: Schema, df: IR, options: Any) -> None:
-        self.schema = schema
-        self.df = df
-        (keep, subset, maintain_order, zlice) = options
-        self.keep = Distinct._KEEP_MAP[keep]
-        self.subset = set(subset) if subset is not None else None
-        self.stable = maintain_order
-        self.zlice = zlice
-
     def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         """Evaluate and return a dataframe."""
-        df = self.df.evaluate(cache=cache)
+        (child,) = self.children
+        df = child.evaluate(cache=cache)
         if self.subset is None:
             indices = list(range(df.num_columns))
             keys_sorted = all(c.is_sorted for c in df.column_map.values())
@@ -967,46 +1129,35 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         return result.slice(self.zlice)


-@dataclasses.dataclass
 class Sort(IR):
     """Sort a dataframe."""

-    df: IR
-    """Input."""
-    by: list[expr.NamedExpr]
-    """List of expressions to produce sort keys."""
-    do_sort: Callable[..., plc.Table]
-    """pylibcudf sorting function."""
-    zlice: tuple[int, int] | None
-    """Optional slice to apply after sorting."""
-    order: list[plc.types.Order]
-    """Order keys should be sorted in."""
-    null_order: list[plc.types.NullOrder]
-    """Where nulls sort to."""
+    __slots__ = ("by", "order", "null_order", "stable", "zlice", "children")
+    _non_child = ("schema", "by", "order", "null_order", "stable", "zlice")
+    children: tuple[IR]

     def __init__(
         self,
         schema: Schema,
-        df: IR,
-        by: list[expr.NamedExpr],
-        options: Any,
+        by: Sequence[expr.NamedExpr],
+        order: Sequence[plc.types.Order],
+        null_order: Sequence[plc.types.NullOrder],
+        stable: bool,  # noqa: FBT001
         zlice: tuple[int, int] | None,
-    ) -> None:
+        df: IR,
+    ):
         self.schema = schema
-        self.df = df
-        self.by = by
+        self.by = tuple(by)
+        self.order = tuple(order)
+        self.null_order = tuple(null_order)
+        self.stable = stable
         self.zlice = zlice
-        stable, nulls_last, descending = options
-        self.order, self.null_order = sorting.sort_order(
-            descending, nulls_last=nulls_last, num_keys=len(by)
-        )
-        self.do_sort = (
-            plc.sorting.stable_sort_by_key if stable else plc.sorting.sort_by_key
-        )
+        self.children = (df,)

     def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         """Evaluate and return a dataframe."""
-        df = self.df.evaluate(cache=cache)
+        (child,) = self.children
+        df = child.evaluate(cache=cache)
         sort_keys = broadcast(
             *(k.evaluate(df) for k in self.by), target_length=df.num_rows
         )
@@ -1016,11 +1167,14 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
             for i, k in enumerate(sort_keys)
             if k.name in df.column_map and k.obj is df.column_map[k.name].obj
         }
-        table = self.do_sort(
+        do_sort = (
+            plc.sorting.stable_sort_by_key if self.stable else plc.sorting.sort_by_key
+        )
+        table = do_sort(
             df.table,
             plc.Table([k.obj for k in sort_keys]),
-            self.order,
-            self.null_order,
+            list(self.order),
+            list(self.null_order),
         )
         columns: list[Column] = []
         for name, c in zip(df.column_map, table.columns(), strict=True):
@@ -1037,49 +1191,65 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         return DataFrame(columns).slice(self.zlice)


-@dataclasses.dataclass
 class Slice(IR):
     """Slice a dataframe."""

-    df: IR
-    """Input."""
+    __slots__ = ("offset", "length", "children")
+    _non_child = ("schema", "offset", "length")
+    children: tuple[IR]
     offset: int
     """Start of the slice."""
     length: int
     """Length of the slice."""

+    def __init__(self, schema: Schema, offset: int, length: int, df: IR):
+        self.schema = schema
+        self.offset = offset
+        self.length = length
+        self.children = (df,)
+
     def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         """Evaluate and return a dataframe."""
-        df = self.df.evaluate(cache=cache)
+        (child,) = self.children
+        df = child.evaluate(cache=cache)
         return df.slice((self.offset, self.length))


-@dataclasses.dataclass
 class Filter(IR):
     """Filter a dataframe with a boolean mask."""

-    df: IR
-    """Input."""
-    mask: expr.NamedExpr
-    """Expression evaluating to a mask."""
+    __slots__ = ("mask", "children")
+    _non_child = ("schema", "mask")
+    children: tuple[IR]
+
+    def __init__(self, schema: Schema, mask: expr.NamedExpr, df: IR):
+        self.schema = schema
+        self.mask = mask
+        self.children = (df,)

     def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         """Evaluate and return a dataframe."""
-        df = self.df.evaluate(cache=cache)
+        (child,) = self.children
+        df = child.evaluate(cache=cache)
         (mask,) = broadcast(self.mask.evaluate(df), target_length=df.num_rows)
         return df.filter(mask)


-@dataclasses.dataclass
 class Projection(IR):
     """Select a subset of columns from a dataframe."""

-    df: IR
-    """Input."""
+    __slots__ = ("children",)
+    _non_child = ("schema",)
+    children: tuple[IR]
+
+    def __init__(self, schema: Schema, df: IR):
+        self.schema = schema
+        self.children = (df,)

     def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         """Evaluate and return a dataframe."""
-        df = self.df.evaluate(cache=cache)
+        (child,) = self.children
+        df = child.evaluate(cache=cache)
         # This can reorder things.
         columns = broadcast(
             *(df.column_map[name] for name in self.schema), target_length=df.num_rows
@@ -1087,16 +1257,13 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         )
         return DataFrame(columns)


-@dataclasses.dataclass
 class MapFunction(IR):
     """Apply some function to a dataframe."""

-    df: IR
-    """Input."""
-    name: str
-    """Function name."""
+    __slots__ = ("name", "options", "children")
+    _non_child = ("schema", "name", "options")
+    children: tuple[IR]
     options: Any
-    """Arbitrary options, interpreted per function."""

     _NAMES: ClassVar[frozenset[str]] = frozenset(
         [
@@ -1111,9 +1278,11 @@ class MapFunction(IR):
         ]
     )

-    def __post_init__(self) -> None:
-        """Validate preconditions."""
-        super().__post_init__()
+    def __init__(self, schema: Schema, name: str, options: Any, df: IR):
+        self.schema = schema
+        self.name = name
+        self.options = options
+        self.children = (df,)
         if self.name not in MapFunction._NAMES:
             raise NotImplementedError(f"Unhandled map function {self.name}")
         if self.name == "explode":
@@ -1127,7 +1296,7 @@ def __post_init__(self) -> None:
             old, new, _ = self.options
             # TODO: perhaps polars should validate renaming in the IR?
             if len(new) != len(set(new)) or (
-                set(new) & (set(self.df.schema.keys()) - set(old))
+                set(new) & (set(df.schema.keys()) - set(old))
             ):
                 raise NotImplementedError("Duplicate new names in rename.")
         elif self.name == "unpivot":
@@ -1136,31 +1305,31 @@ def __post_init__(self) -> None:
             variable_name = "variable" if variable_name is None else variable_name
             if len(pivotees) == 0:
                 index = frozenset(indices)
-                pivotees = [name for name in self.df.schema if name not in index]
+                pivotees = [name for name in df.schema if name not in index]
             if not all(
-                dtypes.can_cast(self.df.schema[p], self.schema[value_name])
-                for p in pivotees
+                dtypes.can_cast(df.schema[p], self.schema[value_name]) for p in pivotees
             ):
                 raise NotImplementedError(
                     "Unpivot cannot cast all input columns to "
                     f"{self.schema[value_name].id()}"
                 )
-            self.options = (indices, pivotees, variable_name, value_name)
+            self.options = (tuple(indices), tuple(pivotees), variable_name, value_name)

     def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         """Evaluate and return a dataframe."""
+        (child,) = self.children
         if self.name == "rechunk":
             # No-op in our data model
             # Don't think this appears in a plan tree from python
-            return self.df.evaluate(cache=cache)  # pragma: no cover
+            return child.evaluate(cache=cache)  # pragma: no cover
         elif self.name == "rename":
-            df = self.df.evaluate(cache=cache)
+            df = child.evaluate(cache=cache)
             # final tag is "swapping" which is useful for the
             # optimiser (it blocks some pushdown operations)
             old, new, _ = self.options
             return df.rename_columns(dict(zip(old, new, strict=True)))
         elif self.name == "explode":
-            df = self.df.evaluate(cache=cache)
+            df = child.evaluate(cache=cache)
             ((to_explode,),) = self.options
             index = df.column_names.index(to_explode)
             subset = df.column_names_set - {to_explode}
@@ -1170,7 +1339,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         elif self.name == "unpivot":
             indices, pivotees, variable_name, value_name = self.options
             npiv = len(pivotees)
-            df = self.df.evaluate(cache=cache)
+            df = child.evaluate(cache=cache)
             index_columns = [
                 Column(col, name=name)
                 for col, name in zip(
@@ -1209,37 +1378,38 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         raise AssertionError("Should never be reached")  # pragma: no cover


-@dataclasses.dataclass
 class Union(IR):
     """Concatenate dataframes vertically."""

-    dfs: list[IR]
-    """List of inputs."""
-    zlice: tuple[int, int] | None
-    """Optional slice to apply after concatenation."""
+    __slots__ = ("zlice", "children")
+    _non_child = ("schema", "zlice")

-    def __post_init__(self) -> None:
-        """Validate preconditions."""
-        super().__post_init__()
-        schema = self.dfs[0].schema
-        if not all(s.schema == schema for s in self.dfs[1:]):
+    def __init__(self, schema: Schema, zlice: tuple[int, int] | None, *children: IR):
+        self.schema = schema
+        self.zlice = zlice
+        self.children = children
+        schema = self.children[0].schema
+        if not all(s.schema == schema for s in self.children[1:]):
             raise NotImplementedError("Schema mismatch")

     def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         """Evaluate and return a dataframe."""
         # TODO: only evaluate what we need if we have a slice
-        dfs = [df.evaluate(cache=cache) for df in self.dfs]
+        dfs = [df.evaluate(cache=cache) for df in self.children]
         return DataFrame.from_table(
             plc.concatenate.concatenate([df.table for df in dfs]), dfs[0].column_names
         ).slice(self.zlice)


-@dataclasses.dataclass
 class HConcat(IR):
     """Concatenate dataframes horizontally."""

-    dfs: list[IR]
-    """List of inputs."""
+    __slots__ = ("children",)
+    _non_child = ("schema",)
+
+    def __init__(self, schema: Schema, *children: IR):
+        self.schema = schema
+        self.children = children

     @staticmethod
     def _extend_with_nulls(table: plc.Table, *, nrows: int) -> plc.Table:
@@ -1271,7 +1441,7 @@ def _extend_with_nulls(table: plc.Table, *, nrows: int) -> plc.Table:

     def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         """Evaluate and return a dataframe."""
-        dfs = [df.evaluate(cache=cache) for df in self.dfs]
+        dfs = [df.evaluate(cache=cache) for df in self.children]
         max_rows = max(df.num_rows for df in dfs)
         # Horizontal concatenation extends shorter tables with nulls
         dfs = [
diff --git a/python/cudf_polars/cudf_polars/dsl/translate.py b/python/cudf_polars/cudf_polars/dsl/translate.py
index a0291037f01..522c4a6729c 100644
--- a/python/cudf_polars/cudf_polars/dsl/translate.py
+++ b/python/cudf_polars/cudf_polars/dsl/translate.py
@@ -20,7 +20,7 @@
 from cudf_polars.dsl import expr, ir
 from cudf_polars.typing import NodeTraverser
-from cudf_polars.utils import dtypes
+from cudf_polars.utils import dtypes, sorting

 __all__ = ["translate_ir", "translate_named_expr"]

@@ -148,7 +148,7 @@ def _(
     with set_node(visitor, node.input):
         inp = translate_ir(visitor, n=None)
         exprs = [translate_named_expr(visitor, n=e) for e in node.expr]
-    return ir.Select(schema, inp, exprs, node.should_broadcast)
+    return ir.Select(schema, exprs, node.should_broadcast, inp)


 @_translate_ir.register
@@ -161,11 +161,11 @@ def _(
     keys = [translate_named_expr(visitor, n=e) for e in node.keys]
     return ir.GroupBy(
         schema,
-        inp,
-        aggs,
         keys,
+        aggs,
         node.maintain_order,
         node.options,
+        inp,
     )


@@ -182,7 +182,7 @@ def _(
     with set_node(visitor, node.input_right):
         inp_right = translate_ir(visitor, n=None)
         right_on = [translate_named_expr(visitor, n=e) for e in node.right_on]
-    return ir.Join(schema, inp_left, inp_right, left_on, right_on, node.options)
+    return ir.Join(schema, left_on, right_on, node.options, inp_left, inp_right)


 @_translate_ir.register
@@ -192,7 +192,7 @@ def _(
     with set_node(visitor, node.input):
         inp = translate_ir(visitor, n=None)
         exprs = [translate_named_expr(visitor, n=e) for e in node.exprs]
-    return ir.HStack(schema, inp, exprs, node.should_broadcast)
+    return ir.HStack(schema, exprs, node.should_broadcast, inp)


 @_translate_ir.register
@@ -202,17 +202,23 @@ def _(
     with set_node(visitor, node.input):
         inp = translate_ir(visitor, n=None)
         exprs = [translate_named_expr(visitor, n=e) for e in node.expr]
-    return ir.Reduce(schema, inp, exprs)
+    return ir.Reduce(schema, exprs, inp)


 @_translate_ir.register
 def _(
     node: pl_ir.Distinct, visitor: NodeTraverser, schema: dict[str, plc.DataType]
 ) -> ir.IR:
+    (keep, subset, maintain_order, zlice) = node.options
+    keep = ir.Distinct._KEEP_MAP[keep]
+    subset = frozenset(subset) if subset is not None else None
     return ir.Distinct(
         schema,
+        keep,
+        subset,
+        zlice,
+        maintain_order,
         translate_ir(visitor, n=node.input),
-        node.options,
     )


@@ -223,14 +229,18 @@ def _(
     with set_node(visitor, node.input):
         inp = translate_ir(visitor, n=None)
         by = [translate_named_expr(visitor, n=e) for e in node.by_column]
-    return ir.Sort(schema, inp, by, node.sort_options, node.slice)
+    stable, nulls_last, descending = node.sort_options
+    order, null_order = sorting.sort_order(
+        descending, nulls_last=nulls_last, num_keys=len(by)
+    )
+    return ir.Sort(schema, by, order, null_order, stable, node.slice, inp)


 @_translate_ir.register
 def _(
     node: pl_ir.Slice, visitor: NodeTraverser, schema: dict[str, plc.DataType]
 ) -> ir.IR:
-    return ir.Slice(schema, translate_ir(visitor, n=node.input), node.offset, node.len)
+    return ir.Slice(schema, node.offset, node.len, translate_ir(visitor, n=node.input))


 @_translate_ir.register
@@ -240,7 +250,7 @@ def _(
     with set_node(visitor, node.input):
         inp = translate_ir(visitor, n=None)
         mask = translate_named_expr(visitor, n=node.predicate)
-    return ir.Filter(schema, inp, mask)
+    return ir.Filter(schema, mask, inp)


 @_translate_ir.register
@@ -259,10 +269,10 @@ def _(
     name, *options = node.function
     return ir.MapFunction(
         schema,
-        # TODO: merge_sorted breaks this pattern
-        translate_ir(visitor, n=node.input),
         name,
         options,
+        # TODO: merge_sorted breaks this pattern
+        translate_ir(visitor, n=node.input),
     )


 @_translate_ir.register
@@ -271,7 +281,7 @@ def _(
     node: pl_ir.Union, visitor: NodeTraverser, schema: dict[str, plc.DataType]
 ) -> ir.IR:
     return ir.Union(
-        schema, [translate_ir(visitor, n=n) for n in node.inputs], node.options
+        schema, node.options, *(translate_ir(visitor, n=n) for n in node.inputs)
     )


 @_translate_ir.register
@@ -279,7 +289,7 @@ def _(
     node: pl_ir.HConcat, visitor: NodeTraverser, schema: dict[str, plc.DataType]
 ) -> ir.IR:
-    return ir.HConcat(schema, [translate_ir(visitor, n=n) for n in node.inputs])
+    return ir.HConcat(schema, *(translate_ir(visitor, n=n) for n in node.inputs))


 def translate_ir(visitor: NodeTraverser, *, n: int | None = None) -> ir.IR:
diff --git a/python/cudf_polars/tests/test_config.py b/python/cudf_polars/tests/test_config.py
index 3c3986be19b..9900f598e5f 100644
--- a/python/cudf_polars/tests/test_config.py
+++ b/python/cudf_polars/tests/test_config.py
@@ -10,7 +10,7 @@

 import rmm

-from cudf_polars.dsl.ir import IR
+from cudf_polars.dsl.ir import DataFrameScan
 from cudf_polars.testing.asserts import (
     assert_gpu_result_equal,
     assert_ir_translation_raises,
@@ -18,10 +18,10 @@
 def test_polars_verbose_warns(monkeypatch):
-    def raise_unimplemented(self):
+    def raise_unimplemented(self, *args):
         raise NotImplementedError("We don't support this")

-    monkeypatch.setattr(IR, "__post_init__", raise_unimplemented)
+    monkeypatch.setattr(DataFrameScan, "__init__", raise_unimplemented)
     q = pl.LazyFrame({})
     # Ensure that things raise
     assert_ir_translation_raises(q, NotImplementedError)
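
Note for reviewers: the `Node` base class these IR nodes now inherit from lives in
`cudf_polars/dsl/nodebase.py`, which is not part of this diff. The sketch below is a
minimal illustration of the contract the patch appears to rely on, not the actual
implementation: `_non_child` names the non-child constructor arguments in order,
`_ctor_arguments` recombines them with `children`, and `get_hashable` drives hashing
and equality (subclasses such as `IR` and `Scan` override it when an attribute, like
the schema dict, is not hashable as stored). It also shows why the translate.py hunks
move the input plan(s) to the end of every constructor call: non-child arguments come
first and children last, so a node can be rebuilt from its reconstructed argument
list. The `Num` node and the `reconstruct` helper are hypothetical, for illustration
only.

from __future__ import annotations

from collections.abc import Hashable, Sequence
from typing import Any, ClassVar


class Node:
    """Sketch of an immutable, hashable plan node (assumed nodebase contract)."""

    __slots__ = ()
    _non_child: ClassVar[tuple[str, ...]] = ()
    children: tuple[Node, ...]

    def _ctor_arguments(self, children: Sequence[Node]) -> tuple[Any, ...]:
        # Positional constructor arguments: the non-child attributes in
        # declaration order, followed by the child nodes.
        return (*(getattr(self, name) for name in self._non_child), *children)

    def reconstruct(self, children: Sequence[Node]) -> Node:
        # Rebuild the same node type around new children; this only works
        # because every subclass __init__ takes its children last.
        return type(self)(*self._ctor_arguments(children))

    def get_hashable(self) -> Hashable:
        # Subclasses override this when a stored attribute is not hashable.
        return (type(self), self._ctor_arguments(self.children))

    def __hash__(self) -> int:
        return hash(self.get_hashable())

    def __eq__(self, other: object) -> bool:
        return type(self) is type(other) and self.get_hashable() == other.get_hashable()


class Num(Node):
    """Hypothetical leaf node, only to show the pattern end to end."""

    __slots__ = ("value", "children")
    _non_child = ("value",)

    def __init__(self, value: int, *children: Node):
        self.value = value
        self.children = tuple(children)


# Structurally equal nodes hash and compare equal, which is what lets
# plan-level CSE (the Cache node above) treat repeated subplans as
# interchangeable.
assert Num(1) == Num(1)
assert hash(Num(1)) == hash(Num(1))
assert Num(1).reconstruct([]) == Num(1)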