Check UDF params types in SignalSchema #973

Merged: 12 commits, Mar 19, 2025

Conversation

dreadatour
Contributor

@dreadatour dreadatour commented Mar 14, 2025

Follow-up for #966.

See this thread for more details: #966 (comment)

We now check the types of UDF arguments when they are annotated. There is also a special case for the File signal: it is automatically converted to TextFile, ImageFile or VideoFile if one of these types is used in the UDF signature. See also my comments about the implementation and the examples below 🙏

Very basic example

Let's say we have a dataset my-ds with two signals: name: str and value: int.

We can now define a UDF to process these signals, e.g.:

def process(name: str, value: int) -> float:
    return len(name) / value

DataChain.from_dataset("my-ds").map(signal=process)

This PR adds type checking of UDF parameter types against the dataset schema, so the following UDF:

def process(name: int, value: int) -> float:
    return len(name) / value

DataChain.from_dataset("my-ds").map(signal=process)

will raise an error:

SignalResolvingError: cannot resolve signal name 'name': types mismatch: <class 'int'> != <class 'str'>

because in dataset my-ds the type of signal name is str, not int.
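
The check described above can be sketched roughly as follows. This is only an illustration and not the PR's implementation: the helper name `check_param_types`, the plain `dict` used as a schema, and the use of `TypeError` are all assumptions made for the example (the error shown above is a `SignalResolvingError`).

```python
import inspect
from typing import Any, Callable


def check_param_types(func: Callable, schema: dict[str, type]) -> None:
    """Compare annotated UDF parameters with the dataset's signal types."""
    for name, param in inspect.signature(func).parameters.items():
        annotation = param.annotation
        # Parameters without a usable annotation are not checked at all.
        if annotation is inspect.Parameter.empty or annotation is Any:
            continue
        expected = schema[name]
        if annotation != expected:
            raise TypeError(
                f"cannot resolve signal name '{name}': "
                f"types mismatch: {annotation} != {expected}"
            )


# Hypothetical schema mirroring the my-ds dataset from the example.
schema = {"name": str, "value": int}


def good(name: str, value: int) -> float:
    return len(name) / value


def bad(name: int, value: int) -> float:
    return len(name) / value


check_param_types(good, schema)  # passes silently
try:
    check_param_types(bad, schema)
except TypeError as exc:
    error_message = str(exc)
```

The point of the sketch is that the mismatch is reported from the signature alone, before any row is processed.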

Another real-life example

In this example we create a dataset from a Google Storage bucket, with a signal file of type File. Next we process these files as images to extract image metadata. Since the general File type has no methods for working with images, we need a type conversion (File -> ImageFile). This can be done with file.as_image_file(), but it can also happen on the fly based on the types in the UDF signature, which is the subject of this PR.

Query

from datachain import DataChain, Image, ImageFile


def parse_meta(file: ImageFile) -> Image:
    return file.get_info()


(
    DataChain.from_storage("gs://datachain-test-vlad/human-pose-estimation")
    .limit(100)
    .map(meta=parse_meta)
    .show()
)

Before

$ python query.py
Traceback (most recent call last):
  File "/Users/vlad/work/iterative/datachain/src/datachain/lib/udf.py", line 174, in process
    return self._func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vlad/work/iterative/playground/query.py", line 5, in parse_meta
    return file.get_info()
           ^^^^^^^^^^^^^
  File "/Users/vlad/.virtualenvs/datachain/lib/python3.12/site-packages/pydantic/main.py", line 892, in __getattr__
    raise AttributeError(f'{type(self).__name__!r} object has no attribute {item!r}')
AttributeError: 'File' object has no attribute 'get_info'
==========================================================
Traceback (most recent call last):
  File "/Users/vlad/work/iterative/playground/query.py", line 12, in <module>
    .show()
     ^^^^^^
  File "/Users/vlad/work/iterative/datachain/src/datachain/lib/dc.py", line 1887, in show
    df = dc.to_pandas(flatten, include_hidden=include_hidden)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vlad/work/iterative/datachain/src/datachain/lib/dc.py", line 1864, in to_pandas
    results = self.results(include_hidden=include_hidden)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vlad/work/iterative/datachain/src/datachain/lib/dc.py", line 1334, in results
    return list(self.collect_flatten(include_hidden=include_hidden))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vlad/work/iterative/datachain/src/datachain/lib/dc.py", line 1285, in collect_flatten
    with self._query.ordered_select(*db_signals).as_iterable() as rows:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.9/Frameworks/Python.framework/Versions/3.12/lib/python3.12/contextlib.py", line 137, in __enter__
    return next(self.gen)
           ^^^^^^^^^^^^^^
  File "/Users/vlad/work/iterative/datachain/src/datachain/query/dataset.py", line 1264, in as_iterable
    query = self.apply_steps().select()
            ^^^^^^^^^^^^^^^^^^
  File "/Users/vlad/work/iterative/datachain/src/datachain/query/dataset.py", line 1210, in apply_steps
    result = step.apply(
             ^^^^^^^^^^^
  File "/Users/vlad/work/iterative/datachain/src/datachain/query/dataset.py", line 602, in apply
    self.populate_udf_table(udf_table, query)
  File "/Users/vlad/work/iterative/datachain/src/datachain/query/dataset.py", line 520, in populate_udf_table
    process_udf_outputs(
  File "/Users/vlad/work/iterative/datachain/src/datachain/query/dataset.py", line 340, in process_udf_outputs
    for udf_output in udf_results:
                      ^^^^^^^^^^^
  File "/Users/vlad/work/iterative/datachain/src/datachain/lib/udf.py", line 100, in run
    yield from self.inner.run(
  File "/Users/vlad/work/iterative/datachain/src/datachain/lib/udf.py", line 386, in run
    result_objs = self.process_safe(udf_args)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vlad/work/iterative/datachain/src/datachain/lib/udf.py", line 284, in process_safe
    raise DataChainError(
datachain.lib.utils.DataChainError: Error in user code in class 'Mapper': 'File' object has no attribute 'get_info'
$

After

$ python query.py
                            file     file              file      file                             file     file  meta   meta   meta
                            path     size              etag is_latest                    last_modified location width height format
0    human-pose-estimation/0.jpg   730027  CKfQgNGSyIkDEAE=         1 2024-11-06 16:39:41.795000+00:00     None  3580   5370   JPEG
1    human-pose-estimation/1.jpg  4744618  CPbpqtGSyIkDEAE=         1 2024-11-06 16:39:42.510000+00:00     None  3217   4825   JPEG
2   human-pose-estimation/10.jpg  1726248  CO2DmtGSyIkDEAE=         1 2024-11-06 16:39:42.236000+00:00     None  2792   3634   JPEG
3   human-pose-estimation/11.jpg  2257008  CPy6ndGSyIkDEAE=         1 2024-11-06 16:39:42.291000+00:00     None  3573   5242   JPEG
4   human-pose-estimation/12.jpg   792329  CNH9g9GSyIkDEAE=         1 2024-11-06 16:39:41.865000+00:00     None  5247   3498   JPEG
5   human-pose-estimation/13.jpg  2676811  CITOpdGSyIkDEAE=         1 2024-11-06 16:39:42.423000+00:00     None  2997   4496   JPEG
6   human-pose-estimation/14.jpg  1274475  CJPemdGSyIkDEAE=         1 2024-11-06 16:39:42.224000+00:00     None  5472   3648   JPEG
7   human-pose-estimation/15.jpg   993245  CPyXj9GSyIkDEAE=         1 2024-11-06 16:39:42.047000+00:00     None  2969   4453   JPEG
8   human-pose-estimation/16.jpg  2368068  CIGYotGSyIkDEAE=         1 2024-11-06 16:39:42.356000+00:00     None  2968   4016   JPEG
9   human-pose-estimation/17.jpg  1965589  CI3bmdGSyIkDEAE=         1 2024-11-06 16:39:42.225000+00:00     None  3265   4898   JPEG
10  human-pose-estimation/18.jpg  3172189  CP6g39GSyIkDEAE=         1 2024-11-06 16:39:43.337000+00:00     None  3744   5616   JPEG
11  human-pose-estimation/19.jpg  1044142  CISzkdGSyIkDEAE=         1 2024-11-06 16:39:42.072000+00:00     None  4698   7050   JPEG
12   human-pose-estimation/2.JPG  2809438  CPLPoNGSyIkDEAE=         1 2024-11-06 16:39:42.323000+00:00     None  3648   5472   JPEG
13  human-pose-estimation/20.jpg  1257389  CJiulNGSyIkDEAE=         1 2024-11-06 16:39:42.140000+00:00     None  4896   3264   JPEG
14  human-pose-estimation/21.jpg  2062542  CIKZntGSyIkDEAE=         1 2024-11-06 16:39:42.304000+00:00     None  4851   3234   JPEG
15  human-pose-estimation/22.jpg  3014260  CMPSqNGSyIkDEAE=         1 2024-11-06 16:39:42.464000+00:00     None  5461   8192   JPEG
16  human-pose-estimation/23.jpg  1778821  CJL/ltGSyIkDEAE=         1 2024-11-06 16:39:42.179000+00:00     None  3705   5557   JPEG
17  human-pose-estimation/24.jpg  3823660  CPPu49GSyIkDEAE=         1 2024-11-06 16:39:43.436000+00:00     None  3648   5472   JPEG
18  human-pose-estimation/25.jpg   779455  CP2TidGSyIkDEAE=         1 2024-11-06 16:39:41.927000+00:00     None  2829   3149   JPEG
19  human-pose-estimation/26.jpg   750505  CMDrwdGSyIkDEAE=         1 2024-11-06 16:39:42.884000+00:00     None  3456   5184   JPEG

[Limited by 20 rows]
$

TODO

  • tests

@dreadatour dreadatour requested review from shcheklein, skshetry and a team March 14, 2025 15:41
@dreadatour dreadatour self-assigned this Mar 14, 2025

cloudflare-workers-and-pages bot commented Mar 15, 2025

Deploying datachain-documentation with Cloudflare Pages

Latest commit: 237e3e0
Status: ✅  Deploy successful!
Preview URL: https://a6d039bc.datachain-documentation.pages.dev
Branch Preview URL: https://respect-udf-param-types.datachain-documentation.pages.dev

codecov bot commented Mar 15, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 88.18%. Comparing base (98fd50c) to head (237e3e0).
Report is 1 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #973      +/-   ##
==========================================
+ Coverage   88.08%   88.18%   +0.10%     
==========================================
  Files         133      133              
  Lines       12123    12145      +22     
  Branches     1687     1695       +8     
==========================================
+ Hits        10678    10710      +32     
+ Misses       1025     1017       -8     
+ Partials      420      418       -2     
Flag Coverage Δ
datachain 88.11% <100.00%> (+0.10%) ⬆️

@@ -1003,8 +1003,9 @@ def _udf_to_obj(
func: Optional[Union[Callable, UDFObjT]],
params: Union[None, str, Sequence[str]],
output: OutputType,
- signal_map,
+ signal_map: dict[str, Callable],
Contributor Author

Just improving typings here.

) -> UDFObjT:
is_batch = target_class.is_input_batched
Contributor Author

To be able to check UDF function input arguments for batch UDFs.

Comment on lines +1019 to +1021
params_schema = signals_schema.slice(
sign.params, self._setup, is_batch=is_batch
)
Contributor Author

This lets us know whether the UDF is batching or not when we check UDF input argument types.

@@ -130,7 +130,7 @@ def read_meta( # noqa: C901
#

def parse_data(
- file: File,
+ file: TextFile,
Contributor Author

This is because File.open returns binary data, and we need text (str) below. Now File types in UDF will be automatically converted based on UDF input param.

Comment on lines 466 to 471
def slice(
- self, keys: Sequence[str], setup: Optional[dict[str, Callable]] = None
+ self,
+ params: dict[str, Union[DataType, Any]],
+ setup: Optional[dict[str, Callable]] = None,
+ is_batch: bool = False,
) -> "SignalSchema":
Contributor Author

The main changes of this PR go into this function.

params is now a dictionary with param names as keys and types as values, instead of just a list of input param names.

is_batch is needed to check the input param types of batching UDFs.

Let's say we have a signal foo with type int. A regular UDF might then look like this:

def process(foo: int) -> int:
    ...

and batching UDF looks like this:

def process_batch(foo: list[int]) -> int:
    ...

To be able to check types here we need to know if UDF is batching or not.

if param_origin is Union and type(None) in get_args(param_type):
param_type = get_args(param_type)[0]

if is_batch:
Contributor Author

For batch UDFs, both of these declarations are treated the same:

def process(foo: list[int]) -> int:
    ...

and

def process(foo: list) -> int:
    ...
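
A rough sketch of how a batch parameter annotation could be reduced to its element type before comparing it with the signal type. The helper names `unwrap_batch_type` and `batch_param_matches` are hypothetical, and treating a bare `list` as "element type unknown, skip the check" is an assumption about what "the same" means here.

```python
from typing import Any, get_args, get_origin


def unwrap_batch_type(param_type: Any) -> Any:
    """Return the element type of a batch parameter, or Any if unspecified."""
    if param_type is list:
        # Bare `list`: the element type is unknown, so there is nothing to check.
        return Any
    if get_origin(param_type) is list:
        args = get_args(param_type)
        return args[0] if args else Any
    raise TypeError(f"batch UDF parameter must be a list, got {param_type}")


def batch_param_matches(param_type: Any, schema_type: Any) -> bool:
    """Check a batch UDF parameter against the (non-batched) signal type."""
    element_type = unwrap_batch_type(param_type)
    return element_type is Any or element_type == schema_type
```

Under these assumptions both `list[int]` and `list` are accepted for an `int` signal, while `list[str]` is rejected.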

Comment on lines +510 to +517
if param_type == schema_type or (
isclass(param_type)
and isclass(schema_type)
and issubclass(param_type, File)
and issubclass(schema_type, File)
):
schema[param] = schema_type
continue
Contributor Author

The UDF input param type should be the same as the DataSet signal type.

There is a special case here: we convert a File signal to TextFile, ImageFile or VideoFile based on the UDF input param type, and vice versa, a TextFile DataSet signal can be used as a regular File type if needed.

Let's say we have a DataSet with the File type:

DataChain.from_storage("s3://bucket/")

if we know these files are image files, we can use them in UDF like this:

def process(file: ImageFile) -> ImageMeta:
    return file.get_info()

This might be useful, since the File class has no get_info method.

JFYI, another possible way to do this is:

def process(file: File) -> ImageMeta:
    return file.as_image_file().get_info()

The same works in the opposite direction. Let's say we have a DataSet with ImageFile files:

DataChain.from_storage("s3://bucket/", type="image")

we can use regular File in UDF:

def process(file: File) -> int:
    ...

The type will be converted automatically in both directions.
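
The condition from the diff above (lines +510 to +517) can be exercised in isolation. The classes below are empty stand-ins for the real datachain models, and `types_compatible` is a hypothetical name used only for this sketch.

```python
from inspect import isclass


# Stand-in classes; the real File, ImageFile and TextFile live in datachain.
class File:
    pass


class ImageFile(File):
    pass


class TextFile(File):
    pass


def types_compatible(param_type, schema_type) -> bool:
    """Mirror the condition from the diff: equal types, or both File-based."""
    if param_type == schema_type:
        return True
    return (
        isclass(param_type)
        and isclass(schema_type)
        and issubclass(param_type, File)
        and issubclass(schema_type, File)
    )
```

Note that, as written, the condition accepts any pair of File-based types, so sibling types such as TextFile and ImageFile also pass, not only File to a subclass and back.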

@@ -18,7 +18,7 @@ def __init__(self, chain: str, msg):
@dataclass
class UdfSignature:
func: Union[Callable, UDFBase]
- params: Sequence[str]
+ params: dict[str, Union[DataType, Any]]
Contributor Author

UDF signature params is now a dictionary with param names as keys and param types as values, instead of just a list of param names.

Comment on lines +61 to 63
func_params_map_sign, func_outs_sign, is_iterator = cls._func_signature(
chain, udf_func
)
Contributor Author

Not sure why not to use self here 👀

Comment on lines +67 to +69
udf_params = (
{params: Any} if isinstance(params, str) else dict.fromkeys(params, Any)
)
Contributor Author

We use the Any type here for params with an unknown type, since there is no Unknown type in Python 👀

Member

Can it be None? What is the parameter type if the type annotation is missing ... e.g. how does mypy treat it?

Contributor Author

This is the case when params is set in the map chain method. Here we do not check UDF signature param types, e.g.:

dc.map(lambda file: file.parse_info(), params="file")

or

def process_signals(signal1, signal2) -> int:
    return signal1 + signal2

dc.map(process_signals, params=["signal1", "signal2"])

To answer your question: if the parameter type annotation is missing, inspect returns it as inspect.Parameter.empty (see below). In this case we treat its type as Any (since there is no Unknown type in Python).
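
A minimal sketch of both cases, assuming a hypothetical helper `udf_param_types` (not a function from the PR): explicitly passed params all get `Any`, and otherwise the annotations are read with `inspect`, with a missing annotation mapped to `Any`.

```python
import inspect
from typing import Any, Callable, Sequence, Union


def udf_param_types(
    func: Callable, params: Union[None, str, Sequence[str]] = None
) -> dict[str, Any]:
    """Build a name -> type mapping for UDF parameters."""
    if params is not None:
        # Explicit params: their types are unknown, so mark each one as Any.
        if isinstance(params, str):
            return {params: Any}
        return dict.fromkeys(params, Any)
    result: dict[str, Any] = {}
    for name, param in inspect.signature(func).parameters.items():
        annotation = param.annotation
        # inspect reports a missing annotation as inspect.Parameter.empty.
        result[name] = Any if annotation is inspect.Parameter.empty else annotation
    return result


def example(a: int, b) -> int:
    return a


from_signature = udf_param_types(example)
from_params = udf_param_types(example, params=["signal1", "signal2"])
```

A parameter mapped to `Any` is then simply skipped by the type check.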

@dreadatour dreadatour changed the title Use UDF param types in signal schema Check UDF params types in SignalSchema Mar 16, 2025
@dreadatour dreadatour requested a review from dmpetrov March 16, 2025 06:27
@@ -159,6 +159,7 @@ def process(self, file) -> list[float]:
```
"""

is_input_batched = False
Member

Why not use the same is_output_batch (maybe rename it to is_batched)? It seems it always has the same value.

Contributor Author

For generators the input is not batched (single row) but the output is batched, so we need separate params for these.
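
To illustrate why one flag is not enough, here is a sketch with stand-in classes. The class names, the `is_output_batched` spelling, and the flag values are assumptions for the example and are not taken from the datachain source.

```python
class UDFBase:
    """Stand-in base class with separate input and output batching flags."""

    is_input_batched = False
    is_output_batched = False


class Mapper(UDFBase):
    """One row in, one row out."""


class Generator(UDFBase):
    """One row in, many rows out: only the output is batched."""

    is_output_batched = True


class BatchMapper(UDFBase):
    """Many rows in, many rows out."""

    is_input_batched = True
    is_output_batched = True


def expects_list_params(udf_class: type) -> bool:
    """Only the input flag decides whether params are annotated as lists."""
    return udf_class.is_input_batched
```

With a single shared flag, the generator case (single-row input, batched output) could not be expressed.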


param_type = get_args(param_type)[0]

if param_type == schema_type or (
Member

do we do this only for File? does it make sense to make it general?

Contributor Author

Good question. For Files we are sure that no additional fields exist in the different File type models (File, ImageFile, VideoFile, etc.). But in general, other models inherited from each other may have additional fields, and a simple type conversion may fail. We could also check this, but it would complicate things and I am not sure I see the value here.

We can do it later if needed.
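
For reference, such a check might look roughly like the sketch below. It uses plain dataclasses to stay dependency-free (the real models are pydantic-based, per the traceback above), and every name in it, including `adds_no_fields` and `AnnotatedFile`, is hypothetical.

```python
from dataclasses import dataclass, fields


@dataclass
class File:
    path: str
    size: int = 0


@dataclass
class ImageFile(File):
    # Adds helper methods only, no new fields.
    def describe(self) -> str:
        return f"image at {self.path}"


@dataclass
class AnnotatedFile(File):
    # Adds a field, so converting a plain File to it would lose information.
    label: str = ""


def adds_no_fields(sub: type, base: type) -> bool:
    """True if `sub` derives from `base` and declares the same set of fields."""
    base_names = {f.name for f in fields(base)}
    sub_names = {f.name for f in fields(sub)}
    return issubclass(sub, base) and sub_names == base_names
```

Only subclasses passing such a check would be safe to convert implicitly in both directions.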

@ilongin
Contributor

ilongin commented Mar 17, 2025

Haven't looked into the PR yet, but am I the only one a little bit concerned with these implicit File conversions? ... What worries me is that we are not consistent, i.e. int -> str is not converted automatically but File -> ImageFile is.
Is the only reason for this to avoid the as_image_file() call by the user in the UDF itself, or is there more to it?

@dreadatour
Contributor Author

int -> str is not converted automatically but File -> ImageFile is

This is because int and str are completely different types, while ImageFile is inherited from the File model and is, basically, the same type, just with some additional helper methods defined.

I prefer to think about this the same way as int and float types behave:

$ cat sum.py
def sum(x: float, y: float) -> float:
    return x + y

print(sum(3, 4))
$ python sum.py
7
$ mypy sum.py
Success: no issues found in 1 source file
$

Different types, but converted seamlessly without any warning. Looks logical, intuitive and natural to me 😇

@ilongin
Contributor

ilongin commented Mar 18, 2025

Yes, some implicit casting happens in some languages, but what worried me more is being inconsistent and only doing implicit casting for File objects. If we always implicitly cast subclasses for any kind of custom user classes, then I'm OK with this.

@dreadatour
Contributor Author

If we are always implicitly casting subclasses for any kind of custom user classes then I'm ok with this.

Then IMO we need to make sure no additional fields were added in subclasses. See my comment above.

@dreadatour dreadatour requested a review from shcheklein March 18, 2025 18:32
@dreadatour dreadatour merged commit 512ee2b into main Mar 19, 2025
35 checks passed
@dreadatour dreadatour deleted the respect-udf-param-types branch March 19, 2025 01:17