Skip to content

Commit

Permalink
Merge pull request #133 from macwilk/mac/sync-async-utils
Browse files Browse the repository at this point in the history
Add Async Utilities to Mitigate Blocking Sync Functions
  • Loading branch information
grutt authored Aug 13, 2024
2 parents 645c7dc + 15e1c68 commit 2413114
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 3 deletions.
95 changes: 95 additions & 0 deletions examples/sync_to_async/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import asyncio
import os
import time

from dotenv import load_dotenv

from hatchet_sdk import Context, sync_to_async
from hatchet_sdk.v2.hatchet import Hatchet

os.environ["PYTHONASYNCIODEBUG"] = "1"
load_dotenv()

hatchet = Hatchet(debug=True)


@hatchet.function()
async def fanout_sync_async(context: Context) -> dict:
print("spawning child")

context.put_stream("spawning...")
results = []

n = context.workflow_input().get("n", 10)

start_time = time.time()
for i in range(n):
results.append(
(
await context.aio.spawn_workflow(
"Child",
{"a": str(i)},
key=f"child{i}",
options={"additional_metadata": {"hello": "earth"}},
)
).result()
)

result = await asyncio.gather(*results)

execution_time = time.time() - start_time
print(f"Completed in {execution_time:.2f} seconds")

return {"results": result}


@hatchet.workflow(on_events=["child:create"])
class Child:
###### Example Functions ######
def sync_blocking_function(self):
time.sleep(5)
return {"type": "sync_blocking"}

@sync_to_async # this makes the function async safe!
def decorated_sync_blocking_function(self):
time.sleep(5)
return {"type": "decorated_sync_blocking"}

@sync_to_async # this makes the async function loop safe!
async def async_blocking_function(self):
time.sleep(5)
return {"type": "async_blocking"}

###### Hatchet Steps ######
@hatchet.step()
async def handle_blocking_sync_in_async(self, context: Context):
wrapped_blocking_function = sync_to_async(self.sync_blocking_function)

# This will now be async safe!
data = await wrapped_blocking_function()
return {"blocking_status": "success", "data": data}

@hatchet.step()
async def handle_decorated_blocking_sync_in_async(self, context: Context):
data = await self.decorated_sync_blocking_function()
return {"blocking_status": "success", "data": data}

@hatchet.step()
async def handle_blocking_async_in_async(self, context: Context):
data = await self.async_blocking_function()
return {"blocking_status": "success", "data": data}

@hatchet.step()
async def non_blocking_async(self, context: Context):
await asyncio.sleep(5)
return {"nonblocking_status": "success"}


def main():
worker = hatchet.worker("fanout-worker", max_runs=50)
worker.register_workflow(Child())
worker.start()


if __name__ == "__main__":
main()
105 changes: 104 additions & 1 deletion hatchet_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@
UserTenantMembershipsList,
)
from hatchet_sdk.clients.rest.models.user_tenant_public import UserTenantPublic
from hatchet_sdk.clients.rest.models.worker import Worker
from hatchet_sdk.clients.rest.models.worker_list import WorkerList
from hatchet_sdk.clients.rest.models.workflow import Workflow
from hatchet_sdk.clients.rest.models.workflow_deployment_config import (
Expand Down Expand Up @@ -124,6 +123,7 @@
RateLimitDuration,
StickyStrategy,
)
from hatchet_sdk.utils.aio_utils import sync_to_async

