From 232b18b8a317ed3f9bc8fe37c3239849ca7cfb59 Mon Sep 17 00:00:00 2001 From: exp-vkishore Date: Thu, 2 Dec 2021 12:01:27 +0530 Subject: [PATCH 1/4] models.py -> Add failfast boolean property in TaskSpec schema --- orquesta/specs/native/v1/models.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/orquesta/specs/native/v1/models.py b/orquesta/specs/native/v1/models.py index 1712dfac..6610c822 100644 --- a/orquesta/specs/native/v1/models.py +++ b/orquesta/specs/native/v1/models.py @@ -125,6 +125,7 @@ class TaskSpec(native_v1_specs.Spec): "input": {"oneOf": [spec_types.NONEMPTY_STRING, spec_types.NONEMPTY_DICT]}, "retry": TaskRetrySpec, "next": TaskTransitionSequenceSpec, + "failfast": spec_types.STRING_OR_BOOLEAN }, "additionalProperties": False, } @@ -153,6 +154,12 @@ def has_join(self): def has_retry(self): return hasattr(self, "retry") and self.retry + def has_failfast(self): + return hasattr(self, "failfast") + + def get_failfast_spec(self): + return getattr(self, "failfast", True) + def render(self, in_ctx): action_specs = [] From b98e54147de3554c49054d4a73879b80660e028b Mon Sep 17 00:00:00 2001 From: exp-vkishore Date: Thu, 2 Dec 2021 12:04:01 +0530 Subject: [PATCH 2/4] machines.py -> Add condition `If not failfast than run the pending incomplete tasks too` in add_context_to_task_item_event --- orquesta/machines.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/orquesta/machines.py b/orquesta/machines.py index 55b41be0..d4bfa6e0 100644 --- a/orquesta/machines.py +++ b/orquesta/machines.py @@ -304,6 +304,7 @@ events.ACTION_FAILED_TASK_DORMANT_ITEMS_CANCELED: statuses.CANCELED, events.ACTION_FAILED_TASK_DORMANT_ITEMS_FAILED: statuses.FAILED, events.ACTION_FAILED_TASK_DORMANT_ITEMS_INCOMPLETE: statuses.FAILED, + events.ACTION_FAILED_TASK_DORMANT_ITEMS_INCOMPLETE_CONTINUE: statuses.RUNNING, events.ACTION_FAILED_TASK_DORMANT_ITEMS_COMPLETED: statuses.FAILED, events.ACTION_EXPIRED: statuses.FAILED, events.ACTION_EXPIRED_TASK_DORMANT_ITEMS_PAUSED: statuses.FAILED, @@ -365,6 +366,7 @@ events.ACTION_FAILED_TASK_DORMANT_ITEMS_CANCELED: statuses.CANCELED, events.ACTION_FAILED_TASK_DORMANT_ITEMS_FAILED: statuses.FAILED, events.ACTION_FAILED_TASK_DORMANT_ITEMS_INCOMPLETE: statuses.FAILED, + events.ACTION_FAILED_TASK_DORMANT_ITEMS_INCOMPLETE_CONTINUE: statuses.PAUSED, events.ACTION_FAILED_TASK_DORMANT_ITEMS_COMPLETED: statuses.FAILED, events.ACTION_EXPIRED: statuses.FAILED, events.ACTION_EXPIRED_TASK_DORMANT_ITEMS_PAUSED: statuses.FAILED, @@ -425,6 +427,7 @@ events.ACTION_FAILED_TASK_DORMANT_ITEMS_CANCELED: statuses.CANCELED, events.ACTION_FAILED_TASK_DORMANT_ITEMS_FAILED: statuses.CANCELED, events.ACTION_FAILED_TASK_DORMANT_ITEMS_INCOMPLETE: statuses.CANCELED, + events.ACTION_FAILED_TASK_DORMANT_ITEMS_INCOMPLETE_CONTINUE: statuses.CANCELED, events.ACTION_FAILED_TASK_DORMANT_ITEMS_COMPLETED: statuses.CANCELED, events.ACTION_EXPIRED: statuses.FAILED, events.ACTION_EXPIRED_TASK_DORMANT_ITEMS_PAUSED: statuses.CANCELED, @@ -549,6 +552,11 @@ def add_context_to_task_item_event(cls, workflow_state, task_id, task_route, ac_ if not active and canceled: return action_event + "_items_canceled" + # If not failfast than run the pending incomplete tasks too + failfast = workflow_state.should_failfast(task_id, task_route) + if not active and incomplete and not failfast: + return action_event + "_items_incomplete_continue" + # Attach info on whether there are failed execution on the items and return. if not active and failed: return action_event + "_items_failed" @@ -786,11 +794,11 @@ def add_context_to_workflow_event(cls, workflow_state, wf_ex_event): # If the workflow is paused and on resume, check whether it is already completed. if ( - workflow_state.status == statuses.PAUSED - and wf_ex_event.status in [statuses.RUNNING, statuses.RESUMING] - and not workflow_state.has_active_tasks - and not workflow_state.has_staged_tasks - and not workflow_state.has_paused_tasks + workflow_state.status == statuses.PAUSED + and wf_ex_event.status in [statuses.RUNNING, statuses.RESUMING] + and not workflow_state.has_active_tasks + and not workflow_state.has_staged_tasks + and not workflow_state.has_paused_tasks ): workflow_event += "_workflow_completed" From 3bbd65e1263e86bca0e4c08f412d4bbc7538fea5 Mon Sep 17 00:00:00 2001 From: exp-vkishore Date: Thu, 2 Dec 2021 12:05:05 +0530 Subject: [PATCH 3/4] conducting.py -> Add getter def should_failfast in WorkflowState --- orquesta/conducting.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/orquesta/conducting.py b/orquesta/conducting.py index 34fb3567..0796e0b5 100644 --- a/orquesta/conducting.py +++ b/orquesta/conducting.py @@ -137,6 +137,9 @@ def has_barrier_next(self, task_id, route=None): def has_next_tasks(self, task_id=None, route=None): return self.conductor.has_next_tasks(task_id=task_id, route=route) + def should_failfast(self, task_id, route): + return self.conductor.should_failfast(task_id, route) + @property def has_active_tasks(self): return len(self.get_tasks_by_status(statuses.ACTIVE_STATUSES)) > 0 @@ -689,6 +692,13 @@ def has_next_tasks(self, task_id=None, route=None): return self._has_next(task_id, route=route) + def should_failfast(self, task_id, route): + task = self.get_task(task_id, route) + if task: + task_spec = task.get("spec") + return task_spec.get_failfast_spec() if task_spec.get_failfast_spec() is not None else True + return True + def get_next_tasks(self): fail_on_task_rendering = False staged_tasks = self.workflow_state.get_staged_tasks() From 92e6cbdfa145ee3574874579f71f9b2edf021efe Mon Sep 17 00:00:00 2001 From: exp-vkishore Date: Thu, 2 Dec 2021 12:06:44 +0530 Subject: [PATCH 4/4] events.py -> Add event ACTION_FAILED_TASK_DORMANT_ITEMS_INCOMPLETE_CONTINUE to continue even if no current active task but incomplete task(s) --- orquesta/events.py | 1 + 1 file changed, 1 insertion(+) diff --git a/orquesta/events.py b/orquesta/events.py index 5a1c09f1..95e56e57 100644 --- a/orquesta/events.py +++ b/orquesta/events.py @@ -216,6 +216,7 @@ ACTION_FAILED = "action_failed" ACTION_FAILED_TASK_ACTIVE_ITEMS_INCOMPLETE = "action_failed_task_active_items_incomplete" ACTION_FAILED_TASK_DORMANT_ITEMS_INCOMPLETE = "action_failed_task_dormant_items_incomplete" +ACTION_FAILED_TASK_DORMANT_ITEMS_INCOMPLETE_CONTINUE = "action_failed_task_dormant_items_incomplete_continue" ACTION_FAILED_TASK_DORMANT_ITEMS_COMPLETED = "action_failed_task_dormant_items_completed" ACTION_FAILED_TASK_DORMANT_ITEMS_PAUSED = "action_failed_task_dormant_items_paused" ACTION_FAILED_TASK_DORMANT_ITEMS_CANCELED = "action_failed_task_dormant_items_canceled"