Skip to content

Commit

Permalink
[resources] support async yield_for_execution
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Feb 28, 2025
1 parent 12c4625 commit 60641e3
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 18 deletions.
18 changes: 15 additions & 3 deletions python_modules/dagster/dagster/_config/pythonic_config/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,15 +407,27 @@ def _initialize_and_run(self, context: InitResourceContext) -> TResValue:

@contextlib.contextmanager
def _initialize_and_run_cm(
self, context: InitResourceContext
self,
context: InitResourceContext,
) -> Generator[TResValue, None, None]:
with self._resolve_and_update_nested_resources(context) as has_nested_resource:
updated_resource = has_nested_resource.with_replaced_resource_context( # noqa: SLF001
context
)._with_updated_values(context.resource_config)

with updated_resource.yield_for_execution(context) as value:
yield value
resource_cm = updated_resource.yield_for_execution(context)
if isinstance(resource_cm, contextlib.AbstractAsyncContextManager):
aio_exit_stack = contextlib.AsyncExitStack()
try:
value = context.event_loop.run_until_complete(
aio_exit_stack.enter_async_context(resource_cm)
)
yield value
finally:
context.event_loop.run_until_complete(aio_exit_stack.aclose())
else:
with resource_cm as value:
yield value

def setup_for_execution(self, context: InitResourceContext) -> None:
"""Optionally override this method to perform any pre-execution steps
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from collections.abc import Mapping
from typing import Any, Optional, Union

Expand Down Expand Up @@ -38,6 +39,7 @@ def __init__(
instance: Optional[DagsterInstance] = None,
dagster_run: Optional[DagsterRun] = None,
log_manager: Optional[DagsterLogManager] = None,
event_loop=None,
):
self._resource_config = resource_config
self._resource_def = resource_def
Expand All @@ -46,6 +48,7 @@ def __init__(
self._instance = instance
self._resources = resources
self._dagster_run = dagster_run
self._event_loop = event_loop

@public
@property
Expand Down Expand Up @@ -115,6 +118,10 @@ def replace_config(self, config: Any) -> "InitResourceContext":
log_manager=self.log,
)

@property
def event_loop(self) -> asyncio.AbstractEventLoop:
return self._event_loop # type: ignore


class UnboundInitResourceContext(InitResourceContext):
"""Resource initialization context outputted by ``build_init_resource_context``.
Expand Down
11 changes: 10 additions & 1 deletion python_modules/dagster/dagster/_core/execution/context/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,14 @@ def __init__(
plan_data: PlanData,
execution_data: ExecutionData,
log_manager: DagsterLogManager,
output_capture: Optional[dict[StepOutputHandle, Any]] = None,
output_capture: Optional[dict[StepOutputHandle, Any]],
event_loop,
):
self._plan_data = plan_data
self._execution_data = execution_data
self._log_manager = log_manager
self._output_capture = output_capture
self._event_loop = event_loop

@property
def plan_data(self) -> PlanData:
Expand All @@ -317,6 +319,10 @@ def plan_data(self) -> PlanData:
def output_capture(self) -> Optional[dict[StepOutputHandle, Any]]:
return self._output_capture

@property
def event_loop(self) -> Optional[dict[StepOutputHandle, Any]]:
return self._event_loop

