full persistence in AsyncTransformer and fully async udfs (#8164)
Co-authored-by: Jakub Kowalski <[email protected]>
GitOrigin-RevId: 4423803c31f732ff373f416189c1bf0a9e6b7d4d
2 people authored and Manul from Pathway committed Feb 17, 2025
1 parent 01311e6 commit 8ee45f2
Showing 40 changed files with 2,410 additions and 454 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -11,10 +11,14 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `pw.io.python.write` accepting `ConnectorObserver` as an alternative to `pw.io.subscribe`.
- `pw.io.iceberg.read` and `pw.io.iceberg.write` now support S3 as data backend and AWS Glue catalog implementations.
- All output connectors now support the `sort_by` field for ordering output within a single minibatch.
- A new UDF executor `pw.udfs.fully_async_executor`. It allows for the creation of non-blocking asynchronous UDFs whose results can be returned at a later processing time.
- A Future data type to represent results of fully asynchronous UDFs.
- `pw.Table.await_futures` method to wait for results of fully asynchronous UDFs.

### Changed
- **BREAKING**: Changed the interface of `LLMReranker`; the `use_logit_bias`, `cache_strategy`, `retry_strategy` and `kwargs` arguments are no longer supported.
- **BREAKING**: `LLMReranker` no longer inherits from `pw.UDF`.
- **BREAKING**: `pw.stdlib.utils.AsyncTransformer.output_table` now returns a table whose columns have the Future data type.

## [0.18.0] - 2025-02-07

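The fully asynchronous UDF features listed in the changelog above could be exercised roughly as follows. This is a minimal sketch, not code from this commit: it assumes `pw.udf` accepts the new executor through its `executor` argument (as it does for the existing async executors), and the `slow_upper` function, the sleep, and the debug helpers are purely illustrative.

```python
import asyncio

import pathway as pw

# Sketch only: a non-blocking UDF whose results arrive at a later processing time.
@pw.udf(executor=pw.udfs.fully_async_executor())
async def slow_upper(text: str) -> str:
    await asyncio.sleep(0.1)  # stand-in for a call to an external service
    return text.upper()

t = pw.debug.table_from_markdown(
    """
    text
    hello
    world
    """
)

# The UDF column has the Future data type until the results arrive;
# await_futures() makes downstream processing wait for the resolved values.
result = t.select(upper=slow_upper(pw.this.text)).await_futures()
pw.debug.compute_and_print(result)
```

The `autocommit_duration_ms` field added to `FullyAsyncApplyExpression` further down in this commit suggests the executor can also control how often pending results are committed.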
21 changes: 20 additions & 1 deletion python/pathway/engine.pyi
@@ -51,6 +51,8 @@ class PathwayType:
PY_OBJECT_WRAPPER: PathwayType
@staticmethod
def optional(arg: PathwayType) -> PathwayType: ...
@staticmethod
def future(arg: PathwayType) -> PathwayType: ...

class ConnectorMode(Enum):
STATIC: ConnectorMode
@@ -693,22 +695,39 @@ class Scope:
def error_log(self, properties: ConnectorProperties) -> tuple[Table, ErrorLog]: ...
def set_error_log(self, error_log: ErrorLog | None) -> None: ...
def set_operator_properties(self, id: int, depends_on_error_log: bool) -> None: ...
def remove_errors_from_table(
def remove_value_from_table(
self,
table: Table,
column_paths: Iterable[ColumnPath],
value: Value,
table_properties: TableProperties,
) -> Table: ...
def remove_retractions_from_table(
self,
table: Table,
table_properties: TableProperties,
) -> Table: ...
def async_transformer(
self,
table: Table,
column_paths: Iterable[ColumnPath],
on_change: Callable,
on_time_end: Callable,
on_end: Callable,
data_source: DataStorage,
data_format: DataFormat,
table_properties: ConnectorProperties,
skip_errors: bool,
) -> Table: ...

class Error: ...

ERROR: Error

class Pending: ...

PENDING: Pending

class Done:
def __lt__(self, other: Frontier) -> bool: ...
def __le__(self, other: Frontier) -> bool: ...
1 change: 1 addition & 0 deletions python/pathway/internals/api.py
@@ -34,6 +34,7 @@
dict[str, _Value],
tuple[_Value, ...],
Error,
Pending,
]
CapturedTable = dict[Pointer, tuple[Value, ...]]
CapturedStream = list[DataRow]
56 changes: 54 additions & 2 deletions python/pathway/internals/column.py
@@ -13,12 +13,15 @@
import pathway.internals as pw
from pathway.engine import ExternalIndexFactory
from pathway.internals import column_properties as cp, dtype as dt, trace
from pathway.internals.datasource import GenericDataSource
from pathway.internals.expression import ColumnExpression, ColumnReference
from pathway.internals.helpers import SetOnceProperty, StableSet
from pathway.internals.parse_graph import G
from pathway.internals.schema import Schema
from pathway.internals.universe import Universe

if TYPE_CHECKING:
from pathway.internals import api
from pathway.internals.expression import InternalColRef
from pathway.internals.operator import OutputHandle
from pathway.internals.table import Table
@@ -161,6 +164,23 @@ def properties(self) -> cp.ColumnProperties:
return self._properties


class ColumnWithoutExpression(ColumnWithContext):
_dtype: dt.DType

def __init__(
self,
context: Context,
universe: Universe,
dtype: dt.DType,
) -> None:
super().__init__(context, universe)
self._dtype = dtype

@cached_property
def context_dtype(self) -> dt.DType:
return self._dtype


class ColumnWithExpression(ColumnWithContext):
"""Column holding expression and context."""

@@ -1103,12 +1123,16 @@ def id_column_type(self) -> dt.DType:


@dataclass(eq=False, frozen=True)
class RemoveErrorsContext(
class FilterOutValueContext(
Context, column_properties_evaluator=cp.PreserveDependenciesPropsEvaluator
):
"""Context of `table.remove_errors() operation."""
"""Context of operations that filter all columns of the table.
Used in `table.remove_errors()` and `table.await_futures()`.
"""

orig_id_column: IdColumn
value_to_filter_out: api.Value

def column_dependencies_external(self) -> Iterable[Column]:
return [self.orig_id_column]
@@ -1144,3 +1168,31 @@ def id_column_type(self) -> dt.DType:
@cached_property
def universe(self) -> Universe:
return self.id_column_to_filter.universe.superset()


@dataclass(eq=False, frozen=True)
class AsyncTransformerContext(
Context, column_properties_evaluator=cp.PreserveDependenciesPropsEvaluator
):
"""Context of `AsyncTransformer` operation."""

input_id_column: IdColumn
input_columns: list[Column]
schema: type[Schema]
on_change: Callable
on_time_end: Callable
on_end: Callable
datasource: GenericDataSource

def column_dependencies_external(self) -> Iterable[Column]:
return [self.input_id_column] + self.input_columns

def input_universe(self) -> Universe:
return self.input_id_column.universe

def id_column_type(self) -> dt.DType:
return self.input_id_column.dtype

@cached_property
def universe(self) -> Universe:
return self.input_id_column.universe.subset()
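The new `AsyncTransformerContext` above backs the `AsyncTransformer` flow whose `output_table` now carries Future-typed columns (see the BREAKING changelog entry). A hedged sketch of that flow, following the existing `pw.AsyncTransformer` subclassing pattern; the schema, transformer name, and input data are illustrative, not part of this commit.

```python
import pathway as pw

class OutputSchema(pw.Schema):
    result: str

class UppercaseTransformer(pw.AsyncTransformer, output_schema=OutputSchema):
    async def invoke(self, text: str) -> dict:
        # Non-blocking work, e.g. an external API or LLM call.
        return {"result": text.upper()}

input_table = pw.debug.table_from_markdown(
    """
    text
    hello
    world
    """
)

# output_table columns now have the Future data type; await_futures() turns
# them back into plain values once the asynchronous results are available.
output = UppercaseTransformer(input_table=input_table).output_table.await_futures()
pw.debug.compute_and_print(output)
```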
24 changes: 19 additions & 5 deletions python/pathway/internals/column_properties.py
@@ -1,4 +1,4 @@
# Copyright © 2024 Pathway
# Copyright © 2025 Pathway

from __future__ import annotations

@@ -7,10 +7,8 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

from pathway.internals import dtype as dt

if TYPE_CHECKING:
import pathway.internals.column as clmn
from pathway.internals import column as clmn, dtype as dt


@dataclass(frozen=True)
@@ -39,14 +37,30 @@ def _append_only(self, column: clmn.ColumnWithContext) -> bool:

class PreserveDependenciesPropsEvaluator(ColumnPropertiesEvaluator):
def _append_only(self, column: clmn.ColumnWithContext):
return self._has_property(column, "append_only", True)
maybe_append_only = self._check_expression(column)
return maybe_append_only and self._has_property(column, "append_only", True)

def _has_property(self, column: clmn.ColumnWithContext, name: str, value: Any):
return all(
getattr(col.properties, name) == value
for col in column.column_dependencies()
)

def _check_expression(self, column: clmn.ColumnWithContext) -> bool:
from pathway.internals.column import ColumnWithExpression
from pathway.internals.expression_props_evaluator import (
ExpressionPropsEvaluator,
PropsEvaluatorState,
)

if isinstance(column, ColumnWithExpression):
evaluator = ExpressionPropsEvaluator()
props = PropsEvaluatorState(True)
evaluator.eval_expression(column.expression, props=props)
return props.append_only
else:
return True


class UpdateRowsPropsEvaluator(ColumnPropertiesEvaluator):
context: clmn.UpdateRowsContext
36 changes: 35 additions & 1 deletion python/pathway/internals/dtype.py
@@ -2,6 +2,7 @@

from __future__ import annotations

import asyncio
import collections
import datetime
import functools
@@ -16,8 +17,8 @@
import numpy.typing as npt
import pandas as pd

from pathway.engine import PathwayType
from pathway.internals import api, datetime_types, json as js
from pathway.internals.api import PathwayType

if typing.TYPE_CHECKING:
from pathway.internals.schema import SchemaMetaclass
@@ -457,6 +458,35 @@ def max_size(self) -> float:
return math.inf


class Future(DType):
wrapped: DType

def __repr__(self):
return f"Future({self.wrapped})"

def __new__(cls, arg: DType) -> Future:
arg = wrap(arg)
if isinstance(arg, Future):
return arg
return super().__new__(cls, arg)

def _set_args(self, wrapped):
self.wrapped = wrapped

def to_engine(self) -> PathwayType:
return api.PathwayType.future(self.wrapped.to_engine())

def is_value_compatible(self, arg):
return arg is api.PENDING or self.wrapped.is_value_compatible(arg)

@cached_property
def typehint(self) -> type[asyncio.Future]:
return asyncio.Future[self.wrapped.typehint] # type: ignore[name-defined]

def max_size(self) -> float:
return self.wrapped.max_size()


class _DateTimeNaive(DType):
def __repr__(self):
return "DATE_TIME_NAIVE"
@@ -658,6 +688,10 @@ def wrap(input_type) -> DType:
)
elif input_type == datetime.timedelta:
raise TypeError(f"Unsupported type {input_type}, use pw.DURATION")
elif typing.get_origin(input_type) == asyncio.Future:
args = get_args(input_type)
(arg,) = args
return Future(wrap(arg))
else:
dtype = {
int: INT,
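A small sketch of how the new `Future` dtype composes, inferred from the `Future` class and the `wrap()` branch added above; the module paths are internal and the assertions are illustrative.

```python
import asyncio

from pathway.internals import api, dtype as dt

# asyncio.Future[T] typehints are wrapped into the new Future dtype.
future_int = dt.wrap(asyncio.Future[int])
assert isinstance(future_int, dt.Future)
assert future_int.wrapped == dt.INT

# Future is idempotent: wrapping an already-wrapped dtype returns it unchanged.
assert dt.Future(future_int) is future_int

# A Future column may hold the PENDING placeholder until the real value arrives.
assert future_int.is_value_compatible(api.PENDING)
assert future_int.is_value_compatible(42)
```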
35 changes: 30 additions & 5 deletions python/pathway/internals/expression.py
@@ -9,12 +9,12 @@
from typing import TYPE_CHECKING, Any, Mapping, cast

from pathway.internals import api, dtype as dt, helpers
from pathway.internals.api import Value
from pathway.internals.operator_input import OperatorInput
from pathway.internals.shadows import operator
from pathway.internals.trace import Trace

if TYPE_CHECKING:
from pathway.internals.api import Value
from pathway.internals.column import Column, ColumnWithExpression
from pathway.internals.expressions import (
DateTimeNamespace,
@@ -745,6 +745,7 @@ class ApplyExpression(ColumnExpression):
_return_type: dt.DType
_propagate_none: bool
_deterministic: bool
_check_for_disallowed_types: bool
_args: tuple[ColumnExpression, ...]
_kwargs: dict[str, ColumnExpression]
_fun: Callable
@@ -757,15 +758,14 @@ def __init__(
deterministic: bool,
args: tuple[ColumnExpression | Value, ...],
kwargs: Mapping[str, ColumnExpression | Value],
_check_for_disallowed_types: bool = True,
):
super().__init__()
self._fun = fun
return_type = dt.wrap(return_type)
if propagate_none:
return_type = dt.Optional(return_type)
self._return_type = return_type
self._return_type = dt.wrap(return_type)
self._propagate_none = propagate_none
self._deterministic = deterministic
self._check_for_disallowed_types = _check_for_disallowed_types

self._args = tuple(ColumnExpression._wrap(arg) for arg in args)

@@ -783,15 +783,40 @@ def _to_internal(self) -> InternalColExpr:
self._return_type,
self._propagate_none,
self._deterministic,
self._check_for_disallowed_types,
*self._args,
**self._kwargs,
)

@property
def _maybe_optional_return_type(self) -> dt.DType:
if self._propagate_none:
return dt.Optional(self._return_type)
else:
return self._return_type


class AsyncApplyExpression(ApplyExpression):
pass


class FullyAsyncApplyExpression(ApplyExpression):
autocommit_duration_ms: int | None

def __init__(
self,
fun: Callable,
return_type: Any,
propagate_none: bool,
deterministic: bool,
autocommit_duration_ms: int | None,
args: tuple[ColumnExpression | Value, ...],
kwargs: Mapping[str, ColumnExpression | Value],
):
super().__init__(fun, return_type, propagate_none, deterministic, args, kwargs)
self.autocommit_duration_ms = autocommit_duration_ms


class CastExpression(ColumnExpression):
_return_type: dt.DType
_expr: ColumnExpression
4 changes: 4 additions & 0 deletions python/pathway/internals/expression_printer.py
@@ -163,6 +163,10 @@ def eval_fill_error(self, expression: expr.FillErrorExpression):
args = self._eval_args_kwargs((expression._expr, expression._replacement))
return f"pathway.fill_error({args})"

def eval_fully_async_apply(self, expression: expr.FullyAsyncApplyExpression):
args = self._eval_args_kwargs(expression._args, expression._kwargs)
return f"pathway.apply_fully_async({expression._fun.__name__}, {args})"


def get_expression_info(expression: expr.ColumnExpression) -> str:
printer = ExpressionFormatter()
25 changes: 25 additions & 0 deletions python/pathway/internals/expression_props_evaluator.py
@@ -0,0 +1,25 @@
# Copyright © 2025 Pathway

from __future__ import annotations

from dataclasses import dataclass

from pathway.internals import expression as expr
from pathway.internals.expression_visitor import IdentityTransform


@dataclass
class PropsEvaluatorState:
append_only: bool


class ExpressionPropsEvaluator(IdentityTransform):
def eval_fully_async_apply(
self,
expression: expr.FullyAsyncApplyExpression,
props: PropsEvaluatorState | None = None,
**kwargs,
) -> expr.FullyAsyncApplyExpression:
assert props is not None
props.append_only = False
return super().eval_fully_async_apply(expression, props=props, **kwargs)
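For context, this new evaluator is what the `_check_expression` helper added to `column_properties.py` earlier in this commit drives. A minimal sketch of that usage; the wrapper function name is illustrative.

```python
from pathway.internals import expression as expr
from pathway.internals.expression_props_evaluator import (
    ExpressionPropsEvaluator,
    PropsEvaluatorState,
)

def expression_keeps_append_only(expression: expr.ColumnExpression) -> bool:
    # Start optimistic; the evaluator flips the flag when it meets a
    # FullyAsyncApplyExpression, because such a column first holds PENDING and
    # is later updated with the real value, so it cannot be append-only.
    props = PropsEvaluatorState(append_only=True)
    ExpressionPropsEvaluator().eval_expression(expression, props=props)
    return props.append_only
```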