Skip to content

Commit

Permalink
Add method to get last execution history event
Browse files Browse the repository at this point in the history
  • Loading branch information
EpicWink committed Jan 5, 2022
1 parent b7ff956 commit 4492112
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/swf_typed/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@
from ._history import WorkflowExecutionTimedOutEvent

from ._history import get_execution_history
from ._history import get_last_execution_history_event

from ._state import TaskStatus
from ._state import TimerStatus
Expand Down
27 changes: 27 additions & 0 deletions src/swf_typed/_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -1917,3 +1917,30 @@ def get_execution_history(
reverseOrder=reverse,
)
return _common.iter_paged(call, Event.from_api, "events")


def get_last_execution_history_event(
execution: "_executions.ExecutionId",
domain: str,
client: "botocore.client.BaseClient" = None,
) -> Event:
"""Get last workflow execution history event.
Args:
execution: workflow execution to get history event of
domain: domain of workflow execution
client: SWF client
Returns:
most recent workflow execution history event
"""

client = _common.ensure_client(client)
response = client.get_workflow_execution_history(
domain=domain,
execution=execution.to_api(),
reverseOrder=True,
maximumPageSize=1,
)
last_event_data = response["events"][0] # all executions have events
return Event.from_api(last_event_data)

0 comments on commit 4492112

Please sign in to comment.