from .client import new_client
from .clients.admin import (
Expand All @@ -138,3 +138,106 @@
from .context.worker_context import WorkerContext
from .hatchet import ClientConfig, Hatchet, concurrency, on_failure_step, step, workflow
from .worker import Worker, WorkerStartOptions, WorkerStatus

__all__ = [
"AcceptInviteRequest",
"APIError",
"APIErrors",
"APIMeta",
"APIMetaAuth",
"APIMetaIntegration",
"APIResourceMeta",
"APIToken",
"CreateAPITokenRequest",
"CreateAPITokenResponse",
"CreatePullRequestFromStepRun",
"CreateTenantInviteRequest",
"CreateTenantRequest",
"Event",
"EventData",
"EventKeyList",
"EventList",
"EventOrderByDirection",
"EventOrderByField",
"EventWorkflowRunSummary",
"GetStepRunDiffResponse",
"GithubAppInstallation",
"GithubBranch",
"GithubRepo",
"Job",
"JobRun",
"JobRunStatus",
"LinkGithubRepositoryRequest",
"ListAPITokensResponse",
"ListGithubAppInstallationsResponse",
"ListPullRequestsResponse",
"LogLine",
"LogLineLevel",
"LogLineList",
"LogLineOrderByDirection",
"LogLineOrderByField",
"PaginationResponse",
"PullRequest",
"PullRequestState",
"RejectInviteRequest",
"ReplayEventRequest",
"RerunStepRunRequest",
"Step",
"StepRun",
"StepRunDiff",
"StepRunStatus",
"sync_to_async",
"Tenant",
"TenantInvite",
"TenantInviteList",
"TenantList",
"TenantMember",
"TenantMemberList",
"TenantMemberRole",
"TriggerWorkflowRunRequest",
"UpdateTenantInviteRequest",
"User",
"UserLoginRequest",
"UserRegisterRequest",
"UserTenantMembershipsList",
"UserTenantPublic",
"Worker",
"WorkerList",
"Workflow",
"WorkflowDeploymentConfig",
"WorkflowList",
"WorkflowRun",
"WorkflowRunList",
"WorkflowRunStatus",
"WorkflowRunTriggeredBy",
"WorkflowTag",
"WorkflowTriggerCronRef",
"WorkflowTriggerEventRef",
"WorkflowTriggers",
"WorkflowVersion",
"WorkflowVersionDefinition",
"WorkflowVersionMeta",
"ConcurrencyLimitStrategy",
"CreateWorkflowVersionOpts",
"RateLimitDuration",
"StickyStrategy",
"new_client",
"ChildTriggerWorkflowOptions",
"DedupeViolationErr",
"ScheduleTriggerWorkflowOptions",
"TriggerWorkflowOptions",
"PushEventOptions",
"StepRunEventType",
"WorkflowRunEventType",
"Context",
"WorkerContext",
"ClientConfig",
"Hatchet",
"concurrency",
"on_failure_step",
"step",
"workflow",
"Worker",
"WorkerStartOptions",
"WorkerStatus",
]
71 changes: 71 additions & 0 deletions hatchet_sdk/utils/aio_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import asyncio
import inspect
from functools import partial, wraps


def sync_to_async(func):
"""
A decorator to run a synchronous function or coroutine in an asynchronous context with added
asyncio loop safety.
This decorator allows you to safely call synchronous functions or coroutines from an
asynchronous function by running them in an executor.
Args:
func (callable): The synchronous function or coroutine to be run asynchronously.
Returns:
callable: An asynchronous wrapper function that runs the given function in an executor.
Example:
@sync_to_async
def sync_function(x, y):
return x + y
@sync_to_async
async def async_function(x, y):
return x + y
def undecorated_function(x, y):
return x + y
async def main():
result1 = await sync_function(1, 2)
result2 = await async_function(3, 4)
result3 = await sync_to_async(undecorated_function)(5, 6)
print(result1, result2, result3)
asyncio.run(main())
"""

@wraps(func)
async def run(*args, loop=None, executor=None, **kwargs):
"""
The asynchronous wrapper function that runs the given function in an executor.
Args:
*args: Positional arguments to pass to the function.
loop (asyncio.AbstractEventLoop, optional): The event loop to use. If None, the current running loop is used.
executor (concurrent.futures.Executor, optional): The executor to use. If None, the default executor is used.
**kwargs: Keyword arguments to pass to the function.
Returns:
The result of the function call.
"""
if loop is None:
loop = asyncio.get_running_loop()

if inspect.iscoroutinefunction(func):
# Wrap the coroutine to run it in an executor
async def wrapper():
return await func(*args, **kwargs)

pfunc = partial(asyncio.run, wrapper())
return await loop.run_in_executor(executor, pfunc)
else:
# Run the synchronous function in an executor
pfunc = partial(func, *args, **kwargs)
return await loop.run_in_executor(executor, pfunc)

return run
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "hatchet-sdk"
version = "0.34.0a5"
version = "0.34.0a6"
description = ""
authors = ["Alexander Belanger <[email protected]>"]
readme = "README.md"
Expand Down Expand Up @@ -42,7 +42,7 @@ known_third_party = [
"python_dotenv",
"python_dateutil",
"pyyaml",
"urllib3"
"urllib3",
]

[tool.poetry.scripts]
Expand Down

0 comments on commit 2413114

Please sign in to comment.