-
Notifications
You must be signed in to change notification settings - Fork 235
✨ BaseRestartWorkChain: add max iterations per handler and pause #7139
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -84,7 +84,7 @@ def validate_handler_overrides( | |
|
|
||
| if isinstance(overrides, dict): | ||
| for key in overrides.keys(): | ||
| if key not in ['enabled', 'priority']: | ||
| if key not in ['enabled', 'priority', 'max_iterations']: | ||
| return f'The value of key `{handler}` contain keys `{key}` which is not supported.' | ||
|
|
||
| return None | ||
|
|
@@ -189,6 +189,13 @@ def define(cls, spec: 'ProcessSpec') -> None: # type: ignore[override] | |
| '"pause" (pause the workchain for inspection), "restart_once" (restart once then abort), ' | ||
| '"restart_and_pause" (restart once then pause if still failing).', | ||
| ) | ||
| spec.input( | ||
| 'pause_on_max_iterations', | ||
| valid_type=orm.Bool, | ||
| required=False, | ||
| help='If True, pause the workchain for inspection when max_iterations is reached (either globally or ' | ||
| 'for a specific handler) instead of aborting. When resumed, iteration counters are reset to zero.', | ||
| ) | ||
| spec.exit_code(301, 'ERROR_SUB_PROCESS_EXCEPTED', message='The sub process excepted.') | ||
| spec.exit_code(302, 'ERROR_SUB_PROCESS_KILLED', message='The sub process was killed.') | ||
| spec.exit_code( | ||
|
|
@@ -210,15 +217,14 @@ def setup(self) -> None: | |
| self.ctx.unhandled_failure = False | ||
| self.ctx.is_finished = False | ||
| self.ctx.iteration = 0 | ||
| self.ctx.handler_iteration_counts = {} | ||
|
|
||
| def should_run_process(self) -> bool: | ||
| """Return whether a new process should be run. | ||
|
|
||
| This is the case as long as the last process has not finished successfully and the maximum number of restarts | ||
| has not yet been exceeded. | ||
| This is the case as long as the last process has not finished successfully or a handler has been triggered. | ||
| """ | ||
| max_iterations = self.inputs.max_iterations.value | ||
| return not self.ctx.is_finished and self.ctx.iteration < max_iterations | ||
| return not self.ctx.is_finished | ||
|
|
||
| def run_process(self) -> ToContext: | ||
| """Run the next process, taking the input dictionary from the context at `self.ctx.inputs`.""" | ||
|
|
@@ -297,14 +303,14 @@ def inspect_process(self) -> Optional['ExitCode']: | |
|
|
||
| # If an actual report was returned, save it so it is not overridden by next handler returning `None` | ||
| if report: | ||
| self.ctx.handler_iteration_counts.setdefault(handler.__name__, 0) | ||
| self.ctx.handler_iteration_counts[handler.__name__] += 1 | ||
| last_report = report | ||
|
|
||
| # After certain handlers, we may want to skip all other handlers | ||
| if report and report.do_break: | ||
| break | ||
|
|
||
| report_args = (self.ctx.process_name, node.pk) | ||
|
|
||
| # If the process failed and no handler returned a report we consider it an unhandled failure | ||
| if node.is_failed and not last_report: | ||
| action = self.inputs.get('on_unhandled_failure', None) | ||
|
|
@@ -364,8 +370,44 @@ def inspect_process(self) -> Optional['ExitCode']: | |
| # considered to be an unhandled failed process and therefore we reset the flag | ||
| self.ctx.unhandled_failure = False | ||
|
|
||
| # If at least one handler returned a report, the action depends on its exit code and that of the process itself | ||
| if last_report: | ||
| pause_on_max_iterations = ( | ||
| self.inputs.pause_on_max_iterations.value | ||
| if self.inputs.get('pause_on_max_iterations', None) is not None | ||
| else False | ||
| ) | ||
| pause_process = False | ||
|
|
||
| # Check if the global max iterations have been reached | ||
| if self.ctx.iteration >= self.inputs.max_iterations.value: | ||
| self.report(f'Reached the maximum number of global iterations ({self.inputs.max_iterations.value}).') | ||
| if not pause_on_max_iterations: | ||
| self.report(f'Aborting! Last ran: {self.ctx.process_name}<{node.pk}>') | ||
| return self.exit_codes.ERROR_MAXIMUM_ITERATIONS_EXCEEDED | ||
|
|
||
| self.ctx.iteration = 0 | ||
| pause_process = True | ||
|
|
||
| # Check if any process handlers reached their max iterations | ||
| for handler in self.get_process_handlers(): | ||
| max_iterations_override = self.ctx.handler_overrides.get(handler.__name__, {}).get( | ||
| 'max_iterations', None | ||
| ) | ||
| max_handler_iterations = max_iterations_override if max_iterations_override else handler.max_iterations # type: ignore[attr-defined] | ||
| handler_iterations = self.ctx.handler_iteration_counts.get(handler.__name__, 0) | ||
|
|
||
| if max_handler_iterations is not None and handler_iterations >= max_handler_iterations: | ||
| self.report( | ||
| f'Reached the maximum number of iterations ({max_handler_iterations}) for handler ' | ||
| f'`{handler.__name__}`.' | ||
| ) | ||
| if not pause_on_max_iterations: | ||
| self.report(f'Aborting! Last ran: {self.ctx.process_name}<{node.pk}>') | ||
| return self.exit_codes.ERROR_MAXIMUM_ITERATIONS_EXCEEDED | ||
|
|
||
| self.ctx.handler_iteration_counts[handler.__name__] = 0 | ||
| pause_process = True | ||
|
|
||
| if node.is_finished_ok and last_report.exit_code.status == 0: | ||
| template = '{}<{}> finished successfully but a handler was triggered, restarting' | ||
| elif node.is_failed and last_report.exit_code.status == 0: | ||
|
|
@@ -375,9 +417,20 @@ def inspect_process(self) -> Optional['ExitCode']: | |
| elif node.is_failed and last_report.exit_code.status != 0: | ||
| template = '{}<{}> failed but a handler detected an unrecoverable problem, aborting' | ||
|
|
||
| self.report(template.format(*report_args)) | ||
| self.report(template.format(self.ctx.process_name, node.pk)) | ||
|
|
||
| if last_report.exit_code.status != 0: | ||
| return last_report.exit_code | ||
|
|
||
| if pause_process: | ||
| self.report( | ||
| f'Resetting the iteration counter(s) and pausing for inspection. You can resume execution using ' | ||
| f'`verdi process play {self.node.pk}`, or kill the work chain using ' | ||
| f'`verdi process kill {self.node.pk}`.' | ||
| ) | ||
| self.pause(f"Paused for user inspection, see: 'verdi process report {self.node.pk}'") | ||
|
|
||
| return last_report.exit_code | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Here, the exit status of `last_report.exit_code` can only be zero, since a nonzero exit code is already returned just above. I think it's better to be explicit and return the exit code in case its exit status is nonzero, and return `None` otherwise. |
||
| return None | ||
|
|
||
| # Otherwise the process was successful and no handler returned anything so we consider the work done | ||
| self.ctx.is_finished = True | ||
|
|
@@ -397,17 +450,6 @@ def results(self) -> Optional['ExitCode']: | |
| """Attach the outputs specified in the output specification from the last completed process.""" | ||
| node = self.ctx.children[self.ctx.iteration - 1] | ||
|
|
||
| # We check the `is_finished` attribute of the work chain and not the successfulness of the last process | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. All of the "max iterations" logic is now localised in the `inspect_process` step. |
||
| # because the error handlers in the last iteration can have qualified a "failed" process as satisfactory | ||
| # for the outcome of the work chain and so have marked it as `is_finished=True`. | ||
| max_iterations = self.inputs.max_iterations.value | ||
| if not self.ctx.is_finished and self.ctx.iteration >= max_iterations: | ||
| self.report( | ||
| f'reached the maximum number of iterations {max_iterations}: ' | ||
| f'last ran {self.ctx.process_name}<{node.pk}>' | ||
| ) | ||
| return self.exit_codes.ERROR_MAXIMUM_ITERATIONS_EXCEEDED | ||
|
|
||
| self.report(f'work chain completed after {self.ctx.iteration} iterations') | ||
| self._attach_outputs(node) | ||
| return None | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whether or not the global `max_iterations` has been reached is now checked in the `inspect_process` step.