def for_step(
self,
step: ExecutionStep,
Expand All @@ -333,6 +339,7 @@ def for_step(
step=step,
output_capture=self.output_capture,
known_state=known_state,
event_loop=self.event_loop,
)

@property
Expand Down Expand Up @@ -408,6 +415,7 @@ def __init__(
step: ExecutionStep,
output_capture: Optional[dict[StepOutputHandle, Any]],
known_state: Optional["KnownExecutionState"],
event_loop,
):
from dagster._core.execution.resources_init import get_required_resource_keys_for_step

Expand All @@ -416,6 +424,7 @@ def __init__(
execution_data=execution_data,
log_manager=log_manager,
output_capture=output_capture,
event_loop=event_loop,
)
self._step = step
self._required_resource_keys = get_required_resource_keys_for_step(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import logging
import sys
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -215,6 +216,7 @@ def execution_context_event_generator(

log_manager = create_log_manager(context_creation_data)
resource_defs = job_def.get_required_resource_defs()
event_loop = asyncio.new_event_loop()

resources_manager = scoped_resources_builder_cm(
resource_defs=resource_defs,
Expand All @@ -225,6 +227,7 @@ def execution_context_event_generator(
resource_keys_to_init=context_creation_data.resource_keys_to_init,
instance=instance,
emit_persistent_events=True,
event_loop=event_loop,
)
yield from resources_manager.generate_setup_events()
scoped_resources_builder = check.inst(resources_manager.get_object(), ScopedResourcesBuilder)
Expand All @@ -234,6 +237,7 @@ def execution_context_event_generator(
execution_data=create_execution_data(context_creation_data, scoped_resources_builder),
log_manager=log_manager,
output_capture=output_capture,
event_loop=event_loop,
)

_validate_plan_with_context(execution_context, execution_plan)
Expand Down
27 changes: 14 additions & 13 deletions python_modules/dagster/dagster/_core/execution/plan/compute.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import inspect
from collections.abc import AsyncIterator, Iterator, Mapping, Sequence
from typing import Any, TypeVar, Union
Expand Down Expand Up @@ -119,18 +118,20 @@ def _validate_event(event: Any, step_context: StepExecutionContext) -> OpOutputU
return event


def gen_from_async_gen(async_gen: AsyncIterator[T]) -> Iterator[T]:
def gen_from_async_gen(
context: StepExecutionContext,
async_gen: AsyncIterator[T],
) -> Iterator[T]:
# prime use for asyncio.Runner, but new in 3.11 and did not find appealing backport
loop = asyncio.new_event_loop()
try:
while True:
try:
yield loop.run_until_complete(async_gen.__anext__())
except StopAsyncIteration:
return
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
# try:
while True:
try:
yield context.event_loop.run_until_complete(async_gen.__anext__())
except StopAsyncIteration:
return
# finally:
# loop.run_until_complete(loop.shutdown_asyncgens())
# loop.close()


def _yield_compute_results(
Expand All @@ -152,7 +153,7 @@ def _yield_compute_results(
return

if inspect.isasyncgen(user_event_generator):
user_event_generator = gen_from_async_gen(user_event_generator)
user_event_generator = gen_from_async_gen(step_context, user_event_generator)

op_label = step_context.describe_op()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def resource_initialization_manager(
resource_keys_to_init: Optional[AbstractSet[str]],
instance: Optional[DagsterInstance],
emit_persistent_events: Optional[bool],
event_loop,
):
generator = resource_initialization_event_generator(
resource_defs=resource_defs,
Expand All @@ -55,6 +56,7 @@ def resource_initialization_manager(
resource_keys_to_init=resource_keys_to_init,
instance=instance,
emit_persistent_events=emit_persistent_events,
event_loop=event_loop,
)
return EventGenerationManager(generator, ScopedResourcesBuilder)

Expand Down Expand Up @@ -121,6 +123,7 @@ def _core_resource_initialization_event_generator(
resource_keys_to_init: Optional[AbstractSet[str]],
instance: Optional[DagsterInstance],
emit_persistent_events: Optional[bool],
event_loop,
):
job_name = "" # Must be initialized to a string to satisfy typechecker
contains_generator = False
Expand Down Expand Up @@ -166,6 +169,7 @@ def _core_resource_initialization_event_generator(
resources=resources,
instance=instance,
all_resource_defs=resource_defs,
event_loop=event_loop,
)
manager = single_resource_generation_manager(
resource_context, resource_name, resource_def
Expand Down Expand Up @@ -217,6 +221,7 @@ def resource_initialization_event_generator(
resource_keys_to_init: Optional[AbstractSet[str]],
instance: Optional[DagsterInstance],
emit_persistent_events: Optional[bool],
event_loop,
):
check.inst_param(log_manager, "log_manager", DagsterLogManager)
resource_keys_to_init = check.opt_set_param(
Expand Down Expand Up @@ -251,6 +256,7 @@ def resource_initialization_event_generator(
resource_keys_to_init=resource_keys_to_init,
instance=instance,
emit_persistent_events=emit_persistent_events,
event_loop=event_loop,
)
except GeneratorExit:
# Shouldn't happen, but avoid runtime-exception in case this generator gets GC-ed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import asyncio
from contextlib import asynccontextmanager

from dagster import Output, asset, materialize
from dagster import ConfigurableResource, Output, asset, materialize
from dagster._core.definitions.decorators import op
from dagster._utils.test import wrap_op_in_graph_and_execute
from pydantic import PrivateAttr


def test_aio_op():
Expand Down Expand Up @@ -33,3 +35,27 @@ async def aio_gen(_):

result = wrap_op_in_graph_and_execute(aio_gen)
assert result.output_value() == "done"


def test_aio_resource():
class AioResource(ConfigurableResource):
_loop = PrivateAttr()

@property
def loop(self):
return self._loop

@asynccontextmanager
async def yield_for_execution(self, context):
await asyncio.sleep(0)
self._loop = asyncio.get_running_loop()
yield self

@op
async def aio_op(aio_resource: AioResource):
await asyncio.sleep(0)
assert aio_resource.loop is asyncio.get_running_loop()
return "done"

result = wrap_op_in_graph_and_execute(aio_op, {"aio_resource": AioResource()})
assert result.output_value() == "done"

0 comments on commit 60641e3

Please sign in to comment.