diff --git a/CHANGES.rst b/CHANGES.rst index bd7b24f4e..7149b027f 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -20,10 +20,24 @@ Changes: Media-Type that categories the ``unit`` contents, similarly to how it could be provided for its ``href`` counterpart. For the moment, only `CWL`-based ``unit`` are supported, but this could allow future extensions to provide alternate representations of an `Application Package`. - -Fixes: ------- -- No change. +- Add `Job` ``subscribers`` support to define `OGC`-compliant callback URLs where HTTP(S) requests will be sent upon + reaching certain `Job` status milestones (resolves `#230 `_). +- Add email notification support to the new ``subscribers`` definition (extension over `OGC` minimal requirements). +- Deprecate `Job` ``notification_email`` in the `OpenAPI` specification in favor of ``subscribers``, but preserve + parsing of its value if provided in the `JSON` body during `Job` submission for backward compatibility support of + existing servers. The ``Job.notification_email`` attribute is removed to avoid duplicate references. +- Add notification email for `Job` ``started`` status, only available through the ``subscribers`` property. +- Add `CLI` and ``WeaverClient`` options to support ``subscribers`` specification for submitted `Job` execution. +- Add ``{PROCESS_ID}/{STATUS}.mako`` template detection under the ``weaver.wps_email_notify_template_dir`` location + to allow per-`Process` and per-`Job` status email customization. +- Refactor ``weaver/notify.py`` and ``weaver/processes/execution.py`` to avoid mixed references to the + encryption/decryption logic employed for notification emails. All notifications including emails and + callback requests are now completely handled and contained in the ``weaver/notify.py`` module. +- Remove partially duplicate Mako Template definition as hardcoded string and separate file for email notification. + +Fixes: +------ +- Fix ``weaver.cli`` logger not properly configured when executed from `CLI` causing log messages to not be reported. .. _changes_4.33.0: diff --git a/docs/source/cli.rst b/docs/source/cli.rst index 72a683997..179a230c1 100644 --- a/docs/source/cli.rst +++ b/docs/source/cli.rst @@ -337,7 +337,7 @@ above :term:`CLI` variations, but it is usually more intuitive to use a Python : {"href": "http://another.com/data.json"}, ], "single": { - "href": "/workspace/data.xml@mediaType", # note: uploaded to vault automatically before execution + "href": "/workspace/data.xml", # note: uploaded to vault automatically before execution "type": "text/xml", } }) diff --git a/docs/source/configuration.rst b/docs/source/configuration.rst index 87176ab2a..f27c21a3f 100644 --- a/docs/source/configuration.rst +++ b/docs/source/configuration.rst @@ -221,21 +221,37 @@ they are optional and which default value or operation is applied in each situat | The *path* variant **SHOULD** start with ``/`` for appropriate concatenation with ``weaver.url``, although this is not strictly enforced. -- | ``weaver.wps_metadata_[...]`` (multiple settings) +- | ``weaver.wps_metadata_[...]`` (multiple settings) [:class:`str`] | | Metadata fields that will be rendered by either or both the WPS-1/2 and WPS-REST endpoints (:ref:`GetCapabilities `). - | ``weaver.wps_email_[...]`` (multiple settings) | - | Defines configuration of email notification functionality on job completion. + | Defines configuration of email notification functionality on :term:`Job` status milestones. | - | Encryption settings as well as custom email templates are available. Default email template defined in - `email-template`_ is employed if none is provided. Email notifications are sent only on job - completion if an email was provided in the :ref:`Execute ` request body - (see also: :ref:`Email Notification`). + | Encryption settings as well as custom email template locations are available. + The |default-notify-email-template|_ is employed if none is provided or when specified template + files or directory cannot be resolved. + | + | When looking up for templates within ``weaver.wps_email_notify_template_dir``, the following resolution order is + followed to attempt matching files. The first one that is found will be employed for the notification email. + | + | 1. file ``{TEMPLATE_DIR}/{PROCESS_ID}/{STATUS}.mako`` used for a specific :term:`Process` and :term:`Job` status + | 2. file ``{TEMPLATE_DIR}/{PROCESS_ID}.mako`` used for a specific :term:`Process` but any :term:`Job` status + | 3. file ``{TEMPLATE_DIR}/{weaver.wps_email_notify_template_default}`` used for any combination if specified + | 4. file ``{TEMPLATE_DIR}/default.mako`` used for any combination if an alternate default name was not specified + | 5. file |default-notify-email-template|_ as last resort + | + | Email notifications are sent only when corresponding :term:`Job` status milestones are reached and when + email(s) were provided in the :ref:`Execute ` request body. Emails will not be sent if + the request body did not include a subscription to those notifications, even if the templates were configured. + +.. seealso:: + See :ref:`Notification Subscribers ` for more details. .. versionadded:: 4.15 +.. versionchanged:: 4.34 - | ``weaver.execute_sync_max_wait = `` [:class:`int`, seconds] | (default: ``20``) diff --git a/docs/source/processes.rst b/docs/source/processes.rst index 1e8e45ced..bcb9d573c 100644 --- a/docs/source/processes.rst +++ b/docs/source/processes.rst @@ -1394,7 +1394,9 @@ the configured :term:`WPS` output directory. .. versionadded:: 4.3 Addition of the ``X-WPS-Output-Context`` header. -Email Notification +.. _proc_op_execute_subscribers: + +Notification Subscribers ~~~~~~~~~~~~~~~~~~~~~~~~~~ When submitting a :term:`Job` for execution, it is possible to provide the ``notification_email`` field. @@ -1402,6 +1404,20 @@ Doing so will tell `Weaver` to send an email to the specified address with succe upon :term:`Job` completion. The format of the email is configurable from `weaver.ini.example`_ file with email-specific settings (see: :ref:`Configuration`). +Alternatively to ``notification_email``, the ``subscribers`` field of the :term:`API` can be employed during :term:`Job` +submission. Using this field will take precedence over ``notification_email`` for corresponding email and status +combinations. The :term:`Job` ``subscribers`` allow more fined-grained control over which emails will be sent for +the various combinations of :term:`Job` status milestones. + +Furthermore, ``subscribers`` allow specifying URLs where HTTP(S) requests will be sent with +the :ref:`Job Status ` or :ref:`Job Results ` contents directly in :term:`JSON` format. +This allows users and/or servers to directly receive the necessary details using a push-notification mechanism instead +of the polling-based method on the :ref:`Job Status ` endpoint otherwise required to obtain updated +:term:`Job` details. + +.. seealso:: + Refer to the |oas-rtd|_ of the |exec-req|_ request for all available ``subscribers`` properties. + .. _proc_op_status: .. _proc_op_monitor: diff --git a/docs/source/references.rst b/docs/source/references.rst index dc31fef95..6559faba1 100644 --- a/docs/source/references.rst +++ b/docs/source/references.rst @@ -137,7 +137,8 @@ .. _request_options.yml.example: ../../../config/request_options.yml.example .. _Dockerfile-manager: ../../../docker/Dockerfile-manager .. _Dockerfile-worker: ../../../docker/Dockerfile-worker -.. _email-template: ../../../weaver/wps_restapi/templates/notification_email_example.mako +.. _default-notify-email-template: ../../../weaver/wps_restapi/templates/notification_email_example.mako +.. |default-notify-email-template| replace:: Default Notification Email Mako Template .. |opensearch-deploy| replace:: OpenSearch Deploy .. _opensearch-deploy: ../../../tests/opensearch/json/opensearch_deploy.json .. |opensearch-examples| replace:: OpenSearch Examples diff --git a/tests/functional/test_cli.py b/tests/functional/test_cli.py index eab3c3edd..65a467005 100644 --- a/tests/functional/test_cli.py +++ b/tests/functional/test_cli.py @@ -8,6 +8,7 @@ import logging import os import shutil +import smtplib import tempfile import uuid from typing import TYPE_CHECKING @@ -38,9 +39,10 @@ from weaver.cli import AuthHandler, BearerAuthHandler, WeaverClient, main as weaver_cli from weaver.datatype import DockerAuthentication, Service from weaver.formats import ContentType, OutputFormat, get_cwl_file_format, repr_json +from weaver.notify import decrypt_email from weaver.processes.constants import CWL_REQUIREMENT_APP_DOCKER, ProcessSchema from weaver.processes.types import ProcessType -from weaver.status import Status, StatusCategory +from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory from weaver.utils import fully_qualified_name from weaver.visibility import Visibility from weaver.wps.utils import get_wps_output_url, map_wps_output_location @@ -67,7 +69,10 @@ def setUpClass(cls): settings.update({ "weaver.vault_dir": tempfile.mkdtemp(prefix="weaver-test-"), "weaver.wps_output_dir": tempfile.mkdtemp(prefix="weaver-test-"), - "weaver.wps_output_url": "http://random-file-server.com/wps-outputs" + "weaver.wps_output_url": "http://random-file-server.com/wps-outputs", + "weaver.wps_email_notify_smtp_host": "http://fake-email-server", + "weaver.wps_email_notify_port": 1234, + "weaver.wps_email_encrypt_salt": "123456", }) cls.settings = settings super(TestWeaverClientBase, cls).setUpClass() @@ -425,7 +430,8 @@ def test_describe(self): ), "CLI should not have overridden the process description field." def run_execute_inputs_schema_variant(self, inputs_param, process="Echo", - preload=False, location=False, expect_success=True, mock_exec=True): + preload=False, location=False, expect_success=True, + mock_exec=True, **exec_kwargs): if isinstance(inputs_param, str): ref = {"location": inputs_param} if location else {"ref_name": inputs_param} if preload: @@ -440,7 +446,8 @@ def run_execute_inputs_schema_variant(self, inputs_param, process="Echo", mock_exec_func = None for mock_exec_proc in mocked_execute_celery(func_execute_task=mock_exec_func): stack_exec.enter_context(mock_exec_proc) - result = mocked_sub_requests(self.app, self.client.execute, self.test_process[process], inputs=inputs_param) + result = mocked_sub_requests(self.app, self.client.execute, self.test_process[process], + inputs=inputs_param, **exec_kwargs) if expect_success: assert result.success, result.message + (result.text if result.text else "") assert "jobID" in result.body @@ -566,6 +573,64 @@ def test_execute_with_auto_monitor(self): # status was periodically pooled and returned 'running' until the final 'succeeded' resumes to download. raise NotImplementedError + def test_execute_subscribers(self): + """ + Test that specified subscribers are called for relevant :term:`Job` status milestones. + + .. versionadded:: 4.34 + """ + subscribers = { + "inProgressUri": "https://server.com/started", + "failedUri": "https://server.com/failure", + "successUri": "https://server.com/success", + "inProgressEmail": "working@email.com", + "failedEmail": "failed@email.com", + "successEmail": "success@email.com", + } + with contextlib.ExitStack() as subs_stack: + # mock as close as possible to the 'send' operations of respective subscriber types + mocked_requests = subs_stack.enter_context(mock.patch("weaver.notify.request_extra")) + mocked_smtp = subs_stack.enter_context(mock.patch("smtplib.SMTP_SSL", autospec=smtplib.SMTP_SSL)) + mocked_smtp.return_value.sendmail.return_value = None # sending worked + mocked_emails = mocked_smtp.return_value.sendmail # shortcut + + result = self.run_execute_inputs_schema_variant( + {"message": "test-subscribers"}, + subscribers=subscribers, + mock_exec=False, # need to run it to get subscriber calls + ) + + # NOTE: + # Because JSON of job status are pushed using the OGC schema definitions, + # actual status in the body will be mapped to their standard equivalents. + # For example, "started" will be represented as "running" in the callback request body, + # even though both of these statuses are used internally at distinct execution steps. + running_statuses = JOB_STATUS_CATEGORIES[StatusCategory.RUNNING] + job_id = result.body["jobID"] + expect_outputs = { + "output": { + "href": f"{get_wps_output_url(self.settings)}/{job_id}/output/stdout.log", + "type": ContentType.TEXT_PLAIN, + "format": {"mediaType": ContentType.TEXT_PLAIN}, + } + } + + # order important, expect status 'started' (in-progress) to occur before 'succeeded' + # call for 'failed' should never happen since 'succeeded' expected, as validated by above method + assert mocked_requests.call_count == 2, "Should not have called both failed/success callback requests" + assert mocked_requests.call_args_list[0].args == ("POST", subscribers["inProgressUri"]) + assert mocked_requests.call_args_list[0].kwargs["json"]["status"] in running_statuses # status JSON + assert mocked_requests.call_args_list[1].args == ("POST", subscribers["successUri"]) + assert mocked_requests.call_args_list[1].kwargs["json"] == expect_outputs # results JSON + + # first argument None is 'from_addr' not configured, this is allowed if provided by 'From' email header + test_proc_byte = self.test_process["Echo"] + assert mocked_emails.call_count == 2, "Should not have sent both failed/success email notifications" + assert mocked_emails.call_args_list[0].args[:2] == (None, subscribers["inProgressEmail"]) + assert f"Job {test_proc_byte} Started".encode() in mocked_emails.call_args_list[0].args[-1] + assert mocked_emails.call_args_list[1].args[:2] == (None, subscribers["successEmail"]) + assert f"Job {test_proc_byte} Succeeded".encode() in mocked_emails.call_args_list[1].args[-1] + # NOTE: # For all below '<>_auto_resolve_vault' test cases, the local file referenced in the Execute request body # should be automatically handled by uploading to the Vault and forwarding the relevant X-Auth-Vault header. @@ -1626,6 +1691,61 @@ def test_execute_output_context(self, cli_options, expect_output_context): wps_path = link.split(wps_url)[-1] assert wps_path == f"/{expect_output_context}/{job_id}/output/stdout.log" + def test_execute_subscriber_options(self): + """ + Validate that subscriber options are properly combined on the CLI. + + Since options are provided by multiple separate arguments on the command line, but are a single JSON definition + in :class:`weaver.cli.WeaverClient`, ensure that mapping is accomplished as expected. Also, validate that those + definitions correspond to the final data structure obtained in the database for later use by the job execution. + + .. versionadded:: 4.34 + """ + proc = self.test_process["Echo"] + with contextlib.ExitStack() as stack_exec: + for mock_exec_proc in mocked_execute_celery(): + stack_exec.enter_context(mock_exec_proc) + + test_email_started = "started-job@email.com" + test_email_failed = "failed-job@email.com" + test_callback_started = "https://server.com/started" + test_callback_success = "https://server.com/success" + lines = mocked_sub_requests( + self.app, run_command, + [ + # "weaver", + "execute", + "-u", self.url, + "-p", proc, + "-I", "message='TEST MESSAGE!'", + "-M", + "-T", 2, + "-W", 1, + "-nL", + "-d", + "-F", OutputFormat.JSON_RAW, + "-sEP", test_email_started, + "-sEF", test_email_failed, + "-sCP", test_callback_started, + "-sCS", test_callback_success, + ], + trim=False, + entrypoint=weaver_cli, + only_local=True, + ) + data = json.loads(lines[0]) + assert data["status"] == Status.SUCCEEDED + + job = self.job_store.fetch_by_id(data["jobID"]) + # to properly compare, we must decrypt emails (encrypt is not deterministic on multiple calls) + subs = copy.deepcopy(job.subscribers) + for sub, email in subs["emails"].items(): + subs["emails"][sub] = decrypt_email(email, self.settings) + assert subs == { + "callbacks": {Status.STARTED: test_callback_started, Status.SUCCEEDED: test_callback_success}, + "emails": {Status.STARTED: test_email_started, Status.FAILED: test_email_failed}, + }, "Job subscribers should be as submitted, after combining CLI options, without extra or missing ones." + def test_execute_help_details(self): """ Verify that formatting of the execute operation help provides multiple paragraphs with more details. diff --git a/tests/test_cli.py b/tests/test_cli.py index 5dde1c02c..4b47b4de9 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,6 +1,7 @@ """ Unit test for :mod:`weaver.cli` utilities. """ +import argparse import base64 import inspect import json @@ -20,6 +21,7 @@ BearerAuthHandler, CookieAuthHandler, OperationResult, + SubscriberAction, WeaverClient, main as weaver_cli ) @@ -280,3 +282,91 @@ def mock_upload(_href, *_, **__): with mock.patch("weaver.cli.WeaverClient.upload", side_effect=mock_upload): result = WeaverClient()._upload_files(inputs=inputs) assert result is mock_result, "WeaverCLient.upload is expected to be called and should return a failed result." + + +@pytest.mark.parametrize( + ["expect_error", "subscriber_option", "subscriber_dest", "subscriber_value", "subscriber_result"], + [ + ( + None, + "--subscriber-email", + "subscriber.email", + "test@email.com", + {"subscriber": {"email": "test@email.com"}}, + ), + ( + None, + "--subscriber-callback", + "subscriber.callback", + "https://some-server.com/path", + {"subscriber": {"callback": "https://some-server.com/path"}} + ), + ( + None, + "--random-option", + "subscriber.email", + "test@email.com", + {"subscriber": {"email": "test@email.com"}}, + ), + ( + None, + "--random-option", + "subscriber.callback", + "https://some-server.com/path", + {"subscriber": {"callback": "https://some-server.com/path"}} + ), + ( + argparse.ArgumentError, + "--subscriber-email", + "subscriber.email", + "https://some-server.com/path", + None + ), + ( + argparse.ArgumentError, + "--subscriber-callback", + "subscriber.callback", + "test@email.com", + None, + ), + ( + argparse.ArgumentError, + "--subscriber-email", + "subscriber.email", + "random", + None + ), + ( + argparse.ArgumentError, + "--subscriber-callback", + "subscriber.callback", + "random", + None + ), + ( + NotImplementedError, + "--subscriber-unknown", + "subscriber.unknown", + "test@email.com", + None + ), + ( + NotImplementedError, + "--subscriber-unknown", + "subscriber.unknown", + "https://some-server.com/path", + None + ), + ] +) +def test_subscriber_parsing(expect_error, subscriber_option, subscriber_dest, subscriber_value, subscriber_result): + ns = argparse.Namespace() + try: + action = SubscriberAction(["-sXX", subscriber_option], dest=subscriber_dest) + action(argparse.ArgumentParser(), ns, subscriber_value) + except Exception as exc: + assert expect_error is not None, f"Test was not expected to fail, but raised {exc!s}." + assert isinstance(exc, expect_error), f"Test expected to raise {expect_error}, but raised {exc!s} instead." + else: + assert expect_error is None, f"Test was expected to fail with {expect_error}, but did not raise" + assert dict(**vars(ns)) == subscriber_result # pylint: disable=R1735 diff --git a/tests/test_notify.py b/tests/test_notify.py index 5f029d826..17c9d89ac 100644 --- a/tests/test_notify.py +++ b/tests/test_notify.py @@ -1,15 +1,23 @@ +import contextlib import os +import pathlib import smtplib import tempfile +import textwrap import uuid +from typing import TYPE_CHECKING import mock import pytest +from weaver import WEAVER_MODULE_DIR from weaver.datatype import Job -from weaver.notify import decrypt_email, encrypt_email, notify_job_complete +from weaver.notify import decrypt_email, encrypt_email, notify_job_email, resolve_email_template from weaver.status import Status +if TYPE_CHECKING: + from typing import Dict, Optional, Type, Union + def test_encrypt_decrypt_email_valid(): settings = { @@ -50,7 +58,7 @@ def test_encrypt_decrypt_email_raise(email_func): pytest.fail("Should have raised for invalid/missing settings") -def test_notify_job_complete(): +def test_notify_email_job_complete(): test_url = "https://test-weaver.example.com" settings = { "weaver.url": test_url, @@ -74,7 +82,7 @@ def test_notify_job_complete(): mock_smtp.return_value.sendmail.return_value = None # sending worked test_job.status = Status.SUCCEEDED - notify_job_complete(test_job, notify_email, settings) + notify_job_email(test_job, notify_email, settings) mock_smtp.assert_called_with("xyz.test.com", 12345, timeout=1) assert mock_smtp.return_value.sendmail.call_args[0][0] == "test-weaver@email.com" assert mock_smtp.return_value.sendmail.call_args[0][1] == notify_email @@ -89,7 +97,7 @@ def test_notify_job_complete(): assert test_job_err_url not in message test_job.status = Status.FAILED - notify_job_complete(test_job, notify_email, settings) + notify_job_email(test_job, notify_email, settings) assert mock_smtp.return_value.sendmail.call_args[0][0] == "test-weaver@email.com" assert mock_smtp.return_value.sendmail.call_args[0][1] == notify_email message_encoded = mock_smtp.return_value.sendmail.call_args[0][2] @@ -103,7 +111,7 @@ def test_notify_job_complete(): assert test_job_err_url in message -def test_notify_job_complete_custom_template(): +def test_notify_job_email_custom_template(): with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8", suffix=".mako") as email_template_file: email_template_file.writelines([ "From: Weaver\n", @@ -137,7 +145,7 @@ def test_notify_job_complete_custom_template(): with mock.patch("smtplib.SMTP_SSL", autospec=smtplib.SMTP_SSL) as mock_smtp: mock_smtp.return_value.sendmail.return_value = None # sending worked - notify_job_complete(test_job, notify_email, settings) + notify_job_email(test_job, notify_email, settings) message_encoded = mock_smtp.return_value.sendmail.call_args[0][2] message = message_encoded.decode("utf8") @@ -148,3 +156,137 @@ def test_notify_job_complete_custom_template(): "", f"Job: {test_url}/processes/{test_job.process}/jobs/{test_job.id}", ]) + + +@pytest.mark.parametrize( + ["settings", "test_process", "test_status", "test_default", "tmp_default", "expect_result"], + [ + ({}, None, None, False, None, 4), + # directory exists, but none of the supported mako variants found under it + ({"weaver.wps_email_notify_template_dir": tempfile.gettempdir()}, None, None, False, None, IOError), + ({"weaver.wps_email_notify_template_dir": "/DOES_NOT_EXIST"}, None, None, False, None, IOError), + ({"weaver.wps_email_notify_template_dir": ""}, None, None, False, None, IOError), + ({"weaver.wps_email_notify_template_dir": "", + "weaver.wps_email_notify_template_default": "RANDOM"}, None, None, False, None, IOError), + ({"weaver.wps_email_notify_template_dir": ""}, None, None, False, None, IOError), + ({"weaver.wps_email_notify_template_dir": "", + "weaver.wps_email_notify_template_default": "test-default.mako"}, None, False, None, "random.mako", IOError), + ({"weaver.wps_email_notify_template_dir": ""}, None, None, False, "test-default.mako", IOError), + ({"weaver.wps_email_notify_template_dir": "", + "weaver.wps_email_notify_template_default": "test-default.mako"}, None, None, False, "default.mako", IOError), + ({"weaver.wps_email_notify_template_dir": "", + "weaver.wps_email_notify_template_default": "test-default.mako"}, None, None, True, "default.mako", IOError), + ({"weaver.wps_email_notify_template_dir": ""}, None, None, True, None, 3), + ({"weaver.wps_email_notify_template_dir": "", + "weaver.wps_email_notify_template_default": "test-default.mako"}, None, None, False, "test-default.mako", 2), + ( + {"weaver.wps_email_notify_template_dir": "", + "weaver.wps_email_notify_template_default": "test-default.mako"}, + "random-process", + None, + False, + "test-default.mako", + 2 + ), + ( + {"weaver.wps_email_notify_template_dir": "", + "weaver.wps_email_notify_template_default": "test-default.mako"}, + "random-process", + Status.SUCCEEDED, + False, + "test-default.mako", + 2 + ), + ( + {"weaver.wps_email_notify_template_dir": "", + "weaver.wps_email_notify_template_default": "test-default.mako"}, + "random-process", + Status.STARTED, + False, + "test-default.mako", + 2 + ), + ( + {"weaver.wps_email_notify_template_dir": "", + "weaver.wps_email_notify_template_default": "test-default.mako"}, + "tmp-process", + None, + False, + "test-default.mako", + 1 + ), + ( + {"weaver.wps_email_notify_template_dir": "", + "weaver.wps_email_notify_template_default": "test-default.mako"}, + "tmp-process", + Status.SUCCEEDED, + False, + "test-default.mako", + 1 + ), + ( + {"weaver.wps_email_notify_template_dir": "", + "weaver.wps_email_notify_template_default": "test-default.mako"}, + "tmp-process", + Status.STARTED, + False, + "test-default.mako", + 0 + ), + ] +) +def test_resolve_email_template(settings, test_process, test_status, test_default, tmp_default, expect_result): + # type: (Dict[str, str], Optional[str], Optional[str], bool, Optional[str], Union[Type[Exception], int]) -> None + + # process name and job status are important to evaluate expected mako file resolution + tmp_process = "tmp-process" + tmp_status = Status.STARTED + + with contextlib.ExitStack() as tmp_stack: + tmp_dir = pathlib.Path(tmp_stack.enter_context(tempfile.TemporaryDirectory())) + if settings.get("weaver.wps_email_notify_template_dir") == "": + settings["weaver.wps_email_notify_template_dir"] = str(tmp_dir) + + tmp_proc_dir = tmp_dir / tmp_process + os.makedirs(tmp_proc_dir, exist_ok=True) + tmp_file0 = tmp_proc_dir / f"{tmp_status}.mako" + tmp_file0.touch() + tmp_file1 = tmp_dir / f"{tmp_process}.mako" + tmp_file1.touch() + tmp_file2 = tmp_dir / str(tmp_default) + if tmp_default: + tmp_file2.touch() + tmp_file3 = tmp_dir / "default.mako" + if test_default: + tmp_file3.touch() + default_file = os.path.join(WEAVER_MODULE_DIR, "wps_restapi/templates/notification_email_example.mako") + + ordered_possible_matches = [ + str(tmp_file0), # {tmp_dir}/{process}/{status}.mako + str(tmp_file1), # {tmp_dir}/{process}.mako + str(tmp_file2), # {tmp_dir}/{default}.mako + str(tmp_file3), # {tmp_dir}/default.mako + str(default_file), # weaver default mako + ] + tmp_dir_files = list(sorted(os.path.join(root, file) for root, _, files in os.walk(tmp_dir) for file in files)) + tmp_dir_msg = "Temporary directory contents:\n{}".format(textwrap.indent("\n".join(tmp_dir_files), " ")) + + test_job = Job(task_id=uuid.uuid4(), process=test_process, status=test_status) + try: + found_template = resolve_email_template(test_job, settings) + found_template_index = ordered_possible_matches.index(found_template.filename) + assert isinstance(expect_result, int), ( + f"Test expected to raise {expect_result} but did not raise.\n{tmp_dir_msg}" + ) + assert found_template_index == expect_result, ( + f"Test did not match the expected template file.\n{tmp_dir_msg}" + ) + except AssertionError: + raise + except Exception as exc: + assert not isinstance(expect_result, int), ( + f"Test did not expect an error, but raised {exc!r}.\n{tmp_dir_msg}" + ) + assert isinstance(exc, expect_result), ( + f"Test expected {expect_result}, but raised {exc!r} instead.\n{tmp_dir_msg}" + ) diff --git a/weaver/cli.py b/weaver/cli.py index 944d7bc26..3a98d858f 100644 --- a/weaver/cli.py +++ b/weaver/cli.py @@ -13,6 +13,7 @@ from urllib.parse import urlparse import yaml +from colander import EMAIL_RE, URL_REGEX from pyramid.httpexceptions import HTTPNotImplemented from requests.auth import AuthBase, HTTPBasicAuth from requests.sessions import Session @@ -37,6 +38,7 @@ from weaver.sort import Sort, SortMethods from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status from weaver.utils import ( + Lazify, OutputMethod, copy_doc, fetch_reference, @@ -70,15 +72,18 @@ AnyResponseType, CWL, ExecutionInputsMap, + ExecutionResultObjectRef, ExecutionResults, HeadersType, + JobSubscribers, JSON ) except ImportError: # avoid linter issue AnyRequestMethod = str AnyHeadersContainer = AnyRequestType = AnyResponseType = Any - CWL = JSON = ExecutionInputsMap = ExecutionResults = HeadersType = Any + CWL = JSON = ExecutionInputsMap = ExecutionResults = ExecutionResultObjectRef = HeadersType = Any + JobSubscribers = Any try: from weaver.formats import AnyOutputFormat from weaver.processes.constants import ProcessSchemaType @@ -92,7 +97,7 @@ PostHelpFormatter = Callable[[str], str] ArgumentParserRule = Tuple[argparse._ActionsContainer, Callable[[argparse.Namespace], Optional[bool]], str] # noqa -LOGGER = logging.getLogger(__name__) +LOGGER = logging.getLogger("weaver.cli") # do not use '__name__' since it becomes '__main__' from CLI call OPERATION_ARGS_TITLE = "Operation Arguments" OPTIONAL_ARGS_TITLE = "Optional Arguments" @@ -1050,6 +1055,7 @@ def execute(self, monitor=False, # type: bool timeout=None, # type: Optional[int] interval=None, # type: Optional[int] + subscribers=None, # type: Optional[JobSubscribers] url=None, # type: Optional[str] auth=None, # type: Optional[AuthHandler] headers=None, # type: Optional[AnyHeadersContainer] @@ -1094,6 +1100,9 @@ def execute(self, Monitoring timeout (seconds) if requested. :param interval: Monitoring interval (seconds) between job status polling requests. + :param subscribers: + Job status subscribers to obtain email or callback request notifications. + The subscriber keys indicate which type of subscriber and for which status the notification will be sent. :param url: Instance URL if not already provided during client creation. :param auth: Instance authentication handler if not already created during client creation. @@ -1136,7 +1145,12 @@ def execute(self, # FIXME: allow filtering 'outputs' (https://github.com/crim-ca/weaver/issues/380) "outputs": {} } + if subscribers: + LOGGER.debug("Adding job execution subscribers:\n%s", Lazify(lambda: repr_json(subscribers, indent=2))) + data["subscribers"] = subscribers + # omit x-headers on purpose for 'describe', assume they are intended for 'execute' operation only + LOGGER.debug("Looking up process [%s] (provider: %s) to execute on [%s]", process_id, provider_id, base) result = self.describe(process_id, provider_id=provider_id, url=base, auth=auth) if not result.success: return OperationResult(False, "Could not obtain process description for execution.", @@ -1540,7 +1554,7 @@ def _download_references(self, outputs, out_links, out_dir, job_id, auth=None): out_path = os.path.join(out_dir, out_id) is_list = True if not isinstance(value, list): - value = [value] + value = [value] # type: ignore is_list = False for i, item in enumerate(value): if "href" in item: @@ -1560,15 +1574,15 @@ def _download_references(self, outputs, out_links, out_dir, job_id, auth=None): link, params = link_header.split(";", 1) href = link.strip("<>") params = parse_kvp(params, multi_value_sep=None, accumulate_keys=False) - ctype = (params.get("type") or [None])[0] + ctype = (params.get("type") or [None])[0] # type: str rel = params["rel"][0].split(".") output = rel[0] is_array = len(rel) > 1 and str.isnumeric(rel[1]) ref_path = fetch_reference(href, out_dir, auth=auth, out_method=OutputMethod.COPY, out_listing=False) - value = {"href": href, "type": ctype, "path": ref_path, "source": "link"} + value = {"href": href, "type": ctype, "path": ref_path, "source": "link"} # type: ExecutionResultObjectRef if output in outputs: - if isinstance(outputs[output], dict): # in case 'rel=". None + if not isinstance(dest, str) or "." not in dest: # pragma: no cover # only for self-validation + raise ValueError("Using 'SubscriberAction' requires 'dest=.' parameter.") + dest, self.field = dest.split(".", 1) + super(SubscriberAction, self).__init__(option_strings, dest=dest, **kwargs) + + def __call__(self, parser, namespace, subscriber_param, option_string=None): + # type: (argparse.ArgumentParser, argparse.Namespace, str, Optional[str]) -> None + + sub_options = "/".join(self.option_strings) + self.validate(sub_options, subscriber_param) + + subs_params = getattr(namespace, self.dest, {}) or {} + subs_params[self.field] = subscriber_param + setattr(namespace, self.dest, subs_params) + + def validate(self, option, value): + # type: (str, Any) -> None + metavar = self.metavar or "" + if any("email" in opt.lower() for opt in [option, self.field, metavar]): + pattern = re.compile(EMAIL_RE, flags=re.IGNORECASE) + elif any("callback" in opt.lower() for opt in [option, self.field, metavar]): + pattern = re.compile(URL_REGEX, flags=re.IGNORECASE) + else: + raise NotImplementedError(f"Cannot parse option: '{option}'") + if not re.match(pattern, value): + raise argparse.ArgumentError(self, f"Value '{value}' is not a valid subscriber argument for '{option}'.") + + +def add_subscribers_params(parser): + # type: (argparse.ArgumentParser) -> None + subs_args = parser.add_argument_group( + title="Notification Subscribers", + description=( + "Email or callback request URL to obtain notification of job status milestones.\n\n" + "Note that for email notifications, the targeted server must have properly configured SMTP settings." + ), + ) + subs_args.add_argument( + "-sEP", "--subscriber-email-progress", + action=SubscriberAction, + metavar="EMAIL", + dest="subscribers.inProgressEmail", + help="Send a notification email to this address once the job started execution." + ) + subs_args.add_argument( + "-sEF", "--subscriber-email-failed", + action=SubscriberAction, + metavar="EMAIL", + dest="subscribers.failedEmail", + help="Send a notification email to this address if the job execution completed with failure." + ) + subs_args.add_argument( + "-sES", "--subscriber-email-success", + action=SubscriberAction, + metavar="EMAIL", + dest="subscribers.successEmail", + help="Send a notification email to this address if the job execution completed successfully." + + ) + subs_args.add_argument( + "-sCP", "--subscriber-callback-progress", + action=SubscriberAction, + metavar="URL", + dest="subscribers.inProgressUri", + help=( + "Send an HTTP callback request to this URL once the job started execution.\n\n" + "The request body will contain the JSON representation of the job status details." + ) + ) + subs_args.add_argument( + "-sCF", "--subscriber-callback-failed", + action=SubscriberAction, + metavar="URL", + dest="subscribers.failedUri", + help=( + "Send an HTTP callback request to this URL if the job execution completed with failure.\n\n" + "The request body will contain the JSON representation of the job status details." + ) + ) + subs_args.add_argument( + "-sCS", "--subscriber-callback-success", + action=SubscriberAction, + metavar="URL", + dest="subscribers.successUri", + help=( + "Send an HTTP callback request to this URL if the job execution completed successfully.\n\n" + "The request body will contain the JSON representation of the job results." + ) + ) + + def set_parser_sections(parser): # type: (argparse.ArgumentParser) -> None parser._optionals.title = OPTIONAL_ARGS_TITLE @@ -2552,6 +2668,7 @@ def make_parser(): "If not requested, the created job status location is directly returned." ) add_timeout_param(op_execute) + add_subscribers_params(op_execute) op_jobs = WeaverArgumentParser( "jobs", diff --git a/weaver/datatype.py b/weaver/datatype.py index 85a32aa89..03706bae4 100644 --- a/weaver/datatype.py +++ b/weaver/datatype.py @@ -85,6 +85,7 @@ CWL, ExecutionInputs, ExecutionOutputs, + ExecutionSubscribers, JSON, Link, Metadata, @@ -926,16 +927,28 @@ def status_location(self, location_url): self["status_location"] = location_url @property - def notification_email(self): - # type: () -> Optional[str] - return self.get("notification_email") - - @notification_email.setter - def notification_email(self, email): - # type: (Optional[Union[str]]) -> None - if not isinstance(email, str): - raise TypeError(f"Type 'str' is required for '{self.__name__}.notification_email'") - self["notification_email"] = email + def subscribers(self): + # type: () -> Optional[ExecutionSubscribers] + return self.get("subscribers") + + @subscribers.setter + def subscribers(self, subscribers): + # type: (Optional[ExecutionSubscribers]) -> None + if subscribers and not ( + isinstance(subscribers, dict) and + all( + sub_type and isinstance(sub_type, str) and + sub and isinstance(sub, str) and + val and isinstance(val, str) + for sub_type, subs in subscribers.items() + for sub, val in (subs if isinstance(subs, dict) else {None: None}).items() + ) + ): + raise TypeError( + "Mapping of subscriber types, status and notification references " + f"is required for '{self.__name__}.subscribers'." + ) + self["subscribers"] = subscribers or None @property def accept_language(self): @@ -1327,10 +1340,10 @@ def links(self, container=None, self_link=None): link.update(link_meta) return job_links - def json(self, container=None, self_link=None): # pylint: disable=W0221,arguments-differ - # type: (Optional[AnySettingsContainer], Optional[str]) -> JSON + def json(self, container=None): # pylint: disable=W0221,arguments-differ + # type: (Optional[AnySettingsContainer]) -> JSON """ - Obtains the JSON data representation for response body. + Obtains the :term:`JSON` data representation for :term:`Job` response body. .. note:: Settings are required to update API shortcut URLs to job additional information. @@ -1359,7 +1372,7 @@ def json(self, container=None, self_link=None): # pylint: disable=W0221,argu # new name as per OGC-API, enforced integer # https://github.com/opengeospatial/ogcapi-processes/blob/master/openapi/schemas/processes-core/statusInfo.yaml "progress": int(self.progress), - "links": self.links(settings, self_link=self_link) + "links": self.links(settings, self_link="status") } return sd.JobStatusInfo().deserialize(job_json) @@ -1394,7 +1407,7 @@ def params(self): "context": self.context, "request": self.request, "response": self.response, - "notification_email": self.notification_email, + "subscribers": self.subscribers, "accept_language": self.accept_language, } diff --git a/weaver/notify.py b/weaver/notify.py index f2e59fb9d..46d17f266 100644 --- a/weaver/notify.py +++ b/weaver/notify.py @@ -12,57 +12,19 @@ from mako.template import Template from pyramid.settings import asbool +from weaver import WEAVER_MODULE_DIR from weaver.datatype import Job -from weaver.utils import bytes2str, get_settings, str2bytes +from weaver.processes.constants import JobInputsOutputsSchema +from weaver.status import Status +from weaver.utils import bytes2str, fully_qualified_name, get_settings, request_extra, str2bytes +from weaver.wps_restapi.jobs.utils import get_results if TYPE_CHECKING: - from weaver.typedefs import AnySettingsContainer, SettingsType + from typing import Optional -LOGGER = logging.getLogger(__name__) + from weaver.typedefs import AnySettingsContainer, ExecutionSubscribers, JSON, SettingsType -__DEFAULT_TEMPLATE__ = """ -<%doc> - This is an example notification message to be sent by email when a job is done. - It is formatted using the Mako template library (https://www.makotemplates.org/). - The content must also include the message header. - - The provided variables are: - to: Recipient's address - job: weaver.datatype.Job object - settings: application settings - - And every variable returned by the `weaver.datatype.Job.json` method. - Below is a non-exhaustive list of example parameters from this method. - Refer to the method for complete listing. - - status: succeeded, failed - logs: url to the logs - jobID: example "617f23d3-f474-47f9-a8ec-55da9dd6ac71" - result: url to the outputs - duration: example "0:01:02" - message: example "Job succeeded." - percentCompleted: example 100 - -From: Weaver -To: ${to} -Subject: Job ${job.process} ${job.status.title()} -Content-Type: text/plain; charset=UTF-8 - -Dear user, - -Your job submitted on ${job.created.strftime("%Y/%m/%d %H:%M %Z")} to ${settings.get("weaver.url")} ${job.status}. - -% if job.status == "succeeded": -You can retrieve the output(s) at the following link: ${job.results_url(settings)} -% elif job.status == "failed": -You can retrieve potential error details from the following link: ${job.exceptions_url(settings)} -% endif - -The job logs are available at the following link: ${job.logs_url(settings)} - -Regards, -Weaver -""" +LOGGER = logging.getLogger(__name__) __SALT_LENGTH__ = 16 __TOKEN_LENGTH__ = 32 @@ -70,42 +32,74 @@ __DEFAULT_ROUNDS__ = 100_000 -def notify_job_complete(job, to_email_recipient, container): - # type: (Job, str, AnySettingsContainer) -> None +def resolve_email_template(job, settings): + # type: (Job, SettingsType) -> Template """ - Send email notification of a job completion. + Finds the most appropriate Mako Template email notification file based on configuration and :term:`Job` context. + + The example template is used by default *ONLY* if the template directory was not overridden. If overridden, failing + to match any of the template file locations will raise to report the issue instead of silently using the default. + + .. seealso:: + https://github.com/crim-ca/weaver/blob/master/weaver/wps_restapi/templates/notification_email_example.mako + + :raises IOError: + If the template directory was configured explicitly, but cannot be resolved, or if any of the possible + combinations of template file names cannot be resolved under that directory. + :returns: Matched template instance based on resolution order as described in the documentation. """ - settings = get_settings(container) - smtp_host = settings.get("weaver.wps_email_notify_smtp_host") - from_addr = settings.get("weaver.wps_email_notify_from_addr") - password = settings.get("weaver.wps_email_notify_password") - timeout = int(settings.get("weaver.wps_email_notify_timeout") or 10) - port = settings.get("weaver.wps_email_notify_port") - ssl = asbool(settings.get("weaver.wps_email_notify_ssl", True)) - # an example template is located in - # weaver/wps_restapi/templates/notification_email_example.mako template_dir = settings.get("weaver.wps_email_notify_template_dir") or "" - if not smtp_host or not port: - raise ValueError("The email server configuration is missing.") - port = int(port) - # find appropriate template according to settings - if not os.path.isdir(template_dir): - LOGGER.warning("No default email template directory configured. Using default format.") - template = Template(text=__DEFAULT_TEMPLATE__) # nosec: B702 + if not template_dir and not os.path.isdir(template_dir): + LOGGER.warning("No default email template directory configured. Using default template.") + template_file = os.path.join(WEAVER_MODULE_DIR, "wps_restapi/templates/notification_email_example.mako") + template = Template(filename=template_file) # nosec: B702 else: - default_name = settings.get("weaver.wps_email_notify_template_default", "default.mako") + default_setting = "weaver.wps_email_notify_template_default" + default_default = "default.mako" + default_name = settings.get(default_setting) or default_default process_name = f"{job.process!s}.mako" + process_status_name = f"{job.process!s}/{job.status!s}.mako" default_template = os.path.join(template_dir, default_name) process_template = os.path.join(template_dir, process_name) - if os.path.isfile(process_template): + process_status_template = os.path.join(template_dir, process_status_name) + if os.path.isfile(process_status_template): + template = Template(filename=process_status_template) # nosec: B702 + elif os.path.isfile(process_template): template = Template(filename=process_template) # nosec: B702 elif os.path.isfile(default_template): template = Template(filename=default_template) # nosec: B702 else: - raise IOError(f"Template file doesn't exist: OneOf[{process_name!s}, {default_name!s}]") + raise IOError( + f"No Mako Template file could be resolved under the template directory: [{template_dir}]. Expected " + f"OneOf[{process_status_name!s}, {process_name!s}, {{{default_setting!s}}}, {default_default!s}]" + ) + return template + + +def notify_job_email(job, to_email_recipient, container): + # type: (Job, str, AnySettingsContainer) -> None + """ + Send email notification of a :term:`Job` status. + """ + settings = get_settings(container) + smtp_host = settings.get("weaver.wps_email_notify_smtp_host") + from_addr = settings.get("weaver.wps_email_notify_from_addr") + password = settings.get("weaver.wps_email_notify_password") + timeout = int(settings.get("weaver.wps_email_notify_timeout") or 10) + port = settings.get("weaver.wps_email_notify_port") + ssl = asbool(settings.get("weaver.wps_email_notify_ssl", True)) + + if not smtp_host or not port: # pragma: no cover # only raise to warn service manager + # note: don't expose the values to avoid leaking them in logs + raise ValueError( + "The email server configuration is missing or incomplete. " + "Validate that SMTP host and port are properly configured." + ) + port = int(port) + template = resolve_email_template(job, settings) job_json = job.json(settings) contents = template.render(to=to_email_recipient, job=job, settings=settings, **job_json) message = f"{contents}".strip("\n") @@ -178,3 +172,109 @@ def decrypt_email(email, settings): except Exception as ex: LOGGER.debug("Job email decrypt failed [%r].", ex) raise ValueError("Cannot complete job, server not properly configured for notification email.") + + +def map_job_subscribers(job_body, settings): + # type: (JSON, SettingsType) -> Optional[ExecutionSubscribers] + """ + Converts the :term:`Job` subscribers definition submitted at execution into a mapping for later reference. + + The returned contents must be sorted in the relevant :term:`Job` object. + For backward compatibility, ``notification_email`` directly provided at the root will be used if corresponding + definitions were not provided for the corresponding subscriber email fields. + """ + notification_email = job_body.get("notification_email") + submit_subscribers = job_body.get("subscribers") or {} + mapped_subscribers = {} + for status, name, sub_type, alt in [ + (Status.STARTED, "inProgressEmail", "emails", None), + (Status.FAILED, "failedEmail", "emails", notification_email), + (Status.SUCCEEDED, "successEmail", "emails", notification_email), + (Status.STARTED, "inProgressUri", "callbacks", None), + (Status.FAILED, "failedUri", "callbacks", None), + (Status.SUCCEEDED, "successUri", "callbacks", None), + ]: + value = submit_subscribers.get(name) or alt + if not value: + continue + if sub_type == "emails": + value = encrypt_email(value, settings) + mapped_subscribers.setdefault(sub_type, {}) + mapped_subscribers[sub_type][status] = value + return mapped_subscribers or None + + +def send_job_notification_email(job, task_logger, settings): + # type: (Job, logging.Logger, SettingsType) -> None + """ + Sends a notification email about the execution status for the subscriber if requested during :term:`Job` submission. + """ + job_subs = job.subscribers or {} + notification_email = job_subs.get("emails", {}).get(job.status) + if notification_email: + try: + email = decrypt_email(notification_email, settings) + notify_job_email(job, email, settings) + message = "Notification email sent successfully." + job.save_log(logger=task_logger, message=message) + except Exception as exc: # pragma: no cover + exception = f"{fully_qualified_name(exc)}: {exc!s}" + message = f"Couldn't send notification email: [{exception}]" + job.save_log(errors=message, logger=task_logger, message=message) + + +def send_job_callback_request(job, task_logger, settings): + # type: (Job, logging.Logger, SettingsType) -> None + """ + Send a callback request about the execution status for the subscriber if requested at :term:`Job` execution. + """ + job_subs = job.subscribers or {} + request_uri = job_subs.get("callbacks", {}).get(job.status) + if request_uri: + try: + if job.status != Status.SUCCEEDED: + body = job.json(settings) + else: + # OGC-compliant request body needed to respect 'subscribers' callback definition + # (https://github.com/opengeospatial/ogcapi-processes/blob/master/core/examples/yaml/callbacks.yaml) + body, _ = get_results( + job, + settings, + value_key="value", + schema=JobInputsOutputsSchema.OGC, + link_references=False, + ) + request_extra( + "POST", + request_uri, + json=body, + allowed_codes=[200, 201, 202], + cache_enabled=False, + settings=settings, + ) + message = "Notification callback request sent successfully." + job.save_log(logger=task_logger, message=message) + except Exception as exc: # pragma: no cover + exception = f"{fully_qualified_name(exc)}: {exc!s}" + message = f"Couldn't send notification callback request: [{exception}]" + job.save_log(errors=message, logger=task_logger, message=message) + + +def notify_job_subscribers(job, task_logger, settings): + # type: (Job, logging.Logger, SettingsType) -> None + """ + Send notifications to all requested :term:`Job` subscribers according to its current status. + + All notification operations must be implemented as non-raising. + In case of error, the :term:`Job` logs will be updated with relevant error details and resume execution. + """ + try: + send_job_notification_email(job, task_logger, settings) + send_job_callback_request(job, task_logger, settings) + except Exception as exc: # pragma: no cover + exception = f"{fully_qualified_name(exc)}: {exc!s}" + message = ( + f"Unhandled error occurred when processing a job notification subscriber: [{exception}]. " + "Error ignored to resume execution." + ) + job.save_log(errors=message, logger=task_logger, message=message) diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index 81a05e01f..6177eaa3a 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -17,7 +17,7 @@ from weaver.datatype import Process, Service from weaver.execute import ExecuteControlOption, ExecuteMode from weaver.formats import AcceptLanguage, ContentType, clean_mime_type_format -from weaver.notify import decrypt_email, encrypt_email, notify_job_complete +from weaver.notify import map_job_subscribers, notify_job_subscribers from weaver.owsexceptions import OWSInvalidParameterValue, OWSNoApplicableCode from weaver.processes import wps_package from weaver.processes.constants import WPS_COMPLEX_DATA, JobInputsOutputsSchema @@ -127,8 +127,9 @@ def execute_process(task, job_id, wps_url, headers=None): job.status = Status.STARTED # will be mapped to 'RUNNING' job.status_message = f"Job {Status.STARTED}." # will preserve detail of STARTED vs RUNNING job.save_log(message=job.status_message) - task_logger = get_task_logger(__name__) + notify_job_subscribers(job, task_logger, settings) + job.save_log(logger=task_logger, message="Job task setup initiated.") load_pywps_config(settings) job.progress = JobProgress.SETUP @@ -269,8 +270,11 @@ def execute_process(task, job_id, wps_url, headers=None): job.save_log(errors=errors, logger=task_logger) job = store.update_job(job) finally: - # if task worker terminated, local 'job' is out of date compared to remote/background runner last update + # note: + # don't update the progress and status here except for 'success' to preserve last error that was set + # it is more relevant to return the latest step that worked properly to understand where it failed job = store.fetch_by_id(job.id) + # if task worker terminated, local 'job' is out of date compared to remote/background runner last update if task_terminated and map_status(job.status) == Status.FAILED: job.status = Status.DISMISSED task_success = map_status(job.status) not in JOB_STATUS_CATEGORIES[StatusCategory.FAILED] @@ -282,7 +286,7 @@ def execute_process(task, job_id, wps_url, headers=None): if task_success: job.progress = JobProgress.NOTIFY - send_job_complete_notification_email(job, task_logger, settings) + notify_job_subscribers(job, task_logger, settings) if job.status not in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]: job.status = Status.SUCCEEDED @@ -452,23 +456,6 @@ def parse_wps_inputs(wps_process, job): return wps_inputs -def send_job_complete_notification_email(job, task_logger, settings): - # type: (Job, logging.Logger, SettingsType) -> None - """ - Sends the notification email of completed execution if it was requested during job submission. - """ - if job.notification_email is not None: - try: - email = decrypt_email(job.notification_email, settings) - notify_job_complete(job, email, settings) - message = "Notification email sent successfully." - job.save_log(logger=task_logger, message=message) - except Exception as exc: - exception = f"{fully_qualified_name(exc)}: {exc!s}" - message = f"Couldn't send notification email ({exception})" - job.save_log(errors=message, logger=task_logger, message=message) - - def make_results_relative(results, settings): # type: (List[JSON], SettingsType) -> List[JSON] """ @@ -656,16 +643,14 @@ def submit_job_handler(payload, # type: ProcessExecution # Prefer header not resolved with a valid value should still resume without error is_execute_async = mode != ExecuteMode.SYNC exec_resp = json_body.get("response") - - notification_email = json_body.get("notification_email") - encrypted_email = encrypt_email(notification_email, settings) if notification_email else None + subscribers = map_job_subscribers(json_body, settings) store = db.get_store(StoreJobs) # type: StoreJobs job = store.save_job(task_id=Status.ACCEPTED, process=process, service=provider_id, inputs=json_body.get("inputs"), outputs=json_body.get("outputs"), is_local=is_local, is_workflow=is_workflow, access=visibility, user_id=user, context=context, execute_async=is_execute_async, execute_response=exec_resp, - custom_tags=tags, notification_email=encrypted_email, accept_language=language) + custom_tags=tags, accept_language=language, subscribers=subscribers) job.save_log(logger=LOGGER, message="Job task submitted for execution.", status=Status.ACCEPTED, progress=0) job = store.update_job(job) location_url = job.status_url(settings) @@ -688,7 +673,7 @@ def submit_job_handler(payload, # type: ProcessExecution if job.status == Status.SUCCEEDED: return get_job_results_response(job, settings, headers=resp_headers) # otherwise return the error status - body = job.json(container=settings, self_link="status") + body = job.json(container=settings) body["location"] = location_url resp = get_job_submission_response(body, resp_headers, error=True) return resp diff --git a/weaver/store/base.py b/weaver/store/base.py index cc36af096..f4ada87e6 100644 --- a/weaver/store/base.py +++ b/weaver/store/base.py @@ -23,6 +23,7 @@ DatetimeIntervalType, ExecutionInputs, ExecutionOutputs, + ExecutionSubscribers, JSON, SettingsType, TypedDict @@ -179,7 +180,7 @@ def save_job(self, user_id=None, # type: Optional[int] access=None, # type: Optional[AnyVisibility] context=None, # type: Optional[str] - notification_email=None, # type: Optional[str] + subscribers=None, # type: Optional[ExecutionSubscribers] accept_language=None, # type: Optional[str] created=None, # type: Optional[datetime.datetime] ): # type: (...) -> Job diff --git a/weaver/store/mongodb.py b/weaver/store/mongodb.py index 7e2cc701d..296213a22 100644 --- a/weaver/store/mongodb.py +++ b/weaver/store/mongodb.py @@ -78,8 +78,8 @@ AnyVersion, ExecutionInputs, ExecutionOutputs, - JSON, - SettingsType + ExecutionSubscribers, + JSON ) from weaver.visibility import AnyVisibility @@ -796,7 +796,7 @@ def save_job(self, user_id=None, # type: Optional[int] access=None, # type: Optional[AnyVisibility] context=None, # type: Optional[str] - notification_email=None, # type: Optional[str] + subscribers=None, # type: Optional[ExecutionSubscribers] accept_language=None, # type: Optional[str] created=None, # type: Optional[datetime.datetime] ): # type: (...) -> Job @@ -836,7 +836,7 @@ def save_job(self, "tags": list(set(tags)), # remove duplicates "access": access, "context": context, - "notification_email": notification_email, + "subscribers": subscribers, "accept_language": accept_language, }) self.collection.insert_one(new_job.params()) diff --git a/weaver/typedefs.py b/weaver/typedefs.py index a667ec0a8..88130cf93 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -6,7 +6,6 @@ if TYPE_CHECKING: import os import sys - import typing import uuid from datetime import datetime from decimal import Decimal @@ -69,7 +68,7 @@ from weaver.execute import AnyExecuteControlOption, AnyExecuteMode, AnyExecuteResponse, AnyExecuteTransmissionMode from weaver.processes.constants import CWL_RequirementNames from weaver.processes.wps_process_base import WpsProcessInterface - from weaver.status import AnyStatusType + from weaver.status import AnyStatusType, StatusType from weaver.visibility import AnyVisibility Path = Union[os.PathLike, str, bytes] @@ -413,6 +412,14 @@ class CWL_SchemaName(Protocol): JobOutputs = List[JobOutputItem] JobResults = List[JobValueItem] JobMonitorReference = Any # typically a URI of the remote job status or an execution object/handler + JobSubscribers = TypedDict("JobSubscribers", { + "failedUri": NotRequired[str], + "successUri": NotRequired[str], + "inProgressUri": NotRequired[str], + "failedEmail": NotRequired[str], + "successEmail": NotRequired[str], + "inProgressEmail": NotRequired[str], + }, total=True) # when schema='weaver.processes.constants.ProcessSchema.OGC' ExecutionInputsMap = Dict[str, Union[JobValueObject, List[JobValueObject]]] @@ -432,8 +439,10 @@ class CWL_SchemaName(Protocol): ExecutionOutputsMap = Dict[str, ExecutionOutputObject] ExecutionOutputs = Union[ExecutionOutputsList, ExecutionOutputsMap] ExecutionResultObjectRef = TypedDict("ExecutionResultObjectRef", { - "href": Optional[str], + "href": str, "type": NotRequired[str], + "title": NotRequired[str], + "rel": NotRequired[str], }, total=False) ExecutionResultObjectValue = TypedDict("ExecutionResultObjectValue", { "value": Optional[AnyValueType], @@ -443,6 +452,10 @@ class CWL_SchemaName(Protocol): ExecutionResultArray = List[ExecutionResultObject] ExecutionResultValue = Union[ExecutionResultObject, ExecutionResultArray] ExecutionResults = Dict[str, ExecutionResultValue] + ExecutionSubscribers = TypedDict("ExecutionSubscribers", { + "emails": NotRequired[Dict[StatusType, str]], + "callbacks": NotRequired[Dict[StatusType, str]], + }, total=True) # reference employed as 'JobMonitorReference' by 'WPS1Process' JobExecution = TypedDict("JobExecution", {"execution": WPSExecution}) @@ -846,5 +859,6 @@ class CWL_SchemaName(Protocol): "response": NotRequired[AnyExecuteResponse], "inputs": NotRequired[ExecutionInputs], "outputs": NotRequired[ExecutionOutputs], - "notification_email": NotRequired[str], + "subscribers": NotRequired[JobSubscribers], + "notification_email": NotRequired[str], # deprecated, backward-compatibility }, total=False) diff --git a/weaver/wps_restapi/api.py b/weaver/wps_restapi/api.py index aed8b26cd..f16f545c8 100644 --- a/weaver/wps_restapi/api.py +++ b/weaver/wps_restapi/api.py @@ -210,8 +210,7 @@ def get_conformance(category): f"{ogcapi_proc_core}/rec/job-list/next-1", f"{ogcapi_proc_core}/rec/job-list/next-2", f"{ogcapi_proc_core}/rec/job-list/next-3", - # FIXME: https://github.com/crim-ca/weaver/issues/230 - # ogcapi_proc_core + "/req/callback/job-callback", + f"{ogcapi_proc_core}/req/callback/job-callback", f"{ogcapi_proc_core}/req/core", f"{ogcapi_proc_core}/req/core/api-definition-op", f"{ogcapi_proc_core}/req/core/api-definition-success", diff --git a/weaver/wps_restapi/jobs/jobs.py b/weaver/wps_restapi/jobs/jobs.py index a4b6df8ae..35cc7df9c 100644 --- a/weaver/wps_restapi/jobs/jobs.py +++ b/weaver/wps_restapi/jobs/jobs.py @@ -133,7 +133,7 @@ def get_job_status(request): Retrieve the status of a job. """ job = get_job(request) - job_status = job.json(request, self_link="status") + job_status = job.json(request) return HTTPOk(json=job_status) diff --git a/weaver/wps_restapi/jobs/utils.py b/weaver/wps_restapi/jobs/utils.py index 7accb0a33..17c5b8bd7 100644 --- a/weaver/wps_restapi/jobs/utils.py +++ b/weaver/wps_restapi/jobs/utils.py @@ -326,7 +326,7 @@ def get_results(job, # type: Job :param schema: Selects which schema to employ for representing the output results (listing or mapping). :param link_references: - If enabled, an output that was requested by reference instead of value will be returned as ``Link`` reference. + If enabled, an output that was requested by reference instead of by value will be returned as ``Link`` header. :returns: Tuple with: - List or mapping of all outputs each with minimally an ID and value under the requested key. diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index fe1cb218f..d11cfdaef 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -26,7 +26,7 @@ import jsonschema import yaml from babel.numbers import list_currencies -from colander import All, DateTime, Email, Length, Money, OneOf, Range, Regex, drop, null, required +from colander import All, DateTime, Email as EmailRegex, Length, Money, OneOf, Range, Regex, drop, null, required from dateutil import parser as date_parser from weaver import WEAVER_SCHEMA_DIR, __meta__ @@ -359,6 +359,13 @@ class URL(ExtendedSchemaNode): format = "url" +class Email(ExtendedSchemaNode): + schema_type = String + description = "Email recipient." + format = "email" + validator = EmailRegex() + + class MediaType(ExtendedSchemaNode): schema_type = String description = "IANA identifier of content and format." @@ -1856,6 +1863,42 @@ class JobGroupsCommaSeparated(ExpandStringList, ExtendedSchemaNode): validator = StringOneOf(["process", "provider", "service", "status"], delimiter=",", case_sensitive=True) +class JobExecuteSubscribers(ExtendedMappingSchema): + _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/subscriber.yaml" + description = "Optional URIs for callbacks for this job." + # basic OGC subscribers + success_uri = URL( + name="successUri", + description="Location where to POST the job results on successful completion.", + ) + failure_uri = URL( + name="failedUri", + description="Location where to POST the job status if it fails execution.", + missing=drop, + ) + started_uri = URL( + name="inProgressUri", + description="Location where to POST the job status once it starts execution.", + missing=drop, + ) + # additional subscribers + success_email = Email( + name="successEmail", + description="Email recipient to send a notification on successful job completion.", + missing=drop, + ) + failure_email = Email( + name="failedEmail", + description="Email recipient to send a notification on failed job completion.", + missing=drop, + ) + started_email = Email( + name="inProgressEmail", + description="Email recipient to send a notification of job status once it starts execution.", + missing=drop, + ) + + class LaunchJobQuerystring(ExtendedMappingSchema): tags = JobTagsCommaSeparated() @@ -2292,7 +2335,7 @@ class OWSAddress(ExtendedMappingSchema, OWSNamespace): admin_area = OWSString(name="AdministrativeArea", title="AdministrativeArea", missing=drop) postal_code = OWSString(name="PostalCode", title="OWSPostalCode", example="A1B 2C3", missing=drop) email = OWSString(name="ElectronicMailAddress", title="OWSElectronicMailAddress", - example="mail@me.com", validator=Email, missing=drop) + example="mail@me.com", validator=EmailRegex, missing=drop) class OWSContactInfo(ExtendedMappingSchema, OWSNamespace): @@ -3656,12 +3699,15 @@ class Execute(ExecuteInputOutputs): ), validator=OneOf(ExecuteResponse.values()) ) - notification_email = ExtendedSchemaNode( - String(), + notification_email = Email( missing=drop, - validator=Email(), - description="Optionally send a notification email when the job is done." + deprecated=True, + description=( + "Optionally send a notification email when the job is completed. " + "This is equivalent to using subscribers for both failed and successful job status emails simultaneously." + ) ) + subscribers = JobExecuteSubscribers(missing=drop) class QuoteStatusSchema(ExtendedSchemaNode): diff --git a/weaver/wps_restapi/templates/notification_email_example.mako b/weaver/wps_restapi/templates/notification_email_example.mako index cf2e39c38..01dcfb8bf 100644 --- a/weaver/wps_restapi/templates/notification_email_example.mako +++ b/weaver/wps_restapi/templates/notification_email_example.mako @@ -1,29 +1,41 @@ -## -*- coding: utf-8 -*- <%doc> - This is an example notification message to be sent by email when a job is done - It is formatted using the Mako template library (https://www.makotemplates.org/) + This is an example notification message to be sent by email when a job is done. + It is formatted using the Mako template library (https://www.makotemplates.org/). + The content must also include the message header. The provided variables are: - job: a weaver.datatype.Job object - - And every variable returned by the `weaver.wps_restapi.jobs.jobs.job_format_json` function: - status: success, failure, etc - logs: url to the logs - jobID: example "617f23d3-f474-47f9-a8ec-55da9dd6ac71" - result: url to the outputs - duration: example "0:01:02" - message: example "Job succeeded." - percentCompleted: example "100" + to: Recipient's address + job: weaver.datatype.Job object + settings: application settings + + And every variable returned by the `weaver.datatype.Job.json` method. + Below is a non-exhaustive list of example parameters from this method. + Refer to the method for complete listing. + + status: succeeded, failed, started + logs: url to the logs + jobID: example "617f23d3-f474-47f9-a8ec-55da9dd6ac71" + result: url to the outputs + duration: example "0:01:02" + message: example "Job succeeded." + percentCompleted: example 100 +From: Weaver +To: ${to} +Subject: Job ${job.process} ${job.status.title()} +Content-Type: text/plain; charset=UTF-8 Dear user, -Your job submitted on ${job.created.strftime('%Y/%m/%d %H:%M %Z')} ${job.status}. +Your job submitted on ${job.created.strftime("%Y/%m/%d %H:%M %Z")} to ${settings.get("weaver.url")} ${job.status}. -% if job.status == 'succeeded': -You can retrieve the output(s) at the following link: ${result} +% if job.status == "succeeded": +You can retrieve the output(s) at the following link: ${job.results_url(settings)} +% elif job.status == "failed": +You can retrieve potential error details from the following link: ${job.exceptions_url(settings)} % endif -The logs are available here: ${logs} +The job logs are available at the following link: ${job.logs_url(settings)} Regards, +Weaver