Skip to content

Commit

Permalink
ensure non-raising job subscribers handling during execution
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Oct 11, 2023
1 parent 5cc244e commit 7d8c386
Showing 1 changed file with 24 additions and 10 deletions.
34 changes: 24 additions & 10 deletions weaver/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,24 +200,29 @@ def map_job_subscribers(job_body, settings):
def send_job_notification_email(job, task_logger, settings):
# type: (Job, logging.Logger, SettingsType) -> None
"""
Sends the notification email about the execution if it was requested during :term:`Job` submission.
Sends a notification email about the execution status for the subscriber if requested during :term:`Job` submission.
"""
notification_email = job.subscribers.get("emails", {}).get(job.status)
job_subs = job.subscribers or {}
notification_email = job_subs.get("emails", {}).get(job.status)
if notification_email:
try:
email = decrypt_email(notification_email, settings)
notify_job_email(job, email, settings)
message = "Notification email sent successfully."
job.save_log(logger=task_logger, message=message)
except Exception as exc:
except Exception as exc: # pragma: no cover
exception = f"{fully_qualified_name(exc)}: {exc!s}"
message = f"Couldn't send notification email ({exception})"
message = f"Couldn't send notification email: [{exception}]"
job.save_log(errors=message, logger=task_logger, message=message)


def send_job_callback_request(job, task_logger, settings):
# type: (Job, logging.Logger, SettingsType) -> None
request_uri = job.subscribers.get("callbacks", {}).get(job.status)
"""
Send a callback request about the execution status for the subscriber if requested at :term:`Job` execution.
"""
job_subs = job.subscribers or {}
request_uri = job_subs.get("callbacks", {}).get(job.status)
if request_uri:
try:
if job.status != Status.SUCCEEDED:
Expand All @@ -242,9 +247,9 @@ def send_job_callback_request(job, task_logger, settings):
)
message = "Notification callback request sent successfully."
job.save_log(logger=task_logger, message=message)
except Exception as exc:
except Exception as exc: # pragma: no cover
exception = f"{fully_qualified_name(exc)}: {exc!s}"
message = f"Couldn't send notification callback request ({exception})"
message = f"Couldn't send notification callback request: [{exception}]"
job.save_log(errors=message, logger=task_logger, message=message)


Expand All @@ -253,7 +258,16 @@ def notify_job_subscribers(job, task_logger, settings):
"""
Send notifications to all requested :term:`Job` subscribers according to its current status.
All notification operations are non-raising. In case of error, the :term:`Job` logs is updated with error details.
All notification operations must be implemented as non-raising.
In case of error, the :term:`Job` logs will be updated with relevant error details and resume execution.
"""
send_job_notification_email(job, task_logger, settings)
send_job_callback_request(job, task_logger, settings)
try:
send_job_notification_email(job, task_logger, settings)
send_job_callback_request(job, task_logger, settings)
except Exception as exc: # pragma: no cover
exception = f"{fully_qualified_name(exc)}: {exc!s}"
message = (
f"Unhandled error occurred when processing a job notification subscriber: [{exception}]. "
"Error ignored to resume execution."
)
job.save_log(errors=message, logger=task_logger, message=message)

0 comments on commit 7d8c386

Please sign in to comment.