-
Notifications
You must be signed in to change notification settings - Fork 1
feat: Preserve task signatures when writing pipelines #328
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
yolanfery
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for proposing an improvement here.
I tried testing in PyCharm but I don't see much a difference before and after, not sure if it is my local setup or PyCharm specific. Sorry then I can't go deeper in testing this, right now.
In your current file diff, I think PipelineWithTask is not used anymore so the class can be removed, right ?
I explored a bit with AI and I feel like it would make sense to wrap the result to compensate for the tradeoff you mentioned. Maybe with something like Task[R]
To have something like :
- Make Task generic over return type
class Task(typing.Generic[R]):
def __init__(self, function: typing.Callable[..., R]):
self.result: R | None = None
...
- Add helpful getattr error message
def __getattr__(self, name: str):
"""Catch attempts to access result attributes in the DAG."""
raise TypeError(
f"Cannot access '.{name}' on Task object in pipeline DAG. "
f"Task results are only available inside other task functions..."
)
- Update decorator to return
Task[R]and aCallable[P, Task[R]]
def task(self, function: typing.Callable[P, R]) -> typing.Callable[P, Task[R]]:
@wraps(function)
def wrapper(*task_args: P.args, **task_kwargs: P.kwargs) -> Task[R]:
task = Task(function)(*task_args, **task_kwargs)
self.tasks.append(task)
return task
return wrapper
It brings transparency and (runtime but) explicit error. Does all this make sense ?
|
Yes, great idea! I'll try to test it on Pycharm. I only tested it on VSCode using the Pylance LSP for now. Do you know which LSP is used by Pycharm? It's a custom one right? |
It’s actually a native, custom, and probably proprietary implementation, though PyCharm can be configured to use external LSPs if needed |
|
I'm leaving the PR on hold for now because I'm not sure it's going to improve DX in the end... Might be ok to just write |
The PR adjusts the typing of
PipelineWithTaskandPipeline.task()to improve developer experience when writing pipelines in an IDE. The goal is to improve IDE type hints and autocomplete when using tasks, and allow users to see parameter signatures and docstrings when hovering over functions decorated as tasks.As of today, when hovering over a task in a pipeline DAG, we are losing the function docstring and signature. Which means developers have to scroll down to the task definition to verify the parameters that are needed and their types.
For example, using the
simple_ioexample pipeline:Changes
ParamSpecandTypeVarto make PipelineWithTask generic and preserve types. I think there is a simpler way to do this in Python 3.12 but we still want to support 3.11functools.wrapsto preserve function metadata (e.g.__doc__,__annotations__etc)Pipeline.task()toCallable[P, R]to claim returning the original function return type instead of aTaskobjectAlso, the IDE will now detect type errors when providing wrong task parameters. For example, if the task expects a
GeoDataFramebut you are passing aDataFrame.The trade-off is that if a user tries to manipulate the object returned by a Task in the main pipeline DAG (e.g.
df.head()), they might be confused. The IDE is not going to detect the error because we type it asDataFramewhile it's aTaskat runtime.