Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
grutt committed Jul 25, 2024
1 parent 0c578d1 commit 92b2755
Show file tree
Hide file tree
Showing 11 changed files with 355 additions and 313 deletions.
9 changes: 6 additions & 3 deletions examples/simple/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@

@hatchet.workflow(on_events=["user:create"])
class MyWorkflow:
@hatchet.step(timeout="2s", retries=3)
@hatchet.step(timeout="9s", retries=3)
def step1(self, context: Context):
print("executed step1")
time.sleep(10)
pass
time.sleep(2)
raise Exception("test")
return {
"step1": "step1",
}


def main():
Expand Down
3 changes: 2 additions & 1 deletion hatchet_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from hatchet_sdk.clients.run_event_listener import RunEventListenerClient
from hatchet_sdk.clients.workflow_listener import PooledWorkflowRunListener
from hatchet_sdk.connection import new_conn
from hatchet_sdk.logger import logger

from .clients.admin import AdminClient, new_admin
from .clients.dispatcher import DispatcherClient, new_dispatcher
Expand All @@ -28,7 +29,7 @@ def from_environment(
cls,
defaults: ClientConfig = ClientConfig(),
debug: bool = False,
*opts_functions: Callable[[ClientConfig], None]
*opts_functions: Callable[[ClientConfig], None],
):
config: ClientConfig = ConfigLoader(".").load_client_config(defaults)
for opt_function in opts_functions:
Expand Down
45 changes: 44 additions & 1 deletion hatchet_sdk/clients/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import threading
import time
from dataclasses import dataclass, field
from typing import Any, AsyncGenerator, List, Optional
from typing import Any, AsyncGenerator, List, Optional, overload

import grpc
from google.protobuf.timestamp_pb2 import Timestamp
from grpc._cython import cygrpc

from hatchet_sdk.clients.event_ts import Event_ts, read_with_interrupt
Expand All @@ -16,11 +17,13 @@
ActionType,
AssignedAction,
GroupKeyActionEvent,
GroupKeyActionEventType,
HeartbeatRequest,
OverridesData,
RefreshTimeoutRequest,
ReleaseSlotRequest,
StepActionEvent,
StepActionEventType,
WorkerListenRequest,
WorkerRegisterRequest,
WorkerRegisterResponse,
Expand Down Expand Up @@ -356,12 +359,52 @@ async def send_step_action_event(self, in_: StepActionEvent):
metadata=get_metadata(self.token),
)

async def send_step_action_event_simple(
self, action: Action, event_type: StepActionEventType, payload: str
):
eventTimestamp = Timestamp()
eventTimestamp.GetCurrentTime()

event = StepActionEvent(
workerId=action.worker_id,
jobId=action.job_id,
jobRunId=action.job_run_id,
stepId=action.step_id,
stepRunId=action.step_run_id,
actionId=action.action_id,
eventTimestamp=eventTimestamp,
eventType=event_type,
eventPayload=payload,
)

return await self.send_step_action_event(event)

async def send_group_key_action_event(self, in_: GroupKeyActionEvent):
await self.aio_client.SendGroupKeyActionEvent(
in_,
metadata=get_metadata(self.token),
)

async def send_group_key_action_event_simple(
self, action: Action, event_type: GroupKeyActionEventType, payload: str
):
eventTimestamp = Timestamp()
eventTimestamp.GetCurrentTime()

event = GroupKeyActionEvent(
workerId=action.worker_id,
jobId=action.job_id,
jobRunId=action.job_run_id,
stepId=action.step_id,
stepRunId=action.step_run_id,
actionId=action.action_id,
eventTimestamp=eventTimestamp,
eventType=event_type,
eventPayload=payload,
)

return await self.send_group_key_action_event(event)

def put_overrides_data(self, data: OverridesData):
response: ActionEventResponse = self.client.PutOverridesData(
data,
Expand Down
2 changes: 1 addition & 1 deletion hatchet_sdk/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
logger.setLevel(logging.ERROR)

handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter("[%(levelname)s] 🪓 -- %(asctime)s - %(message)s")
formatter = logging.Formatter("[%(levelname)s]\t🪓 -- %(asctime)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)

Expand Down
81 changes: 0 additions & 81 deletions hatchet_sdk/worker/action_listener.py

This file was deleted.

Loading

0 comments on commit 92b2755

Please sign in to comment.