Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
grutt committed Jul 25, 2024
1 parent 44a241f commit b96add4
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 23 deletions.
7 changes: 3 additions & 4 deletions hatchet_sdk/clients/dispatcher/action_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from hatchet_sdk.utils.backoff import exp_backoff_sleep

from ...loader import ClientConfig
from ...logger import logger
from hatchet_sdk.logger import logger
from ...metadata import get_metadata
from ..events import proto_timestamp_now

Expand Down Expand Up @@ -81,7 +81,6 @@ class ActionListener:
run_heartbeat: bool = field(default=True, init=False)
listen_strategy: str = field(default="v2", init=False)
stop_signal: bool = field(default=False, init=False)
logger = logger

missed_heartbeats: int = field(default=0, init=False)

Expand Down Expand Up @@ -244,7 +243,7 @@ async def _generator(self) -> AsyncGenerator[Action, None]:
# Handle different types of errors
if e.code() == grpc.StatusCode.CANCELLED:
# Context cancelled, unsubscribe and close
self.logger.debug("Context cancelled, closing listener")
logger.debug("Context cancelled, closing listener")
elif e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
logger.info("Deadline exceeded, retrying subscription")
elif (
Expand Down Expand Up @@ -280,7 +279,7 @@ def map_action_type(self, action_type):
elif action_type == ActionType.START_GET_GROUP_KEY:
return START_GET_GROUP_KEY
else:
# self.logger.error(f"Unknown action type: {action_type}")
# logger.error(f"Unknown action type: {action_type}")
return None

async def get_listen_client(self):
Expand Down
2 changes: 1 addition & 1 deletion hatchet_sdk/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def workflow_run_id(self):
return self.action.workflow_run_id

def cancel(self):
logger.info("Cancelling step...")
logger.debug("cancelling step...")
self.exit_flag = True

# done returns true if the context has been cancelled
Expand Down
2 changes: 1 addition & 1 deletion hatchet_sdk/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

# Create a named logger
logger = logging.getLogger("hatchet")
logger.setLevel(logging.ERROR)
logger.setLevel(logging.INFO)

handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter("[%(levelname)s]\t🪓 -- %(asctime)s - %(message)s")
Expand Down
6 changes: 3 additions & 3 deletions hatchet_sdk/worker/action_listener_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,19 +158,19 @@ async def start_action_loop(self):
type=STEP_EVENT_TYPE_STARTED, # TODO ack type
)
)
logger.debug(
logger.info(
f"rx: start step run: {action.step_run_id}/{action.action_id}"
)
case ActionType.CANCEL_STEP_RUN:
logger.debug(f"rx: cancel step run: {action.step_run_id}")
logger.info(f"rx: cancel step run: {action.step_run_id}")
case ActionType.START_GET_GROUP_KEY:
self.event_queue.put(
ActionEvent(
action=action,
type=GROUP_KEY_EVENT_TYPE_STARTED, # TODO ack type
)
)
logger.debug(
logger.info(
f"rx: start group key: {action.get_group_key_run_id}"
)
case _:
Expand Down
22 changes: 10 additions & 12 deletions hatchet_sdk/worker/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,18 +153,18 @@ def __init__(
def run(self, action: Action):
match action.action_type:
case ActionType.START_STEP_RUN:
logger.debug(
f"run start step run: {action.action_id}/{action.step_run_id}"
logger.info(
f"run: start step: {action.action_id}/{action.step_run_id}"
)
asyncio.create_task(self.handle_start_step_run(action))
case ActionType.CANCEL_STEP_RUN:
logger.debug(
f"cancel step run: {action.action_id}/{action.step_run_id}"
logger.info(
f"cancel: step run: {action.action_id}/{action.step_run_id}"
)
asyncio.create_task(self.handle_cancel_action(action.step_run_id))
case ActionType.START_GET_GROUP_KEY:
logger.debug(
f"run get group key: {action.action_id}/{action.get_group_key_run_id}"
logger.info(
f"run: get group key: {action.action_id}/{action.get_group_key_run_id}"
)
asyncio.create_task(self.handle_start_group_key_run(action))
case _:
Expand Down Expand Up @@ -193,7 +193,7 @@ def inner_callback(task: asyncio.Task):
)
)

logger.debug(
logger.error(
f"failed step run: {action.action_id}/{action.step_run_id}"
)

Expand All @@ -206,7 +206,7 @@ def inner_callback(task: asyncio.Task):
)
)

logger.debug(
logger.info(
f"finished step run: {action.action_id}/{action.step_run_id}"
)

Expand All @@ -233,7 +233,7 @@ def inner_callback(task: asyncio.Task):
)
)

logger.debug(
logger.error(
f"failed step run: {action.action_id}/{action.step_run_id}"
)

Expand All @@ -246,7 +246,7 @@ def inner_callback(task: asyncio.Task):
)
)

logger.debug(
logger.info(
f"finished step run: {action.action_id}/{action.step_run_id}"
)

Expand Down Expand Up @@ -311,8 +311,6 @@ def cleanup_run_id(self, run_id: str):
del self.contexts[run_id]

async def handle_start_step_run(self, action: Action):
logger.debug(f"Starting step run {action.step_run_id}")

action_name = action.action_id
context = Context(
action,
Expand Down
5 changes: 3 additions & 2 deletions hatchet_sdk/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def status(self) -> WorkerStatus:

return self._status

def async_start(self):
async def async_start(self):
return self.start()

def setup_loop(self):
Expand All @@ -112,7 +112,8 @@ def setup_loop(self):
## Start methods
def start(self):
main_pid = os.getpid()
logger.debug(f"------------------------------------------")
logger.info(f"------------------------------------------")
logger.info(f"STARTING HATCHET...")
logger.debug(f"worker runtime starting on PID: {main_pid}")

if len(self.action_registry.keys()) == 0:
Expand Down

0 comments on commit b96add4

Please sign in to comment.