[resources] support async yield_for_execution #28142
def gen_from_async_gen(
    context: StepExecutionContext,
    async_gen: AsyncIterator[T],
) -> Iterator[T]:
    # prime use for asyncio.Runner, but new in 3.11 and did not find appealing backport
    loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                yield loop.run_until_complete(async_gen.__anext__())
            except StopAsyncIteration:
                return
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
    # try:
    while True:
        try:
            yield context.event_loop.run_until_complete(async_gen.__anext__())
        except StopAsyncIteration:
            return
    # finally:
The commented try/finally blocks should be removed since they're not being used. More importantly, this code needs proper cleanup of async generators to prevent resource leaks. The event loop's shutdown_asyncgens() call that was previously in the finally block needs to be preserved somewhere in the execution lifecycle, likely at a higher level where the event loop itself is managed.
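A minimal sketch of the split this comment describes, using only the standard library and assuming nothing about how the PR finally wires it up. The helper only drives the async generator on a loop it is handed, while a hypothetical owner (`managed_event_loop`, not a Dagster API) calls shutdown_asyncgens() and closes the loop exactly once. In this sketch `__anext__()` is called inside a coroutine so that the first iteration happens while the loop is running, which is when asyncio registers the generator for later shutdown.

```python
import asyncio
from contextlib import contextmanager
from typing import AsyncIterator, Iterator, List, TypeVar

T = TypeVar("T")


@contextmanager
def managed_event_loop() -> Iterator[asyncio.AbstractEventLoop]:
    # Hypothetical owner of the loop: creates it, and is the single place
    # where async generators are shut down and the loop is closed.
    loop = asyncio.new_event_loop()
    try:
        yield loop
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()


async def _anext(async_gen: AsyncIterator[T]) -> T:
    # Advance the generator from inside the running loop.
    return await async_gen.__anext__()


def gen_from_async_gen(
    loop: asyncio.AbstractEventLoop,
    async_gen: AsyncIterator[T],
) -> Iterator[T]:
    # No loop creation or teardown here; that belongs to the loop's owner.
    while True:
        try:
            yield loop.run_until_complete(_anext(async_gen))
        except StopAsyncIteration:
            return


cleanup_log: List[str] = []


async def numbers() -> AsyncIterator[int]:
    try:
        for i in range(3):
            yield i
    finally:
        # Stands in for releasing a real resource.
        cleanup_log.append("closed")


with managed_event_loop() as loop:
    gen = gen_from_async_gen(loop, numbers())
    first = next(gen)  # consume one item and leave the generator suspended
```

Here the generator is abandoned after one item, and its `finally` block still runs when the owner shuts the loop down, which is the leak the comment is worried about.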
Spotted by Graphite Reviewer
Add support for async context manager backed resources by passing the same event loop to resource init and core execution.
The main awkwardness is in the direct execution case: to ensure the same event loop is used for both resource init and execution, you have to manually pass the loop in to context creation and then use that same loop to execute your asset/op. This is demonstrated in the test.
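To illustrate why the loop has to be shared, here is a small standard-library sketch, not the Dagster API. `fake_client` and `my_op` are hypothetical stand-ins for an async context manager backed resource and an async op. Setup, execution, and teardown are all driven through one explicitly created loop, mirroring the "pass the loop in, then execute on it" flow described above.

```python
import asyncio
from contextlib import asynccontextmanager

events = []


@asynccontextmanager
async def fake_client():
    # Hypothetical resource: remembers the loop it was initialized on.
    loop = asyncio.get_running_loop()
    events.append(("setup", id(loop)))
    try:
        yield {"loop": loop}
    finally:
        events.append(("teardown", id(asyncio.get_running_loop())))


async def my_op(client) -> bool:
    # Hypothetical op body: checks it runs on the loop the resource was bound to.
    return client["loop"] is asyncio.get_running_loop()


loop = asyncio.new_event_loop()
try:
    cm = fake_client()
    client = loop.run_until_complete(cm.__aenter__())        # resource init
    same_loop = loop.run_until_complete(my_op(client))       # execution
    loop.run_until_complete(cm.__aexit__(None, None, None))  # resource teardown
finally:
    loop.close()
```

If execution ran on a different loop than resource init, objects bound to the first loop (connections, locks, futures) could fail or hang, which is why the context builders accept the loop explicitly.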
resolves #27974
How I Tested These Changes
added tests
Changelog
async def yield_for_execution is now supported on ConfigurableResource. An event_loop argument has been added to context builders to support direct execution.