Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail Fast control for with item task #245

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions orquesta/conducting.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ def has_barrier_next(self, task_id, route=None):
def has_next_tasks(self, task_id=None, route=None):
return self.conductor.has_next_tasks(task_id=task_id, route=route)

def should_failfast(self, task_id, route):
return self.conductor.should_failfast(task_id, route)

@property
def has_active_tasks(self):
return len(self.get_tasks_by_status(statuses.ACTIVE_STATUSES)) > 0
Expand Down Expand Up @@ -689,6 +692,13 @@ def has_next_tasks(self, task_id=None, route=None):

return self._has_next(task_id, route=route)

def should_failfast(self, task_id, route):
task = self.get_task(task_id, route)
if task:
task_spec = task.get("spec")
return task_spec.get_failfast_spec() if task_spec.get_failfast_spec() is not None else True
return True

def get_next_tasks(self):
fail_on_task_rendering = False
staged_tasks = self.workflow_state.get_staged_tasks()
Expand Down
1 change: 1 addition & 0 deletions orquesta/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@
ACTION_FAILED = "action_failed"
ACTION_FAILED_TASK_ACTIVE_ITEMS_INCOMPLETE = "action_failed_task_active_items_incomplete"
ACTION_FAILED_TASK_DORMANT_ITEMS_INCOMPLETE = "action_failed_task_dormant_items_incomplete"
ACTION_FAILED_TASK_DORMANT_ITEMS_INCOMPLETE_CONTINUE = "action_failed_task_dormant_items_incomplete_continue"
ACTION_FAILED_TASK_DORMANT_ITEMS_COMPLETED = "action_failed_task_dormant_items_completed"
ACTION_FAILED_TASK_DORMANT_ITEMS_PAUSED = "action_failed_task_dormant_items_paused"
ACTION_FAILED_TASK_DORMANT_ITEMS_CANCELED = "action_failed_task_dormant_items_canceled"
Expand Down
18 changes: 13 additions & 5 deletions orquesta/machines.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@
events.ACTION_FAILED_TASK_DORMANT_ITEMS_CANCELED: statuses.CANCELED,
events.ACTION_FAILED_TASK_DORMANT_ITEMS_FAILED: statuses.FAILED,
events.ACTION_FAILED_TASK_DORMANT_ITEMS_INCOMPLETE: statuses.FAILED,
events.ACTION_FAILED_TASK_DORMANT_ITEMS_INCOMPLETE_CONTINUE: statuses.RUNNING,
events.ACTION_FAILED_TASK_DORMANT_ITEMS_COMPLETED: statuses.FAILED,
events.ACTION_EXPIRED: statuses.FAILED,
events.ACTION_EXPIRED_TASK_DORMANT_ITEMS_PAUSED: statuses.FAILED,
Expand Down Expand Up @@ -365,6 +366,7 @@
events.ACTION_FAILED_TASK_DORMANT_ITEMS_CANCELED: statuses.CANCELED,
events.ACTION_FAILED_TASK_DORMANT_ITEMS_FAILED: statuses.FAILED,
events.ACTION_FAILED_TASK_DORMANT_ITEMS_INCOMPLETE: statuses.FAILED,
events.ACTION_FAILED_TASK_DORMANT_ITEMS_INCOMPLETE_CONTINUE: statuses.PAUSED,
events.ACTION_FAILED_TASK_DORMANT_ITEMS_COMPLETED: statuses.FAILED,
events.ACTION_EXPIRED: statuses.FAILED,
events.ACTION_EXPIRED_TASK_DORMANT_ITEMS_PAUSED: statuses.FAILED,
Expand Down Expand Up @@ -425,6 +427,7 @@
events.ACTION_FAILED_TASK_DORMANT_ITEMS_CANCELED: statuses.CANCELED,
events.ACTION_FAILED_TASK_DORMANT_ITEMS_FAILED: statuses.CANCELED,
events.ACTION_FAILED_TASK_DORMANT_ITEMS_INCOMPLETE: statuses.CANCELED,
events.ACTION_FAILED_TASK_DORMANT_ITEMS_INCOMPLETE_CONTINUE: statuses.CANCELED,
events.ACTION_FAILED_TASK_DORMANT_ITEMS_COMPLETED: statuses.CANCELED,
events.ACTION_EXPIRED: statuses.FAILED,
events.ACTION_EXPIRED_TASK_DORMANT_ITEMS_PAUSED: statuses.CANCELED,
Expand Down Expand Up @@ -549,6 +552,11 @@ def add_context_to_task_item_event(cls, workflow_state, task_id, task_route, ac_
if not active and canceled:
return action_event + "_items_canceled"

# If not failfast than run the pending incomplete tasks too
failfast = workflow_state.should_failfast(task_id, task_route)
if not active and incomplete and not failfast:
return action_event + "_items_incomplete_continue"

# Attach info on whether there are failed execution on the items and return.
if not active and failed:
return action_event + "_items_failed"
Expand Down Expand Up @@ -786,11 +794,11 @@ def add_context_to_workflow_event(cls, workflow_state, wf_ex_event):

# If the workflow is paused and on resume, check whether it is already completed.
if (
workflow_state.status == statuses.PAUSED
and wf_ex_event.status in [statuses.RUNNING, statuses.RESUMING]
and not workflow_state.has_active_tasks
and not workflow_state.has_staged_tasks
and not workflow_state.has_paused_tasks
workflow_state.status == statuses.PAUSED
and wf_ex_event.status in [statuses.RUNNING, statuses.RESUMING]
and not workflow_state.has_active_tasks
and not workflow_state.has_staged_tasks
and not workflow_state.has_paused_tasks
Comment on lines +797 to +801
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please drop the whitespace only changes. Use black to reformat.

):
workflow_event += "_workflow_completed"

Expand Down
7 changes: 7 additions & 0 deletions orquesta/specs/native/v1/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class TaskSpec(native_v1_specs.Spec):
"input": {"oneOf": [spec_types.NONEMPTY_STRING, spec_types.NONEMPTY_DICT]},
"retry": TaskRetrySpec,
"next": TaskTransitionSequenceSpec,
"failfast": spec_types.STRING_OR_BOOLEAN
},
"additionalProperties": False,
}
Expand Down Expand Up @@ -153,6 +154,12 @@ def has_join(self):
def has_retry(self):
return hasattr(self, "retry") and self.retry

def has_failfast(self):
return hasattr(self, "failfast")

def get_failfast_spec(self):
return getattr(self, "failfast", True)

def render(self, in_ctx):
action_specs = []

Expand Down