diff --git a/CHANGES.rst b/CHANGES.rst index b80cc3f55..85093505d 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -12,10 +12,34 @@ Changes Changes: -------- -- No change. +- Add support of official `CWL` IANA types to allow `Process` deployment with the relevant ``Content-Type`` header + for the submitted payload (see `common-workflow-language/common-workflow-language#421 (comment) + `_, + relates to `opengeospatial/NamingAuthority#169 `_, + resolves `#434 `_). +- Support `Process` deployment using only `CWL` content provided it contains an ``id`` field representing the target + `Process` ID as per recommendation in `OGC Best Practice for Earth Observation Application Package, CWL Document + `_ (resolves `#434 `_). +- Support `Process` deployment with a payload using ``YAML`` content instead of ``JSON``. This ``YAML`` content + **MUST** be submitted in the request with a ``Content-Type`` header either equal to ``application/x-yaml`` or + ``application/ogcapppkg+yaml`` for the |ogc-app-pkg|_ schema, or using ``application/cwl+yaml`` for + a `CWL`-only definition. The definition will be loaded and converted to ``JSON`` for schema validation. Otherwise, + ``JSON`` contents is assumed to be directly provided in the request payload for validation as previously accomplished. +- Add partial support of `CWL` with ``$graph`` representation for the special case where the graph is composed of a list + of exactly one `Application Package`. Multi/nested-`CWL` definitions are **NOT** supported + (relates to `#56 `_). +- Add ``weaver.cwl_processes_dir`` configuration setting for preloading, registering or updating a set of + known `Process` definitions from `CWL` files stored in a nested directory structure. This allows a service provider + that uses `Weaver` to offer their `Processes` to directly maintain their definitions from the set of `CWL` files and + upload changes in the web application at startup without need to manually undeploy and redeploy each `Process`. +- Add ``weaver.cwl_processes_register_error`` to fail fast any `Process` registration error from `CWL` when loading + files at startup. Fixes: ------ +- Fix `Process` deployment using a `WPS-1/2` URL reference defining a ``GetCapabilities`` request to resolve + the corresponding ``DescribeProcess`` request if the `Process` ID can be inferred from other known locations + (relates to `#11 `_). - Move ``WpsPackage`` properties to instance level to avoid potential referencing of attributes across same class used by distinct running `Process`. @@ -43,6 +67,7 @@ Changes: Fixes: ------ +- Fix ``Process.payload`` improperly encoded in case of special characters where allowed such as in `CWL` definition. - Fix `CLI` operations assuming valid JSON response to instead return error response content and status code. - Fix `CLI` rendering of various optional arguments and groups when displaying help messages. - Fix invalid handling of ``Constants`` definitions mixed with ``classproperty`` such as in ``OutputFormat`` causing diff --git a/Makefile b/Makefile index 62ee1c698..deca676b0 100644 --- a/Makefile +++ b/Makefile @@ -512,8 +512,12 @@ check-security-only: check-security-code-only check-security-deps-only ## run s # FIXME: safety ignore file (https://github.com/pyupio/safety/issues/351) # ignored codes: # 42194: https://github.com/kvesteri/sqlalchemy-utils/issues/166 # not fixed since 2015 -# 42498: celery<5.2.0 bumps kombu>=5.2.1 with security fixes to {redis,sqs} # mongo is used by default in Weaver +# 42498: celery<5.2.0 bumps kombu>=5.2.1 with security fixes to {redis,sqs} # mongo is used by default in Weaver # 43738: celery<5.2.2 CVE-2021-23727: trusts the messages and metadata stored in backends +# 45185: pylint<2.13.0: unrelated doc extension (https://github.com/PyCQA/pylint/issues/5322) +SAFETY_IGNORE := 42194 42498 43738 45185 +SAFETY_IGNORE := $(addprefix "-i ",$(SAFETY_IGNORE)) + .PHONY: check-security-deps-only check-security-deps-only: mkdir-reports ## run security checks on package dependencies @echo "Running security checks of dependencies..." @@ -525,9 +529,7 @@ check-security-deps-only: mkdir-reports ## run security checks on package depen -r "$(APP_ROOT)/requirements-dev.txt" \ -r "$(APP_ROOT)/requirements-doc.txt" \ -r "$(APP_ROOT)/requirements-sys.txt" \ - -i 42194 \ - -i 42498 \ - -i 43738 \ + $(SAFETY_IGNORE) \ 1> >(tee "$(REPORTS_DIR)/check-security-deps.txt")' .PHONY: check-security-code-only diff --git a/config/weaver.ini.example b/config/weaver.ini.example index 625e668f6..fe3d5b24a 100644 --- a/config/weaver.ini.example +++ b/config/weaver.ini.example @@ -81,6 +81,11 @@ weaver.quote_sync_max_wait = 20 # (default: use cwltool auto-resolution according to running machine and current user/group) weaver.cwl_euid = weaver.cwl_egid = +# directory where to load predefined process definitions defined with CWL files +# default configuration directory is used if this entry is removed +# only CWL files are considered, lookup in directory is recursive +weaver.cwl_processes_dir = +weaver.cwl_processes_register_error = false # --- Weaver WPS settings --- weaver.wps = true diff --git a/docs/source/appendix.rst b/docs/source/appendix.rst index dc38f0f1d..fd38ff3dd 100644 --- a/docs/source/appendix.rst +++ b/docs/source/appendix.rst @@ -169,7 +169,8 @@ Glossary later retrieval using an access token. .. seealso:: - :ref:`vault_upload` + - :ref:`vault_upload` + - :ref:`file_vault_inputs` WKT Well-Known Text geometry representation. diff --git a/docs/source/configuration.rst b/docs/source/configuration.rst index 48dc3634b..a0eb894f7 100644 --- a/docs/source/configuration.rst +++ b/docs/source/configuration.rst @@ -1,6 +1,9 @@ .. include:: references.rst .. _configuration: +.. default location to quickly reference items without the explicit and long prefix +.. py:currentmodule:: weaver.config + ****************** Configuration ****************** @@ -76,8 +79,21 @@ they are optional and which default value or operation is applied in each situat .. versionadded:: 4.0.0 +- | ``weaver.cwl_euid = `` [:class:`int`, *experimental*] + | (default: ``None``, auto-resolved by :term:`CWL` with effective machine user) + | + | Define the effective machine user ID to be used for running the :term:`Application Package`. + +.. versionadded:: 1.9.0 + +- | ``weaver.cwl_egid = `` [:class:`int`, *experimental*] + | (default: ``None``, auto-resolved by :term:`CWL` with the group of the effective user) + | + | Define the effective machine group ID to be used for running the :term:`Application Package`. + +.. versionadded:: 1.9.0 -- | ``weaver.wps = true|false`` +- | ``weaver.wps = true|false`` [:class:`bool`-like] | (default: ``true``) | | Enables the WPS-1/2 endpoint. @@ -177,7 +193,7 @@ they are optional and which default value or operation is applied in each situat | | Prefix where process :term:`Job` worker should execute the :term:`Process` from. -- | ``weaver.wps_restapi = true|false`` +- | ``weaver.wps_restapi = true|false`` [:class:`bool`-like] | (default: ``true``) | | Enable the WPS-REST endpoint. @@ -215,8 +231,8 @@ they are optional and which default value or operation is applied in each situat .. versionadded:: 4.15.0 -- | ``weaver.exec_sync_max_wait`` - | (default: ``20``, :class:`int`, seconds) +- | ``weaver.exec_sync_max_wait = `` [:class:`int`, seconds] + | (default: ``20``) | | Defines the maximum duration allowed for running a :term:`Job` execution in `synchronous` mode. | @@ -225,8 +241,8 @@ they are optional and which default value or operation is applied in each situat .. versionadded:: 4.15.0 -- | ``weaver.quote_sync_max_wait`` - | (default: ``20``, :class:`int`, seconds) +- | ``weaver.quote_sync_max_wait = `` [:class:`int`, seconds] + | (default: ``20``) | | Defines the maximum duration allowed for running a :term:`Quote` estimation in `synchronous` mode. | @@ -321,6 +337,8 @@ using the ``Weaver.data_sources`` configuration setting. .. seealso:: More details about the implication of :term:`Data Source` are provided in :ref:`data-source`. +.. _conf_wps_processes: + Configuration of WPS Processes ======================================= @@ -347,19 +365,77 @@ Please refer to `wps_processes.yml.example`_ for explicit format, keywords suppo Using this registration method, the processes will always reflect the latest modification from the remote WPS provider. -To specify a custom YAML file, you can define the setting named ``weaver.wps_processes_file`` with the appropriate path -within the employed ``weaver.ini`` file that starts your application. By default, this setting will look for the -provided path as absolute location, then will attempt to resolve relative path (corresponding to where the application -is started from), and will also look within the |weaver-config|_ directory. If none of the files can be found, the -operation is skipped. -To ensure that this feature is disabled and to avoid any unexpected auto-deployment provided by this functionality, -simply set setting ``weaver.wps_processes_file`` as *undefined* (i.e.: nothing after ``=`` in ``weaver.ini``). +- | ``weaver.wps_processes_file = `` + | (default: :py:data:`WEAVER_DEFAULT_WPS_PROCESSES_CONFIG` located in :py:data:`WEAVER_CONFIG_DIR`) + | + | Defines a custom :term:`YAML` file corresponding to `wps_processes.yml.example`_ schema to pre-load :term:`WPS` + processes and/or providers for registration at application startup. + | + | The value defined by this setting will look for the provided path as absolute location, then will attempt to + resolve relative path (corresponding to where the application is started from), and will also look within + the |weaver-config|_ directory. If none of the files can be found, the operation is skipped. + | + | To ensure that this feature is disabled and to avoid any unexpected auto-deployment provided by this functionality, + simply set setting ``weaver.wps_processes_file`` as *undefined* (i.e.: nothing after ``=`` in ``weaver.ini``). + The default value is employed if the setting is not defined at all. .. seealso:: - `weaver.ini.example`_ - `wps_processes.yml.example`_ +.. _conf_cwl_processes: + +Configuration of CWL Processes +======================================= + +.. versionadded:: 4.19.0 + +Although `Weaver` supports :ref:`Deployment ` and dynamic management of :term:`Process` definitions +while the web application is running, it is sometime more convenient for service providers to offer a set of predefined +:ref:`application-package` definitions. In order to automatically register such definitions (or update them if changed), +without having to repeat any deployment requests after the application was started, it is possible to employ the +configuration setting ``weaver.cwl_processes_dir``. Registration of a :term:`Process` using this approach will result +in an identical definition as if it was :ref:`Deployed ` using :term:`API` requests or using the +:ref:`cli` interfaces. + +- | ``weaver.cwl_processes_dir = `` + | (default: :py:data:`WEAVER_CONFIG_DIR`) + | + | Defines the root directory where to *recursively* and *alphabetically* load any :term:`CWL` file + to deploy the corresponding :term:`Process` definitions. Files at higher levels are loaded first before moving + down into lower directories of the structure. + | + | Any failed deployment from a seemingly valid :term:`CWL` will be logged with the corresponding error message. + Loading will proceed by ignoring failing cases according to ``weaver.cwl_processes_register_error`` setting. + The number of successful :term:`Process` deployments will also be reported if any should occur. + | + | The value defined by this setting will look for the provided path as absolute location, then will attempt to + resolve relative path (corresponding to where the application is started from). If no :term:`CWL` file could be + found, the operation is skipped. + | + | To ensure that this feature is disabled and to avoid any unexpected auto-deployment provided by this functionality, + simply set setting ``weaver.cwl_processes_dir`` as *undefined* (i.e.: nothing after ``=`` in ``weaver.ini``). + The default value is employed if the setting is not defined at all. + +.. note:: + When registering processes using :term:`CWL`, it is mandatory for those definitions to provide an ``id`` within + the file along other :term:`CWL` details to let `Weaver` know which :term:`Process` reference to use for deployment. + +.. warning:: + If a :term:`Process` depends on another definition, such as in the case of a :ref:`proc_workflow` definition, all + dependencies must be registered prior to this :term:`Process`. Consider naming your :term:`CWL` files to take + advantage of loading order to resolve such situations. + +- | ``weaver.cwl_processes_register_error = true|false`` [:class:`bool`] + | (default: ``false``, *ignore failures*) + | + | Indicate if `Weaver` should ignore failing :term:`Process` deployments (when ``false``), due to unsuccessful + registration of :term:`CWL` files found within any sub-directory of ``weaver.cwl_processes_dir`` path, or + immediately fail (when ``true``) when an issue is raised during :term:`Process` deployment. + +.. seealso:: + - `weaver.ini.example`_ .. _conf_request_options: @@ -391,7 +467,7 @@ etc. on a per-request basis, leave other requests unaffected and generally more | Path of the :term:`Request Options` definitions to employ. -- | ``weaver.ssl_verify = true|false`` +- | ``weaver.ssl_verify = true|false`` [:class:`bool`-like] | (default: ``true``) | | Toggle the SSL certificate verification across all requests. @@ -404,6 +480,35 @@ etc. on a per-request basis, leave other requests unaffected and generally more basis using :term:`Request Options` for acceptable cases. +.. _conf_vault: + +Configuration of File Vault +======================================= + +.. versionadded:: 4.9.0 + +Configuration of the :term:`Vault` is required in order to obtain access to its functionalities +and to enable its :term:`API` endpoints. This feature is notably employed to push local files to a remote `Weaver` +instance when using the :ref:`cli` utilities, in order to use them for the :term:`Job` execution. Please refer to +below references for more details. + +.. seealso:: + - :ref:`vault_upload` + - :ref:`file_vault_inputs` + +- | ``weaver.vault = true|false`` [:class:`bool`-like] + | (default: ``true``) + | + | Toggles the :term:`Vault` feature. + +- | ``weaver.vault_dir = `` + | (default: ``/tmp/vault``) + | + | Defines the default location where to write :ref:`files uploaded to the Vault `. + | + | If the directory does not exist, it is created on demand by the feature making use of it. + + Starting the Application ======================================= diff --git a/docs/source/processes.rst b/docs/source/processes.rst index 431213ca7..ff9a8eae5 100644 --- a/docs/source/processes.rst +++ b/docs/source/processes.rst @@ -1325,12 +1325,15 @@ Note again that the more the :term:`Process` is verbose, the more tracking will Uploading File to the Vault ----------------------------- +The :term:`Vault` is available as secured storage for uploading files to be employed later for :term:`Process` +execution (see also :ref:`file_vault_inputs`). + .. note:: The :term:`Vault` is a specific feature of `Weaver`. Other :term:`ADES`, :term:`EMS` and :term:`OGC API - Processes` servers are not expected to provide this endpoint nor support the |vault_ref| reference format. -The :term:`Vault` is available as secured storage for uploading files to be employed later for :term:`Process` -execution (see also :ref:`file_vault_inputs`). +.. seealso:: + Refer to :ref:`conf_vault` for applicable settings for this feature. When upload succeeds, the response will return a :term:`Vault` UUID and an ``access_token`` to access the file. Uploaded files cannot be accessed unless the proper credentials are provided. Requests toward the :term:`Vault` should @@ -1346,7 +1349,7 @@ the file from the :term:`Vault`. For both HTTP methods, the ``X-Auth-Vault`` hea .. note:: The :term:`Vault` acts only as temporary file storage. For this reason, once the file has been downloaded, it is - immediately deleted. Download can only occur once. It is assumed that the resource that must employ it will have + *immediately deleted*. Download can only occur once. It is assumed that the resource that must employ it will have created a local copy from the download and the :term:`Vault` doesn't require to preserve it anymore. This behaviour intends to limit the duration for which potentially sensitive data remains available in the :term:`Vault` as well as performing cleanup to limit storage space. diff --git a/tests/processes/test_utils.py b/tests/processes/test_utils.py index cba55c0ac..8b03164fb 100644 --- a/tests/processes/test_utils.py +++ b/tests/processes/test_utils.py @@ -1,5 +1,8 @@ +import json +import os import tempfile +import mock import pytest import yaml @@ -10,8 +13,9 @@ setup_mongodb_processstore, setup_mongodb_servicestore ) +from weaver.exceptions import PackageRegistrationError from weaver.processes.constants import CWL_REQUIREMENT_APP_WPS1 -from weaver.processes.utils import register_wps_processes_from_config +from weaver.processes.utils import register_cwl_processes_from_config, register_wps_processes_from_config WPS1_URL1 = resources.TEST_REMOTE_SERVER_URL WPS1_URL2 = "http://yet-another-server.com" @@ -25,7 +29,7 @@ def test_register_wps_processes_from_config_empty(): f.flush() f.seek(0) try: - register_wps_processes_from_config(f.name, {}) + register_wps_processes_from_config({}, f.name) except Exception: # noqa pytest.fail("Empty file should not raise any error") @@ -34,7 +38,7 @@ def test_register_wps_processes_from_config_empty(): f.flush() f.seek(0) try: - register_wps_processes_from_config(f.name, {}) + register_wps_processes_from_config({}, f.name) except Exception: # noqa pytest.fail("File with empty 'processes' section should not raise any error") @@ -43,7 +47,7 @@ def test_register_wps_processes_from_config_empty(): f.flush() f.seek(0) try: - register_wps_processes_from_config(f.name, {}) + register_wps_processes_from_config({}, f.name) except Exception: # noqa pytest.fail("File with empty 'providers' section should not raise any error") @@ -52,14 +56,14 @@ def test_register_wps_processes_from_config_empty(): f.flush() f.seek(0) try: - register_wps_processes_from_config(f.name, {}) + register_wps_processes_from_config({}, f.name) except Exception: # noqa pytest.fail("File with empty 'providers' and 'processes' sections should not raise any error") def test_register_wps_processes_from_config_missing(): try: - register_wps_processes_from_config("/this/path/des/not/exist", {}) + register_wps_processes_from_config({}, "/this/path/des/not/exist") except Exception: # noqa pytest.fail("Path pointing to missing file should not raise any error") @@ -128,7 +132,7 @@ def test_register_wps_processes_from_config_valid(): try: # note: # can take some time to process since OWSLib must parse all GetCapabilities/DescribeProcesses responses - register_wps_processes_from_config(f.name, config) + register_wps_processes_from_config(config, f.name) except Exception: # noqa pytest.fail("Valid definitions in configuration file should not raise any error") @@ -186,3 +190,144 @@ def test_register_wps_processes_from_config_valid(): proc6 = p_store.fetch_by_id(proc6_id) assert proc6.package["hints"][CWL_REQUIREMENT_APP_WPS1]["provider"] == WPS1_URL4 + "/" assert proc6.package["hints"][CWL_REQUIREMENT_APP_WPS1]["process"] == resources.WPS_LITERAL_COMPLEX_IO_ID + + +def test_register_cwl_processes_from_config_undefined(): + assert register_cwl_processes_from_config({}) == 0 + + +def test_register_cwl_processes_from_config_empty_var(): + settings = {"weaver.cwl_processes_dir": ""} + assert register_cwl_processes_from_config(settings) == 0 + + +def test_register_cwl_processes_from_config_not_a_dir(): + with tempfile.NamedTemporaryFile(mode="w") as tmp_file: + tmp_file.write("data") + + settings = {"weaver.cwl_processes_dir": tmp_file.name} + assert register_cwl_processes_from_config(settings) == 0 + + with tempfile.TemporaryDirectory() as tmp_dir: + tmp_dir = os.path.join(tmp_dir, "does-not-exist") + settings = {"weaver.cwl_processes_dir": tmp_dir} + assert register_cwl_processes_from_config(settings) == 0 + + +def test_register_cwl_processes_from_config_dir_no_cwl(): + with tempfile.TemporaryDirectory() as tmp_dir: + settings = {"weaver.cwl_processes_dir": tmp_dir} + assert register_cwl_processes_from_config(settings) == 0 + + with tempfile.NamedTemporaryFile(dir=tmp_dir, suffix=".json", mode="w", delete=False) as tmp_file: + tmp_file.write(json.dumps({"data": "test"})) + + assert register_cwl_processes_from_config(settings) == 0 + + +def test_register_cwl_processes_from_config_load_recursive(): + with tempfile.TemporaryDirectory() as tmp_dir: + first_dir = os.path.join(tmp_dir, "first") + nested_dir = os.path.join(tmp_dir, "nested") + deeper_dir = os.path.join(nested_dir, "deeper") + os.makedirs(first_dir) + os.makedirs(deeper_dir) + + # Write files in **un**ordered fashion to validate ordered loading occurs: + # /tmp + # /dir + # file3.cwl + # random.yml + # file5.cwl + # /first + # b_file9.cwl # note: must appear before file2 and a_file8, 'nested' loaded after 'first' + # file2.cwl + # invalid.cwl + # /nested + # a_file8.cwl # note: must appear after file2 and b_file9, 'nested' loaded after 'first' + # random.json + # file1.cwl + # file4.cwl + # /deeper + # c_file7.cwl + # file0.cwl + # file6.cwl + # invalid.cwl + # + # Loaded order: + # /tmp/dir/file3.cwl + # /tmp/dir/file5.cwl + # /tmp/dir/first/b_file9.cwl + # /tmp/dir/first/file2.cwl + # /tmp/dir/nested/a_file8.cwl + # /tmp/dir/nested/file1.cwl # note: + # /tmp/dir/nested/file4.cwl dir 'deeper' purposely named to appear before + # /tmp/dir/nested/deeper/file0.cwl 'file#' one level above if they were sorted *only* + # /tmp/dir/nested/deeper/file6.cwl alphabetically not considering directory structure + valid_order = [3, 5, 9, 2, 8, 1, 4, 7, 0, 6] + # doest not need to be valid CWL, mocked loading + cwl_ordered = [{"cwlVersion": "v1.0", "id": str(i)} for i in range(len(valid_order))] + with open(os.path.join(tmp_dir, "file3.cwl"), mode="w", encoding="utf-8") as tmp_file: + tmp_file.write(json.dumps(cwl_ordered[3])) + with open(os.path.join(tmp_dir, "file5.cwl"), mode="w", encoding="utf-8") as tmp_file: + tmp_file.write(json.dumps(cwl_ordered[5])) + with open(os.path.join(tmp_dir, "random.yml"), mode="w", encoding="utf-8") as tmp_file: + tmp_file.write("random: data") + with open(os.path.join(first_dir, "file2.cwl"), mode="w", encoding="utf-8") as tmp_file: + tmp_file.write(json.dumps(cwl_ordered[2])) + with open(os.path.join(first_dir, "invalid.cwl"), mode="w", encoding="utf-8") as tmp_file: + tmp_file.write(json.dumps({"invalid": True})) + with open(os.path.join(first_dir, "b_file9.cwl"), mode="w", encoding="utf-8") as tmp_file: + tmp_file.write(json.dumps(cwl_ordered[9])) + with open(os.path.join(nested_dir, "file1.cwl"), mode="w", encoding="utf-8") as tmp_file: + tmp_file.write(json.dumps(cwl_ordered[1])) + with open(os.path.join(nested_dir, "file4.cwl"), mode="w", encoding="utf-8") as tmp_file: + tmp_file.write(json.dumps(cwl_ordered[4])) + with open(os.path.join(nested_dir, "random.json"), mode="w", encoding="utf-8") as tmp_file: + tmp_file.write(json.dumps({"random": "data"})) + with open(os.path.join(nested_dir, "a_file8.cwl"), mode="w", encoding="utf-8") as tmp_file: + tmp_file.write(json.dumps(cwl_ordered[8])) + with open(os.path.join(deeper_dir, "c_file7.cwl"), mode="w", encoding="utf-8") as tmp_file: + tmp_file.write(json.dumps(cwl_ordered[7])) + with open(os.path.join(deeper_dir, "file0.cwl"), mode="w", encoding="utf-8") as tmp_file: + tmp_file.write(json.dumps(cwl_ordered[0])) + with open(os.path.join(deeper_dir, "file6.cwl"), mode="w", encoding="utf-8") as tmp_file: + tmp_file.write(json.dumps(cwl_ordered[6])) + with open(os.path.join(deeper_dir, "invalid.cwl"), mode="w", encoding="utf-8") as tmp_file: + tmp_file.write(json.dumps({"invalid": True})) + + def no_op_valid(_cwl, *_, **__): # type: ignore + if isinstance(_cwl, dict) and "invalid" in _cwl: + raise PackageRegistrationError("CWL INVALID") + + with mock.patch("weaver.processes.utils.deploy_process_from_payload", side_effect=no_op_valid) as mocked: + settings = {"weaver.cwl_processes_dir": tmp_dir} + assert register_cwl_processes_from_config(settings) == len(cwl_ordered) + + call_count = len(cwl_ordered) + 2 # 2 invalid + assert mocked.call_count == call_count + valid_calls = list(call for call in mocked.call_args_list if "invalid" not in call.args[0]) + assert len(valid_calls) == len(cwl_ordered) + for i, (order, call) in enumerate(zip(valid_order, valid_calls)): + assert call.args[0] == cwl_ordered[order], f"Expected CWL does not match load order at position: {i}" + + +def test_register_cwl_processes_from_config_error_handling(): + with tempfile.TemporaryDirectory() as tmp_dir: + with open(os.path.join(tmp_dir, "ignore.cwl"), mode="w", encoding="utf-8") as tmp_file: + tmp_file.write("not important") + + def raise_deploy(*_, **__): + raise PackageRegistrationError("test") + + with mock.patch("weaver.processes.utils.deploy_process_from_payload", side_effect=raise_deploy) as mocked: + settings = {"weaver.cwl_processes_dir": tmp_dir} + assert register_cwl_processes_from_config(settings) == 0 + assert mocked.call_count == 1 + with mock.patch("weaver.processes.utils.deploy_process_from_payload", side_effect=raise_deploy) as mocked: + result = None + with pytest.raises(PackageRegistrationError): + settings["weaver.cwl_processes_register_error"] = "true" + result = register_cwl_processes_from_config(settings) + assert mocked.call_count == 1 + assert result is None # not returned diff --git a/tests/test_base.py b/tests/test_base.py index 0475c6f54..ff94a90df 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -1,6 +1,6 @@ import pytest -from weaver.base import Constants, ExtendedEnum +from weaver.base import Constants, ExtendedEnum, classproperty class DummyConstant(Constants): @@ -11,6 +11,8 @@ class DummyConstant(Constants): t4 = "T4" T5 = "random5" # ensure name is case-insensitive (not matched via the lowercase value) t6 = "RANDOM6" # ensure name is case-insensitive (not matched via the uppercase value) + T7 = classproperty(fget=lambda self: "t7") + t8 = classproperty(fget=lambda self: "T8") def test_constants_get_by_name_or_value_case_insensitive(): @@ -30,6 +32,28 @@ def test_constants_get_by_name_or_value_case_insensitive(): assert DummyConstant.get("RANDOM5") == DummyConstant.T5 assert DummyConstant.get("random6") == DummyConstant.t6 assert DummyConstant.get("RANDOM6") == DummyConstant.t6 + assert DummyConstant.get("t7") == DummyConstant.T7 + assert DummyConstant.get("T7") == DummyConstant.T7 + assert DummyConstant.get("t8") == DummyConstant.t8 + assert DummyConstant.get("T8") == DummyConstant.t8 + + +def test_constants_get_by_attribute(): + assert DummyConstant.get(DummyConstant.T1) == DummyConstant.T1 + assert DummyConstant.get(DummyConstant.T2) == DummyConstant.T2 + assert DummyConstant.get(DummyConstant.t3) == DummyConstant.t3 + assert DummyConstant.get(DummyConstant.t4) == DummyConstant.t4 + assert DummyConstant.get(DummyConstant.T5) == DummyConstant.T5 + assert DummyConstant.get(DummyConstant.t6) == DummyConstant.t6 + assert DummyConstant.get(DummyConstant.T7) == DummyConstant.T7 + assert DummyConstant.get(DummyConstant.t8) == DummyConstant.t8 + + +def test_constants_classproperty_as_string(): + assert isinstance(DummyConstant.get("T7"), str) + assert isinstance(DummyConstant.T7, str) + assert isinstance(DummyConstant.get("T8"), str) + assert isinstance(DummyConstant.t8, str) def test_constants_in_by_name_or_value(): @@ -41,6 +65,14 @@ def test_constants_in_by_name_or_value(): assert "T3" in DummyConstant assert "t4" in DummyConstant assert "T4" in DummyConstant + assert "t5" in DummyConstant + assert "T5" in DummyConstant + assert "t6" in DummyConstant + assert "T6" in DummyConstant + assert "t7" in DummyConstant + assert "T7" in DummyConstant + assert "t8" in DummyConstant + assert "T8" in DummyConstant def test_constants_immutable(): diff --git a/tests/test_opensearch.py b/tests/test_opensearch.py index fc45f19ec..1d3049bb6 100644 --- a/tests/test_opensearch.py +++ b/tests/test_opensearch.py @@ -11,10 +11,9 @@ import mock import pytest from pyramid import testing -from pyramid.testing import DummyRequest from pywps.inout.inputs import LiteralInput -from tests.utils import setup_mongodb_processstore +from tests.utils import MockedRequest, setup_mongodb_processstore from weaver.processes import opensearch from weaver.processes.constants import OpenSearchField from weaver.processes.opensearch import make_param_id @@ -61,7 +60,7 @@ def load_json_test_file(filename): def make_request(**kw): - request = DummyRequest(**kw) + request = MockedRequest(**kw) if request.registry.settings is None: request.registry.settings = {} request.registry.settings["weaver.url"] = "localhost" diff --git a/tests/utils.py b/tests/utils.py index 95ba0ae27..ca0d6077a 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -34,6 +34,7 @@ from pyramid.config import Configurator from pyramid.httpexceptions import HTTPException, HTTPNotFound, HTTPUnprocessableEntity from pyramid.registry import Registry +from pyramid.testing import DummyRequest from requests import Response from webtest import TestApp, TestResponse @@ -45,6 +46,7 @@ from weaver.formats import ContentType from weaver.store.mongodb import MongodbJobStore, MongodbProcessStore, MongodbServiceStore from weaver.utils import ( + bytes2str, fetch_file, get_header, get_path_kvp, @@ -64,7 +66,14 @@ from owslib.wps import Process as ProcessOWSWPS from pywps.app import Process as ProcessPyWPS - from weaver.typedefs import AnyHeadersContainer, AnyRequestMethod, AnyRequestType, AnyResponseType, SettingsType + from weaver.typedefs import ( + JSON, + AnyHeadersContainer, + AnyRequestMethod, + AnyRequestType, + AnyResponseType, + SettingsType + ) # pylint: disable=C0103,invalid-name,E1101,no-member MockPatch = mock._patch # noqa @@ -385,12 +394,23 @@ def run_command(command, trim=True, expect_error=False, entrypoint=None): return out_lines +class MockedRequest(DummyRequest): + """ + Patch missing properties that are expected from :mod:`pyramid` requests. + """ + json = {} # type: JSON + + @property + def text(self): + return bytes2str(self.body) if self.body else json.dumps(self.json, ensure_ascii=False) + + class MockedResponse(TestResponse): """ - Replaces the ``json`` property by the expected callable from all real response implementations. + Replaces the ``json`` property by the expected callable from responses using :mod:`requests` implementation. """ def json(self): # pylint: disable=W0236,invalid-overridden-method - return self.json_body or json.loads(self.body.decode("UTF-8")) + return self.json_body or json.loads(bytes2str(self.body)) def mocked_file_response(path, url): @@ -989,7 +1009,7 @@ def mocked_process_package(): Provides mocks that bypasses execution when calling :module:`weaver.processes.wps_package` functions. """ return ( - mock.patch("weaver.processes.wps_package._load_package_file", return_value={"class": "test"}), + mock.patch("weaver.processes.wps_package.load_package_file", return_value={"class": "test"}), mock.patch("weaver.processes.wps_package._load_package_content", return_value=(None, "test", None)), mock.patch("weaver.processes.wps_package._get_package_inputs_outputs", return_value=(None, None)), mock.patch("weaver.processes.wps_package._merge_package_inputs_outputs", return_value=([], [])), diff --git a/tests/wps_restapi/test_processes.py b/tests/wps_restapi/test_processes.py index cb443b55e..7ebb27c89 100644 --- a/tests/wps_restapi/test_processes.py +++ b/tests/wps_restapi/test_processes.py @@ -14,6 +14,8 @@ import pyramid.testing import pytest import stopit +import webtest.app +import yaml from tests import resources from tests.utils import ( @@ -34,7 +36,7 @@ from weaver.exceptions import JobNotFound, ProcessNotFound from weaver.execute import ExecuteControlOption, ExecuteMode, ExecuteResponse, ExecuteTransmissionMode from weaver.formats import AcceptLanguage, ContentType, get_cwl_file_format -from weaver.processes.constants import CWL_REQUIREMENT_APP_WPS1, ProcessSchema +from weaver.processes.constants import CWL_REQUIREMENT_APP_DOCKER, CWL_REQUIREMENT_APP_WPS1, ProcessSchema from weaver.processes.wps_testing import WpsTestProcess from weaver.status import Status from weaver.utils import fully_qualified_name, get_path_kvp, load_file, ows_context_href @@ -535,26 +537,32 @@ def test_deploy_process_default_endpoint_wps1(self): assert process_wps_endpoint == weaver_wps_path @staticmethod - def assert_deployed_wps3(response_json, expected_process_id): + def assert_deployed_wps3(response_json, expected_process_id, assert_io=True): proc = response_json["process"] assert expected_process_id in proc["id"] - assert len(proc["inputs"]) == 1 - assert proc["inputs"][0]["id"] == "input-1" - assert proc["inputs"][0]["minOccurs"] == 1 - assert proc["inputs"][0]["maxOccurs"] == 1 - assert "formats" not in proc["inputs"][0] # literal data doesn't have "formats" - assert len(proc["outputs"]) == 1 - assert proc["outputs"][0]["id"] == "output" - assert "minOccurs" not in proc["outputs"][0] - assert "maxOccurs" not in proc["outputs"][0] - # TODO: handling multiple outputs (https://github.com/crim-ca/weaver/issues/25) - # assert proc["outputs"][0]["minOccurs"] == "1" - # assert proc["outputs"][0]["maxOccurs"] == "1" - assert isinstance(proc["outputs"][0]["formats"], list) - assert len(proc["outputs"][0]["formats"]) == 1 - assert proc["outputs"][0]["formats"][0]["mediaType"] == ContentType.APP_JSON - - def deploy_process_make_visible_and_fetch_deployed(self, deploy_payload, expected_process_id): + if assert_io: + assert len(proc["inputs"]) == 1 + assert proc["inputs"][0]["id"] == "input-1" + assert proc["inputs"][0]["minOccurs"] == 1 + assert proc["inputs"][0]["maxOccurs"] == 1 + assert "formats" not in proc["inputs"][0] # literal data doesn't have "formats" + assert len(proc["outputs"]) == 1 + assert proc["outputs"][0]["id"] == "output" + assert "minOccurs" not in proc["outputs"][0] + assert "maxOccurs" not in proc["outputs"][0] + # TODO: handling multiple outputs (https://github.com/crim-ca/weaver/issues/25) + # assert proc["outputs"][0]["minOccurs"] == "1" + # assert proc["outputs"][0]["maxOccurs"] == "1" + assert isinstance(proc["outputs"][0]["formats"], list) + assert len(proc["outputs"][0]["formats"]) == 1 + assert proc["outputs"][0]["formats"][0]["mediaType"] == ContentType.APP_JSON + + def deploy_process_make_visible_and_fetch_deployed(self, + deploy_payload, + expected_process_id, + headers=None, + assert_io=True, + ): """ Deploy, make visible and obtain process description. @@ -565,7 +573,12 @@ def deploy_process_make_visible_and_fetch_deployed(self, deploy_payload, expecte .. note:: This is a shortcut method for all ``test_deploy_process_<>`` cases. """ - resp = self.app.post_json("/processes", params=deploy_payload, headers=self.json_headers) + deploy_headers = copy.deepcopy(self.json_headers) + deploy_headers.update(headers or {}) + if "json" in deploy_headers.get("Content-Type", "json"): + resp = self.app.post_json("/processes", params=deploy_payload, headers=deploy_headers) + else: + resp = self.app.post("/processes", params=deploy_payload, headers=deploy_headers) assert resp.status_code == 201 assert resp.content_type == ContentType.APP_JSON @@ -580,7 +593,7 @@ def deploy_process_make_visible_and_fetch_deployed(self, deploy_payload, expecte proc_query = {"schema": ProcessSchema.OLD} resp = self.app.get(proc_url, params=proc_query, headers=self.json_headers) assert resp.status_code == 200 - self.assert_deployed_wps3(resp.json, expected_process_id) + self.assert_deployed_wps3(resp.json, expected_process_id, assert_io=assert_io) return resp.json def get_application_package(self, process_id): @@ -609,7 +622,7 @@ def test_deploy_process_CWL_DockerRequirement_auth_header_format(self): """ cwl = load_file(os.path.join(WEAVER_ROOT_DIR, "docs/examples/docker-shell-script-cat.cwl")) # type: CWL docker = "fake.repo/org/private-image:latest" - cwl["requirements"]["DockerRequirement"]["dockerPull"] = docker + cwl["requirements"][CWL_REQUIREMENT_APP_DOCKER]["dockerPull"] = docker body = self.get_process_deploy_template(cwl=cwl) headers = copy.deepcopy(self.json_headers) @@ -631,10 +644,156 @@ def test_deploy_process_CWL_DockerRequirement_auth_header_format(self): assert process.auth.token == token assert process.auth.docker == docker - # FIXME: implement - @pytest.mark.skip(reason="not implemented") + def test_deploy_process_CWL_direct_raised_missing_id(self): + # normally valid CWL, but not when submitted directly due to missing ID for the process + cwl = { + "cwlVersion": "v1.0", + "class": "CommandLineTool", + "baseCommand": ["python3", "-V"], + "inputs": {}, + "outputs": { + "output": { + "type": "File", + "outputBinding": { + "glob": "stdout.log" + }, + } + }, + } + headers = {"Content-Type": ContentType.APP_CWL_JSON, "Accept": ContentType.APP_JSON} + resp = self.app.post_json("/processes", params=cwl, headers=headers, expect_errors=True) + assert resp.status_code == 400 + assert "'Deploy.DeployCWL.id': 'Missing required field.'" in resp.json["cause"] + + def deploy_process_CWL_direct(self, content_type, graph_count=0): + p_id = "test-direct-cwl-json" + cwl_core = { + "id": p_id, + "class": "CommandLineTool", + "baseCommand": ["python3", "-V"], + "inputs": {}, + "outputs": { + "output": { + "type": "File", + "outputBinding": { + "glob": "stdout.log" + }, + } + }, + } + cwl = {} + cwl_base = {"cwlVersion": "v1.0"} + cwl.update(cwl_base) + if graph_count: + cwl["$graph"] = [cwl_core] * graph_count + else: + cwl.update(cwl_core) + if "yaml" in content_type: + cwl = yaml.safe_dump(cwl, sort_keys=False) + headers = {"Content-Type": content_type} + desc = self.deploy_process_make_visible_and_fetch_deployed(cwl, p_id, headers=headers, assert_io=False) + pkg = self.get_application_package(p_id) + assert desc["deploymentProfile"] == "http://www.opengis.net/profiles/eoc/dockerizedApplication" + + # once parsed, CWL I/O are converted to listing form + # rest should remain intact with the original definition + expect_cwl = copy.deepcopy(cwl_base) + expect_cwl.update(cwl_core) + expect_cwl["inputs"] = [] + cwl_out = cwl_core["outputs"]["output"] + cwl_out["id"] = "output" + expect_cwl["outputs"] = [cwl_out] + assert pkg == expect_cwl + + # process description should have been generated with relevant I/O + proc = desc["process"] + assert proc["id"] == p_id + assert proc["inputs"] == [] + assert proc["outputs"] == [{ + "id": "output", + "title": "output", + "schema": {"type": "string", "contentMediaType": "text/plain"}, + "formats": [{"default": True, "mediaType": "text/plain"}] + }] + + def test_deploy_process_CWL_direct_JSON(self): + self.deploy_process_CWL_direct(ContentType.APP_CWL_JSON) + + def test_deploy_process_CWL_direct_YAML(self): + self.deploy_process_CWL_direct(ContentType.APP_CWL_YAML) + + def test_deploy_process_CWL_direct_graph_JSON(self): + self.deploy_process_CWL_direct(ContentType.APP_CWL_JSON, graph_count=1) + + def test_deploy_process_CWL_direct_graph_YAML(self): + self.deploy_process_CWL_direct(ContentType.APP_CWL_YAML, graph_count=1) + + def test_deploy_process_CWL_direct_graph_multi_invalid(self): + with pytest.raises(webtest.app.AppError) as exc: + self.deploy_process_CWL_direct(ContentType.APP_CWL_JSON, graph_count=2) + error = str(exc.value) + assert "400 Bad Request" in error + assert "Invalid schema" in error + assert "Longer than maximum length 1" in error + def test_deploy_process_CWL_DockerRequirement_href(self): - raise NotImplementedError + with contextlib.ExitStack() as stack: + stack.enter_context(mocked_wps_output(self.settings)) + out_dir = self.settings["weaver.wps_output_dir"] + out_url = self.settings["weaver.wps_output_url"] + tmp_dir = stack.enter_context(tempfile.TemporaryDirectory(dir=out_dir)) + tmp_file = os.path.join(tmp_dir, "docker-python.cwl") + tmp_href = tmp_file.replace(out_dir, out_url, 1) + cwl = { + "cwlVersion": "v1.0", + "class": "CommandLineTool", + "requirements": { + CWL_REQUIREMENT_APP_DOCKER: { + "dockerPull": "python:3.7-alpine" + } + }, + "baseCommand": ["python3", "-V"], + "inputs": {}, + "outputs": { + "output": { + "type": "File", + "outputBinding": { + "glob": "stdout.log" + }, + } + }, + } + with open(tmp_file, mode="w", encoding="utf-8") as cwl_file: + json.dump(cwl, cwl_file) + + p_id = "test-docker-python-version" + body = { + "processDescription": {"process": {"id": p_id}}, + "executionUnit": [{"href": tmp_href}], + "deploymentProfileName": "http://www.opengis.net/profiles/eoc/dockerizedApplication", + } + desc = self.deploy_process_make_visible_and_fetch_deployed(body, p_id, assert_io=False) + pkg = self.get_application_package(p_id) + assert desc["deploymentProfile"] == "http://www.opengis.net/profiles/eoc/dockerizedApplication" + + # once parsed, CWL I/O are converted to listing form + # rest should remain intact with the original definition + cwl["inputs"] = [] + cwl_out = cwl["outputs"]["output"] + cwl_out["id"] = "output" + cwl["outputs"] = [cwl_out] + assert pkg == cwl + + # process description should have been generated with relevant I/O + proc = desc["process"] + assert proc["id"] == p_id + assert proc["inputs"] == [] + assert proc["outputs"] == [{ + "id": "output", + "title": "output", + "schema": {"type": "string", "contentMediaType": "text/plain"}, + "formats": [{"default": True, "mediaType": "text/plain"}] + }] # FIXME: implement @pytest.mark.skip(reason="not implemented") @@ -864,7 +1023,6 @@ def test_deploy_process_WPS1_DescribeProcess_executionUnit(self): resources.TEST_REMOTE_SERVER_URL ) - @pytest.mark.skip(reason="not implemented") @mocked_remote_server_requests_wps1([ resources.TEST_REMOTE_SERVER_URL, resources.TEST_REMOTE_PROCESS_GETCAP_WPS1_XML, @@ -875,12 +1033,14 @@ def test_deploy_process_WPS1_GetCapabilities_href(self): Test process deployment using a WPS-1 GetCapabilities URL specified as process description reference. """ body = { - "processDescription": {"href": resources.TEST_REMOTE_PROCESS_GETCAP_WPS1_URL}, # this one should be used + "processDescription": { + "id": resources.TEST_REMOTE_PROCESS_WPS1_ID, # must tell which process from GetCapabilities + "href": resources.TEST_REMOTE_PROCESS_GETCAP_WPS1_URL, # this one should be used + }, "executionUnit": [{"href": resources.TEST_REMOTE_SERVER_URL}] # some URL just to fulfill schema validation } self.deploy_process_make_visible_and_fetch_deployed(body, resources.TEST_REMOTE_PROCESS_WPS1_ID) - @pytest.mark.skip(reason="not implemented") @mocked_remote_server_requests_wps1([ resources.TEST_REMOTE_SERVER_URL, resources.TEST_REMOTE_PROCESS_GETCAP_WPS1_XML, @@ -897,7 +1057,6 @@ def test_deploy_process_WPS1_GetCapabilities_owsContext(self): body["processDescription"]["process"].update(ows_context_href(resources.TEST_REMOTE_PROCESS_GETCAP_WPS1_URL)) self.deploy_process_make_visible_and_fetch_deployed(body, resources.TEST_REMOTE_PROCESS_WPS1_ID) - @pytest.mark.skip(reason="not implemented") @mocked_remote_server_requests_wps1([ resources.TEST_REMOTE_SERVER_URL, resources.TEST_REMOTE_PROCESS_GETCAP_WPS1_XML, diff --git a/weaver/app.py b/weaver/app.py index 2166728c9..57cccd95e 100644 --- a/weaver/app.py +++ b/weaver/app.py @@ -13,8 +13,8 @@ from weaver.config import WEAVER_DEFAULT_REQUEST_OPTIONS_CONFIG, get_weaver_config_file, get_weaver_configuration from weaver.database import get_db from weaver.processes.builtin import register_builtin_processes -from weaver.processes.utils import register_wps_processes_from_config -from weaver.utils import get_settings, parse_extra_options, setup_cache, setup_loggers +from weaver.processes.utils import register_cwl_processes_from_config, register_wps_processes_from_config +from weaver.utils import parse_extra_options, setup_cache, setup_loggers from weaver.wps_restapi.patches import patch_pyramid_view_no_auto_head_get_method if TYPE_CHECKING: @@ -75,7 +75,7 @@ def main(global_config, **settings): local_config.include("weaver") LOGGER.info("Running database migration...") - db = get_db(settings) + db = get_db(local_config) db.run_migration() if settings.get("weaver.celery", False): @@ -85,7 +85,9 @@ def main(global_config, **settings): register_builtin_processes(local_config) LOGGER.info("Registering WPS-1 processes from configuration file...") - wps_processes_file = get_settings(local_config).get("weaver.wps_processes_file") - register_wps_processes_from_config(wps_processes_file, local_config) + register_wps_processes_from_config(local_config) + + LOGGER.info("Registering CWL processes from configuration directory...") + register_cwl_processes_from_config(local_config) return local_config.make_wsgi_app() diff --git a/weaver/base.py b/weaver/base.py index e007658a4..cd8921703 100644 --- a/weaver/base.py +++ b/weaver/base.py @@ -7,10 +7,12 @@ from typing import TYPE_CHECKING, NewType if TYPE_CHECKING: - from typing import Any, Callable, Dict, List, Optional, Union + from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union from weaver.typedefs import AnyKey + PropertyDataType = TypeVar("PropertyDataType") + # pylint: disable=E1120,no-value-for-parameter @@ -42,7 +44,7 @@ def __members__(cls): @classmethod def get(cls, key_or_value, default=None): - # type: (Union[AnyKey, EnumType], Optional[Any]) -> Any + # type: (Union[AnyKey, EnumType, PropertyDataType], Optional[Any]) -> PropertyDataType if isinstance(key_or_value, str): upper_key = key_or_value.upper() lower_key = key_or_value.lower() @@ -108,8 +110,8 @@ class classproperty(property): # pylint: disable=C0103,invalid-name """ def __init__(self, - fget=None, # type: Optional[Callable[[object], Any]] - fset=None, # type: Optional[Callable[[object, Any], None]] + fget=None, # type: Optional[Callable[[object], PropertyDataType]] + fset=None, # type: Optional[Callable[[object, PropertyDataType], None]] fdel=None, # type: Optional[Callable[[object], None]] doc="", # type: str ): # type: (...) -> None @@ -117,6 +119,7 @@ def __init__(self, self.__doc__ = inspect.cleandoc(doc) def __get__(self, cls, owner): # noqa + # type: (Type[object], Any) -> PropertyDataType return classmethod(self.fget).__get__(None, owner)() diff --git a/weaver/config.py b/weaver/config.py index ba704f373..0de4e4c1f 100644 --- a/weaver/config.py +++ b/weaver/config.py @@ -12,6 +12,8 @@ if TYPE_CHECKING: from typing import Optional + from pyramid.config import Configurator + from weaver.typedefs import AnySettingsContainer LOGGER = logging.getLogger(__name__) @@ -77,8 +79,8 @@ def get_weaver_config_file(file_path, default_config_file, generate_default_from """ Validates that the specified configuration file can be found, or falls back to the default one. - Handles 'relative' paths for settings in ``WEAVER_DEFAULT_INI_CONFIG`` referring to other configuration files. - Default file must be one of ``WEAVER_DEFAULT_CONFIGS``. + Handles 'relative' paths for settings in :data:`WEAVER_DEFAULT_INI_CONFIG` referring to other configuration files. + Default file must be one of :data:`WEAVER_DEFAULT_CONFIGS`. If both the specified file and the default file cannot be found, default file under ``WEAVER_DEFAULT_INI_CONFIG`` is auto-generated from the corresponding ``.example`` file if :paramref:`generate_default_from_example` is ``True``. @@ -119,4 +121,5 @@ def get_weaver_config_file(file_path, default_config_file, generate_default_from def includeme(config): # noqa: E811 + # type: (Configurator) -> None LOGGER.debug("Loading weaver configuration.") diff --git a/weaver/database/__init__.py b/weaver/database/__init__.py index 878d54ed4..8d4b8e6a6 100644 --- a/weaver/database/__init__.py +++ b/weaver/database/__init__.py @@ -6,32 +6,41 @@ from weaver.database.mongodb import MongoDatabase from weaver.utils import get_registry, get_settings -LOGGER = logging.getLogger(__name__) if TYPE_CHECKING: - from weaver.typedefs import AnySettingsContainer + from typing import Optional, Union + + from pyramid.config import Configurator + + from weaver.typedefs import AnyRegistryContainer, AnySettingsContainer + +LOGGER = logging.getLogger(__name__) -def get_db(container, reset_connection=False): - # type: (AnySettingsContainer, bool) -> MongoDatabase +def get_db(container=None, reset_connection=False): + # type: (Optional[Union[AnyRegistryContainer, AnySettingsContainer]], bool) -> MongoDatabase """ Obtains the database connection from configured application settings. If :paramref:`reset_connection` is ``True``, the :paramref:`container` must be the application :class:`Registry` or any container that can retrieve it to accomplish reference reset. Otherwise, any settings container can be provided. + + .. note:: + It is preferable to provide a registry reference to reuse any available connection whenever possible. + Giving application settings will require establishing a new connection. """ registry = get_registry(container, nothrow=True) if not reset_connection and registry and isinstance(getattr(registry, "db", None), MongoDatabase): return registry.db database = MongoDatabase(container) if reset_connection: - registry = get_registry(container) registry.db = database return database def includeme(config): + # type: (Configurator) -> None settings = get_settings(config) - if asbool(settings.get("weaver.build_docs", False)): + if asbool(settings.get("weaver.build_docs", False)): # pragma: no cover LOGGER.info("Skipping database when building docs...") return diff --git a/weaver/datatype.py b/weaver/datatype.py index fc5febd82..4d849b62e 100644 --- a/weaver/datatype.py +++ b/weaver/datatype.py @@ -2001,7 +2001,7 @@ def package(self, pkg): def payload(self): # type: () -> JSON """ - Deployment specification as JSON body. + Deployment specification as :term:`JSON`. """ body = self.get("payload", {}) return self._decode(body) if isinstance(body, dict) else body @@ -2009,7 +2009,7 @@ def payload(self): @payload.setter def payload(self, body): # type: (JSON) -> None - self["payload"] = self._decode(body) if isinstance(body, dict) else {} + self["payload"] = self._encode(body) if isinstance(body, dict) else {} # encode(->)/decode(<-) characters that cannot be in a key during save to db _character_codes = [("$", "\uFF04"), (".", "\uFF0E")] diff --git a/weaver/exceptions.py b/weaver/exceptions.py index 3be62962c..09659325c 100644 --- a/weaver/exceptions.py +++ b/weaver/exceptions.py @@ -412,6 +412,7 @@ def handle_known_exceptions(function): @functools.wraps(function) def wrapped(*_, **__): + # type: (Any, Any) -> Any try: return function(*_, **__) except (WeaverException, OWSException, HTTPException) as exc: @@ -466,6 +467,7 @@ def wrap(function): # type: (Callable[[Any, Any], Any]) -> Callable @functools.wraps(function) def call(*args, **kwargs): + # type: (Any, Any) -> Any try: # handle input arguments that are extended by various pyramid operations if is_request: diff --git a/weaver/formats.py b/weaver/formats.py index 3d915c37b..eb2fd509a 100644 --- a/weaver/formats.py +++ b/weaver/formats.py @@ -19,6 +19,7 @@ if TYPE_CHECKING: from typing import Any, Dict, List, Optional, Tuple, Union + from weaver.base import PropertyDataType from weaver.typedefs import JSON, AnyRequestType LOGGER = logging.getLogger(__name__) @@ -42,12 +43,17 @@ class ContentType(Constants): "/" [x- | "."] ["+" suffix] *[";" parameter=value] """ - APP_CWL = "application/x-cwl" + APP_CWL = "application/cwl" + APP_CWL_JSON = "application/cwl+json" + APP_CWL_YAML = "application/cwl+yaml" + APP_CWL_X = "application/x-cwl" # backward compatible format, others are official APP_FORM = "application/x-www-form-urlencoded" APP_GEOJSON = "application/geo+json" APP_GZIP = "application/gzip" APP_HDF5 = "application/x-hdf5" APP_JSON = "application/json" + APP_OGC_PKG_JSON = "application/ogcapppkg+json" + APP_OGC_PKG_YAML = "application/ogcapppkg+yaml" APP_NETCDF = "application/x-netcdf" APP_OCTET_STREAM = "application/octet-stream" APP_PDF = "application/pdf" @@ -71,6 +77,7 @@ class ContentType(Constants): VIDEO_MPEG = "video/mpeg" # special handling + ANY_CWL = {APP_CWL, APP_CWL_JSON, APP_CWL_YAML, APP_CWL_X} ANY_XML = {APP_XML, TEXT_XML} ANY = "*/*" @@ -120,8 +127,11 @@ class OutputFormat(Constants): """) @classmethod - def get(cls, format_or_version, default=JSON, allow_version=True): # pylint: disable=W0221,W0237 - # type: (Union[str, AnyOutputFormat], AnyOutputFormat, bool) -> AnyOutputFormat + def get(cls, # pylint: disable=W0221,W0237 # arguments differ/renamed + format_or_version, # type: Union[str, AnyOutputFormat, PropertyDataType] + default=JSON, # type: AnyOutputFormat + allow_version=True, # type: bool + ): # type: (...) -> Union[AnyOutputFormat, PropertyDataType] """ Resolve the applicable output format. @@ -317,13 +327,23 @@ class SchemaRole(Constants): # but prefer the IANA resolution with is the primary reference for Media-Types IANA_MAPPING = { ContentType.APP_JSON: ContentType.APP_JSON, + # CWL now has an official IANA definition: + # https://www.iana.org/assignments/media-types/application/cwl + ContentType.APP_CWL: ContentType.APP_CWL, + ContentType.APP_CWL_JSON: ContentType.APP_CWL, + ContentType.APP_CWL_YAML: ContentType.APP_CWL, + ContentType.APP_CWL_X: ContentType.APP_CWL, } EDAM_NAMESPACE = "edam" EDAM_NAMESPACE_URL = "http://edamontology.org/" EDAM_NAMESPACE_DEFINITION = {EDAM_NAMESPACE: EDAM_NAMESPACE_URL} EDAM_SCHEMA = "http://edamontology.org/EDAM_1.24.owl" EDAM_MAPPING = { + # preserve CWL EDAM definitions for backward compatibility in case they were used in deployed processes ContentType.APP_CWL: "format_3857", + ContentType.APP_CWL_JSON: "format_3857", + ContentType.APP_CWL_YAML: "format_3857", + ContentType.APP_CWL_X: "format_3857", ContentType.IMAGE_GIF: "format_3467", ContentType.IMAGE_JPEG: "format_3579", ContentType.APP_HDF5: "format_3590", diff --git a/weaver/processes/__init__.py b/weaver/processes/__init__.py index 263220ece..9b2b6f427 100644 --- a/weaver/processes/__init__.py +++ b/weaver/processes/__init__.py @@ -1,3 +1,9 @@ +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from pyramid.config import Configurator + def includeme(config): # noqa: E811 + # type: (Configurator) -> None pass diff --git a/weaver/processes/builtin/__init__.py b/weaver/processes/builtin/__init__.py index aa1f5b649..4b52066ba 100644 --- a/weaver/processes/builtin/__init__.py +++ b/weaver/processes/builtin/__init__.py @@ -25,10 +25,12 @@ from weaver.wps_restapi.utils import get_wps_restapi_base_url if TYPE_CHECKING: - from weaver.typedefs import AnySettingsContainer, CWL - from cwltool.context import RuntimeContext from typing import Any, Dict, Type, Union + from cwltool.context import RuntimeContext + + from weaver.typedefs import AnyRegistryContainer, CWL + LOGGER = logging.getLogger(__name__) @@ -103,7 +105,7 @@ def _get_builtin_package(process_id, package): def register_builtin_processes(container): - # type: (AnySettingsContainer) -> None + # type: (AnyRegistryContainer) -> None """ Registers every ``builtin`` CWL package to the processes database. diff --git a/weaver/processes/convert.py b/weaver/processes/convert.py index 0c83423c0..387787f46 100644 --- a/weaver/processes/convert.py +++ b/weaver/processes/convert.py @@ -638,8 +638,8 @@ def xml_wps2cwl(wps_process_response, settings): Converts a `WPS-1 ProcessDescription XML` tree structure to an equivalent `WPS-3 Process JSON`. and builds the associated `CWL` package in conformance to :data:`weaver.processes.wps_package.CWL_REQUIREMENT_APP_WPS1`. - :param wps_process_response: valid response (XML, 200) from a `WPS-1 ProcessDescription`. - :param settings: application settings to retrieve additional request options. + :param wps_process_response: Valid response (XML, 200) from a `WPS-1 ProcessDescription`. + :param settings: Application settings to retrieve additional request options. """ def _tag_name(_xml): # type: (Union[xml_util.XML, str]) -> str diff --git a/weaver/processes/utils.py b/weaver/processes/utils.py index 367b0949f..a3250c77a 100644 --- a/weaver/processes/utils.py +++ b/weaver/processes/utils.py @@ -1,4 +1,6 @@ import logging +import os +import pathlib import warnings from copy import deepcopy from distutils.version import LooseVersion @@ -15,11 +17,13 @@ HTTPForbidden, HTTPNotFound, HTTPOk, - HTTPUnprocessableEntity + HTTPUnprocessableEntity, + HTTPUnsupportedMediaType ) from pyramid.settings import asbool from weaver.config import ( + WEAVER_CONFIG_DIR, WEAVER_DEFAULT_WPS_PROCESSES_CONFIG, WeaverFeature, get_weaver_config_file, @@ -39,9 +43,17 @@ ServiceNotFound, log_unhandled_exceptions ) +from weaver.formats import ContentType from weaver.processes.types import ProcessType from weaver.store.base import StoreProcesses, StoreServices -from weaver.utils import fully_qualified_name, generate_diff, get_sane_name, get_settings, get_url_without_query +from weaver.utils import ( + fully_qualified_name, + generate_diff, + get_header, + get_sane_name, + get_settings, + get_url_without_query +) from weaver.visibility import Visibility from weaver.wps.utils import get_wps_client from weaver.wps_restapi import swagger_definitions as sd @@ -54,8 +66,9 @@ from pyramid.request import Request from weaver.typedefs import ( - AnyContainer, AnyHeadersContainer, + AnyRegistryContainer, + AnyRequestType, AnySettingsContainer, CWL, FileSystemPathType, @@ -130,7 +143,6 @@ def _check_deploy(payload): """ Validate minimum deploy payload field requirements with exception handling. """ - # FIXME: handle colander invalid directly in tween (https://github.com/crim-ca/weaver/issues/112) message = "Process deployment definition is invalid." try: results = sd.Deploy().deserialize(payload) @@ -194,6 +206,7 @@ def _check_deploy(payload): generate_diff(p_exec_unit, r_exec_unit, val_name="original payload", ref_name="parsed result") ) return results + # FIXME: handle colander invalid directly in tween (https://github.com/crim-ca/weaver/issues/112) except colander.Invalid as exc: LOGGER.debug("Failed deploy body schema validation:\n%s", exc) raise HTTPBadRequest(json={ @@ -244,8 +257,48 @@ def _validate_deploy_process_info(process_info, reference, package, settings, he raise HTTPUnprocessableEntity(detail=msg) -def deploy_process_from_payload(payload, container, overwrite=False): - # type: (JSON, AnyContainer, bool) -> HTTPException +def _load_payload(payload, content_type): + # type: (Union[JSON, str], ContentType) -> Union[JSON, CWL] + """ + Load the request payload with validation of expected content type. + """ + try: + content_type = sd.DeployContentType().deserialize(content_type) + if isinstance(payload, str): + payload = yaml.safe_load(payload) + if not isinstance(payload, dict): + raise TypeError("Not a valid JSON body for process deployment.") + except colander.Invalid as exc: + raise HTTPUnsupportedMediaType(json={ + "title": "Unsupported Media Type", + "type": "UnsupportedMediaType", + "detail": str(exc), + "status": HTTPUnsupportedMediaType.code, + "cause": str(content_type), + }) + except Exception as exc: + raise HTTPBadRequest(json={ + "title": "Bad Request", + "type": "BadRequest", + "detail": "Unable to parse process deployment content.", + "status": HTTPBadRequest.code, + "cause": str(exc), + }) + return payload + + +# FIXME: supported nested process and $graph multi-deployment (https://github.com/crim-ca/weaver/issues/56) +def resolve_cwl_graph(package): + # type: (CWL) -> CWL + if "$graph" in package and isinstance(package["$graph"], list) and len(package["$graph"]) == 1: + # consider package as if provided in non-graph representation + # must preserve top level fields (e.g.: 'cwlVersion') and nested graph item + package.update(package.pop("$graph")[0]) + return package + + +def deploy_process_from_payload(payload, container, overwrite=False): # pylint: disable=R1260,too-complex + # type: (Union[JSON, str], Union[AnySettingsContainer, AnyRequestType], bool) -> HTTPException """ Deploy the process after resolution of all references and validation of the parameters from payload definition. @@ -253,17 +306,23 @@ def deploy_process_from_payload(payload, container, overwrite=False): matching :class:`weaver.wps_restapi.swagger_definitions.ProcessDescription`. :param payload: JSON payload that was specified during the process deployment request. - :param container: container to retrieve application settings. - :param overwrite: whether to allow override of an existing process definition if conflict occurs. + :param container: + Container to retrieve application settings. + If it is a ``request``-like object, additional parameters may be used to identify the payload schema. + :param overwrite: Whether to allow override of an existing process definition if conflict occurs. :returns: HTTPOk if the process registration was successful. :raises HTTPException: for any invalid process deployment step. """ + headers = getattr(container, "headers", {}) # container is any request (as when called from API Deploy request) + c_type = ContentType.get(get_header("Content-Type", headers), default=ContentType.APP_OGC_PKG_JSON) + # use deepcopy of to remove any circular dependencies before writing to mongodb or any updates to the payload + payload = _load_payload(payload, c_type) payload_copy = deepcopy(payload) payload = _check_deploy(payload) # validate identifier naming for unsupported characters - process_description = payload.get("processDescription") + process_description = payload.get("processDescription", {}) # empty possible if CWL directly passed process_info = process_description.get("process", process_description) process_href = process_description.pop("href", None) @@ -286,7 +345,11 @@ def deploy_process_from_payload(payload, container, overwrite=False): package = None reference = content.get("href") found = isinstance(reference, str) - else: + elif c_type in (list(ContentType.ANY_CWL) + [ContentType.APP_JSON]) and "cwlVersion" in payload: + process_info = {} + package = resolve_cwl_graph(payload) + found = True + else: # ogc-apppkg type, but no explicit check since used by default (backward compat) if deployment_profile_name: # optional hint allowed_profile_suffix = [ProcessType.APPLICATION, ProcessType.WORKFLOW] if not any(deployment_profile_name.lower().endswith(typ) for typ in allowed_profile_suffix): @@ -311,6 +374,7 @@ def deploy_process_from_payload(payload, container, overwrite=False): "ProcessDescription.href", "ProcessDescription.owsContext.content.href", "executionUnit[*].(unit|href)", + "{ }", ] raise HTTPBadRequest( f"Missing one of required parameters {params} to obtain package/process definition or reference." @@ -324,7 +388,6 @@ def deploy_process_from_payload(payload, container, overwrite=False): # update and validate process information using WPS process offering, CWL/WPS reference or CWL package definition settings = get_settings(container) - headers = getattr(container, "headers", {}) # container is any request (as when called from API Deploy request) process_info = _validate_deploy_process_info(process_info, reference, package, settings, headers) restapi_url = get_wps_restapi_base_url(settings) @@ -419,7 +482,7 @@ def parse_wps_process_config(config_entry): def register_wps_processes_static(service_url, service_name, service_visibility, service_processes, container): - # type: (str, str, bool, List[str], AnySettingsContainer) -> None + # type: (str, str, bool, List[str], AnyRegistryContainer) -> None """ Register WPS-1 :term:`Process` under a service :term:`Provider` as static references. @@ -490,7 +553,7 @@ def register_wps_processes_static(service_url, service_name, service_visibility, def register_wps_processes_dynamic(service_name, service_url, service_visibility, container): - # type: (str, str, bool, AnySettingsContainer) -> None + # type: (str, str, bool, AnyRegistryContainer) -> None """ Register a WPS service ``provider`` such that ``processes`` under it are dynamically accessible on demand. @@ -541,25 +604,47 @@ def register_wps_processes_dynamic(service_name, service_url, service_visibility LOGGER.exception("Exception during provider registration: [%s] [%r]. Skipping...", service_name, ex) -def register_wps_processes_from_config(wps_processes_file_path, container): - # type: (Optional[FileSystemPathType], AnySettingsContainer) -> None +def register_wps_processes_from_config(container, wps_processes_file_path=None): + # type: (AnySettingsContainer, Optional[FileSystemPathType]) -> None """ - Registers remote `WPS` providers and/or processes as specified from the configuration file. + Registers remote :term:`WPS` providers and/or processes as specified from the configuration file. - Loads a `wps_processes.yml` file and registers `WPS-1` providers processes to the - current `Weaver` instance as equivalent `WPS-2` processes. + Loads a ``wps_processes.yml`` file and registers processes under `WPS-1/2`_ providers to the + current `Weaver` instance as equivalent :term:`OGC API - Processes` instances. - References listed under ``processes`` are registered. - When the reference is a service (provider), registration of each WPS process is done individually - for each of the specified providers with ID ``[service]_[process]`` per listed process by ``GetCapabilities``. + References listed under ``processes`` are registered statically (by themselves, unchanging snapshot). + References listed under ``providers``, the :term:`WPS` themselves are registered, making each :term:`Process` + listed in their ``GetCapabilities`` available. In this case, registered processes are defined dynamically, + meaning they will be fetched on the provider each time a request refers to them, keeping their definition + up-to-date with the remote server. .. versionadded:: 1.14.0 When references are specified using ``providers`` section instead of ``processes``, the registration - only saves the remote WPS provider endpoint to dynamically populate WPS processes on demand. + only saves the remote WPS provider endpoint to dynamically populate :term:`WPS` processes on demand. + Previous behavior was to register each :term:`WPS` process individually with ID ``[service]_[process]``. + + .. versionchanged:: 4.19.0 + Parameter position are inverted. + If :paramref:`wps_processes_file_path` is explicitly provided, it is used directly without considering settings. + Otherwise, automatically employ the definition in setting: ``weaver.wps_processes_file``. .. seealso:: - - `weaver.wps_processes.yml.example` for additional file format details + - `weaver.wps_processes.yml.example` for additional file format details. + + .. note:: + Settings with an explicit empty ``weaver.wps_processes_file`` entry will be considered as *nothing to load*. + If the entry is omitted, default location :data:`WEAVER_DEFAULT_WPS_PROCESSES_CONFIG` is attempted instead. + + :param container: Registry container to obtain database reference as well as application settings. + :param wps_processes_file_path: Override file path to employ instead of default settings definition. """ + if wps_processes_file_path is not None: + LOGGER.info("Using WPS-1 explicit override parameter to obtain file reference.") + else: + LOGGER.info("Using WPS-1 file reference from configuration settings.") + settings = get_settings(container) + wps_processes_file_path = settings.get("weaver.wps_processes_file") + if wps_processes_file_path is None: warnings.warn("No file specified for WPS-1 providers registration.", RuntimeWarning) wps_processes_file_path = get_weaver_config_file("", WEAVER_DEFAULT_WPS_PROCESSES_CONFIG, @@ -604,3 +689,85 @@ def register_wps_processes_from_config(wps_processes_file_path, container): msg = f"Invalid WPS-1 providers configuration file caused: [{fully_qualified_name(exc)}]({exc!s})." LOGGER.exception(msg) raise RuntimeError(msg) + + +def register_cwl_processes_from_config(container): + # type: (AnySettingsContainer) -> int + """ + Load multiple :term:`CWL` definitions from a directory to register corresponding :term:`Process`. + + .. versionadded:: 4.19.0 + + Each individual :term:`CWL` definition must fully describe a :term:`Process` by itself. Therefore, an ``id`` must + be available in the file to indicate the target deployment reference. In case of conflict, the existing database + :term:`Process` will be overridden to ensure file updates are applied. + + Files are loaded in alphabetical order. If a :term:`Workflow` needs to refer to other processes, they should be + named in way that dependencies will be resolvable prior to the registration of the :term:`Workflow` :term:`Process`. + The resolved directory to search for :term:`CWL` will be traversed recursively. + This, along with the name of :term:`CWL` files themselves, can be used to resolve order-dependent loading cases. + Only ``.cwl`` extensions are considered to avoid invalid parsing of other files that could be defined in the shared + configuration directory. + + .. note:: + Settings with an explicit empty ``weaver.cwl_processes_dir`` entry will be considered as *nothing to load*. + If the entry is omitted, default location :data:`WEAVER_CONFIG_DIR` is used to search for :term:`CWL` files. + + :param container: Registry container to obtain database reference as well as application settings. + :returns: Number of successfully registered processes from found :term:`CWL` files. + """ + from weaver.processes.wps_package import load_package_file + + settings = get_settings(container) + cwl_processes_dir = settings.get("weaver.cwl_processes_dir") + + if cwl_processes_dir is None: + warnings.warn("No configuration setting [weaver.cwl_processes_dir] specified for CWL processes registration. " + f"Will use default location: [{WEAVER_CONFIG_DIR}]", RuntimeWarning) + cwl_processes_dir = WEAVER_CONFIG_DIR + elif cwl_processes_dir == "": + warnings.warn("Configuration setting [weaver.cwl_processes_dir] for CWL processes registration " + "is explicitly defined as empty. Not loading anything.", RuntimeWarning) + return 0 + + if not os.path.isdir(cwl_processes_dir): + warnings.warn( + "Configuration setting [weaver.cwl_processes_dir] for CWL processes registration " + f"is not an existing directory: [{cwl_processes_dir}]. Not loading anything.", RuntimeWarning + ) + return 0 + cwl_processes_dir = os.path.abspath(cwl_processes_dir) + cwl_files = sorted(pathlib.Path(cwl_processes_dir).rglob("*.cwl"), + # consider directory structure to sort, then use usual alphabetical order for same level + key=lambda file: (len(str(file).split("/")), str(file))) + if not cwl_files: + warnings.warn( + f"Configuration directory [{cwl_processes_dir}] for CWL processes registration " + "does not contain any CWL file. Not loading anything.", RuntimeWarning + ) + return 0 + + register_count = 0 + register_total = len(cwl_files) + register_error = asbool(settings.get("weaver.cwl_processes_register_error", False)) + for cwl_path in cwl_files: + try: + cwl = load_package_file(str(cwl_path)) + deploy_process_from_payload(cwl, settings, overwrite=True) + register_count += 1 + except (HTTPException, PackageRegistrationError) as exc: + msg = ( + f"Failed registration of process from CWL file: [{cwl_path!s}] " + f"caused by [{fully_qualified_name(exc)}]({exc!s})." + ) + if register_error: + LOGGER.info("Requested immediate CWL registration failure with 'weaver.cwl_processes_register_error'.") + LOGGER.error(msg) + raise + warnings.warn(msg + " Skipping definition.", RuntimeWarning) + continue + if register_count and register_count == register_total: + LOGGER.info("Successfully registered %s processes from CWL files.", register_total) + elif register_count != register_total: + LOGGER.warning("Partial registration of CWL processes, only %s/%s succeeded.", register_count, register_total) + return register_count diff --git a/weaver/processes/wps_package.py b/weaver/processes/wps_package.py index 7a8b4c63c..f42d835e1 100644 --- a/weaver/processes/wps_package.py +++ b/weaver/processes/wps_package.py @@ -21,7 +21,7 @@ import time import uuid from typing import TYPE_CHECKING -from urllib.parse import urlparse +from urllib.parse import parse_qsl, urlparse import cwltool import cwltool.docker @@ -35,6 +35,7 @@ from pywps.inout.basic import SOURCE_TYPE from pywps.inout.storage.file import FileStorageBuilder from pywps.inout.storage.s3 import S3StorageBuilder +from requests.structures import CaseInsensitiveDict from weaver.config import WeaverConfiguration, WeaverFeature, get_weaver_configuration from weaver.database import get_db @@ -336,7 +337,7 @@ def _check_package_file(cwl_file_path_or_url): return cwl_path -def _load_package_file(file_path): +def load_package_file(file_path): # type: (str) -> CWL """ Loads the package in YAML/JSON format specified by the file path. @@ -526,8 +527,32 @@ def _update_package_metadata(wps_package_metadata, cwl_package_package): ) -def _generate_process_with_cwl_from_reference(reference): - # type: (str) -> Tuple[CWL, JSON] +def _patch_wps_process_description_url(reference, process_hint): + # type: (str, Optional[JSON]) -> str + """ + Rebuilds a :term:`WPS` ``ProcessDescription`` URL from other details. + + A ``GetCapabilities`` request can be submitted with an ID in query params directly. + Otherwise, check if a process hint can provide the ID. + """ + parts = reference.split("?", 1) + if len(parts) == 2: + url, query = parts + params = CaseInsensitiveDict(parse_qsl(query)) + process_id = get_any_id(params) + if not process_id: + process_id = get_any_id(process_hint or {}) + if process_id: + params["identifier"] = process_id + if process_id and params.get("request", "").lower() == "getcapabilities": + params["request"] = "DescribeProcess" + query = "&".join([f"{key}={val}" for key, val in params.items()]) + reference = url + "?" + query + return reference + + +def _generate_process_with_cwl_from_reference(reference, process_hint=None): + # type: (str, Optional[JSON]) -> Tuple[CWL, JSON] """ Resolves the ``reference`` type (`CWL`, `WPS-1`, `WPS-2`, `WPS-3`) and generates a `CWL` ``package`` from it. @@ -542,12 +567,13 @@ def _generate_process_with_cwl_from_reference(reference): reference_path, reference_ext = os.path.splitext(reference) reference_name = os.path.split(reference_path)[-1] if reference_ext.replace(".", "") in PACKAGE_EXTENSIONS: - cwl_package = _load_package_file(reference) + cwl_package = load_package_file(reference) process_info = {"identifier": reference_name} # match against WPS-1/2 reference else: settings = get_settings() + reference = _patch_wps_process_description_url(reference, process_hint) response = request_extra("GET", reference, retries=3, settings=settings) if response.status_code != HTTPOk.code: raise HTTPServiceUnavailable( @@ -582,11 +608,11 @@ def _generate_process_with_cwl_from_reference(reference): if "process" in payload or "owsContext" in payload: process_info = payload.get("process", payload) ows_ref = process_info.get("owsContext", {}).get("offering", {}).get("content", {}).get("href") - cwl_package = _load_package_file(ows_ref) + cwl_package = load_package_file(ows_ref) # if somehow the CWL was referenced without an extension, handle it here # also handle parsed WPS-3 process description also with a reference elif "cwlVersion" in payload: - cwl_package = _load_package_file(reference) + cwl_package = load_package_file(reference) process_info = {"identifier": reference_name} else: raise ValueError(f"Unknown parsing methodology of Content-Type [{content_type}] for reference resolution.") @@ -722,6 +748,18 @@ def get_auth_requirements(requirement, headers): return None +def get_process_identifier(process_info, package): + # type: (JSON, CWL) -> str + """ + Obtain a sane name identifier reference from the :term:`Process` or the :term:`Application Package`. + """ + process_id = get_any_id(process_info) + if not process_id: + process_id = package.get("id") + process_id = get_sane_name(process_id, assert_invalid=True) + return process_id + + def get_process_definition(process_offering, reference=None, package=None, data_source=None, headers=None): # type: (JSON, Optional[str], Optional[CWL], Optional[str], Optional[AnyHeadersContainer]) -> JSON """ @@ -754,14 +792,14 @@ def try_or_raise_package_error(call, reason): raise exc_type(f"Invalid package/reference definition. {reason} generated error: [{exc!s}].") if not (isinstance(package, dict) or isinstance(reference, str)): - raise PackageRegistrationError("Invalid parameters amongst one of [package, reference].") + raise PackageRegistrationError("Invalid parameters, one of [package, reference] is required.") if package and reference: raise PackageRegistrationError("Simultaneous parameters [package, reference] not allowed.") process_info = process_offering if reference: package, process_info = try_or_raise_package_error( - lambda: _generate_process_with_cwl_from_reference(reference), + lambda: _generate_process_with_cwl_from_reference(reference, process_info), reason="Loading package from reference") process_info.update(process_offering) # override upstream details if not isinstance(package, dict): @@ -798,7 +836,10 @@ def try_or_raise_package_error(call, reason): ) # obtain any retrieved process id if not already provided from upstream process offering, and clean it - process_id = get_sane_name(get_any_id(process_info), assert_invalid=False) + process_id = try_or_raise_package_error( + lambda: get_process_identifier(process_info, package), + reason="Obtaining process identifier" + ) if not process_id: raise PackageRegistrationError("Could not retrieve any process identifier.") diff --git a/weaver/quotation/__init__.py b/weaver/quotation/__init__.py index 263220ece..9b2b6f427 100644 --- a/weaver/quotation/__init__.py +++ b/weaver/quotation/__init__.py @@ -1,3 +1,9 @@ +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from pyramid.config import Configurator + def includeme(config): # noqa: E811 + # type: (Configurator) -> None pass diff --git a/weaver/tweens.py b/weaver/tweens.py index 5c2fce919..b9b3c63de 100644 --- a/weaver/tweens.py +++ b/weaver/tweens.py @@ -9,7 +9,14 @@ from weaver.utils import clean_json_text_body, fully_qualified_name if TYPE_CHECKING: - from typing import Union + from typing import Callable, Union + + from pyramid.config import Configurator + from pyramid.registry import Registry + + from weaver.typedefs import AnyViewResponse, PyramidRequest + + ViewHandler = Callable[[PyramidRequest], AnyViewResponse] LOGGER = logging.getLogger(__name__) @@ -44,6 +51,7 @@ def error_repr(http_err): def ows_response_tween(request, handler): + # type: (PyramidRequest, ViewHandler) -> AnyViewResponse """ Tween that wraps any API request with appropriate dispatch of error conversion to handle formatting. """ @@ -97,6 +105,7 @@ def ows_response_tween(request, handler): def ows_response_tween_factory_excview(handler, registry): # noqa: F811 + # type: (ViewHandler, Registry) -> ViewHandler """ Tween factory which produces a tween which transforms common exceptions into OWS specific exceptions. """ @@ -104,10 +113,13 @@ def ows_response_tween_factory_excview(handler, registry): # noqa: F811 def ows_response_tween_factory_ingress(handler, registry): # noqa: F811 + # type: (ViewHandler, Registry) -> ViewHandler """ Tween factory which produces a tween which transforms common exceptions into OWS specific exceptions. """ def handle_ows_tween(request): + # type: (PyramidRequest) -> AnyViewResponse + # because the EXCVIEW will also wrap any exception raised that should before be handled by OWS response # to allow conversions to occur, use a flag that will re-raise the result setattr(handler, OWS_TWEEN_HANDLED, True) @@ -121,6 +133,8 @@ def handle_ows_tween(request): def includeme(config): + # type: (Configurator) -> None + # using 'INGRESS' to run `weaver.wps_restapi.api` views that fix HTTP code before OWS response config.add_tween(OWS_RESPONSE_INGRESS, under=INGRESS) # using 'EXCVIEW' to run over any other 'valid' exception raised to adjust formatting and log diff --git a/weaver/typedefs.py b/weaver/typedefs.py index bf4970699..54c5d9c03 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING # pragma: no cover if TYPE_CHECKING: import os @@ -56,7 +56,7 @@ from celery.app import Celery from celery.result import AsyncResult, EagerResult, GroupResult, ResultSet from owslib.wps import BoundingBoxDataInput, ComplexDataInput, Process as ProcessOWS, WPSExecution - from pyramid.httpexceptions import HTTPSuccessful, HTTPRedirection + from pyramid.httpexceptions import HTTPException, HTTPSuccessful, HTTPRedirection from pyramid.registry import Registry from pyramid.request import Request as PyramidRequest from pyramid.response import Response as PyramidResponse @@ -173,11 +173,14 @@ "name": CWL_WorkflowStepID, "reference": str, # URL }) + _CWL = "CWL" # type: TypeAlias + CWL_Graph = List[_CWL] CWL = TypedDict("CWL", { "cwlVersion": str, "class": CWL_Class, "label": str, "doc": str, + "id": Optional[str], "s:keywords": List[str], "baseCommand": Optional[Union[str, List[str]]], "parameters": Optional[List[str]], @@ -189,7 +192,8 @@ "stderr": str, "stdout": str, "$namespaces": Dict[str, str], - "$schemas": Dict[str, str] + "$schemas": Dict[str, str], + "$graph": CWL_Graph, }, total=False) CWL_WorkflowStepPackage = TypedDict("CWL_WorkflowStepPackage", { "id": str, # reference ID of the package @@ -261,8 +265,9 @@ HeaderCookiesTuple = Union[Tuple[None, None], Tuple[HeadersBaseType, CookiesBaseType]] AnyHeadersContainer = Union[HeadersBaseType, ResponseHeaders, EnvironHeaders, CaseInsensitiveDict] AnyCookiesContainer = Union[CookiesBaseType, WPSRequest, PyramidRequest, AnyHeadersContainer] - AnyResponseType = Union[PyramidResponse, WebobResponse, RequestsResponse, TestResponse] AnyRequestType = Union[PyramidRequest, WerkzeugRequest, PreparedRequest, RequestsRequest, DummyRequest] + AnyResponseType = Union[PyramidResponse, WebobResponse, RequestsResponse, TestResponse] + AnyViewResponse = Union[PyramidResponse, WebobResponse, HTTPException] RequestMethod = Literal[ "HEAD", "GET", "POST", "PUT", "PATCH", "DELETE", "head", "get", "post", "put", "patch", "delete", diff --git a/weaver/utils.py b/weaver/utils.py index bd79d1e90..33cede80d 100644 --- a/weaver/utils.py +++ b/weaver/utils.py @@ -57,7 +57,7 @@ if TYPE_CHECKING: from types import FrameType - from typing import Any, Callable, Dict, List, Iterable, NoReturn, Optional, Type, Tuple, Union + from typing import Any, Callable, Dict, List, Iterable, MutableMapping, NoReturn, Optional, Type, Tuple, Union from weaver.execute import AnyExecuteControlOption, AnyExecuteMode from weaver.status import Status @@ -105,6 +105,7 @@ def __hash__(self): return hash(self.__str) def __str__(self): + # type: () -> str return self.__str def __repr__(self): @@ -205,7 +206,7 @@ def get_weaver_url(container): def get_any_id(info, default=None, pop=False, key=False): - # type: (JSON, Optional[str], bool, bool) -> Optional[str] + # type: (MutableMapping, Optional[str], bool, bool) -> Optional[str] """ Retrieves a dictionary `id-like` key using multiple common variations ``[id, identifier, _id]``. @@ -223,7 +224,7 @@ def get_any_id(info, default=None, pop=False, key=False): def get_any_value(info, default=None, file=True, data=True, pop=False, key=False): - # type: (JSON, Any, bool, bool, bool, bool) -> AnyValueType + # type: (MutableMapping, Any, bool, bool, bool, bool) -> AnyValueType """ Retrieves a dictionary `value-like` key using multiple common variations ``[href, value, reference, data]``. @@ -882,7 +883,7 @@ def str2bytes(string): raise TypeError(f"Cannot convert item to bytes: {type(string)!r}") if isinstance(string, bytes): return string - return string.encode() + return string.encode("UTF-8") def bytes2str(string): @@ -894,7 +895,7 @@ def bytes2str(string): raise TypeError(f"Cannot convert item to unicode: {type(string)!r}") if not isinstance(string, bytes): return string - return string.decode() + return string.decode("UTF-8") def islambda(func): diff --git a/weaver/vault/__init__.py b/weaver/vault/__init__.py index ff43b1744..29d558598 100644 --- a/weaver/vault/__init__.py +++ b/weaver/vault/__init__.py @@ -1,4 +1,5 @@ import logging +from typing import TYPE_CHECKING from pyramid.settings import asbool @@ -6,10 +7,14 @@ from weaver.vault import views as v from weaver.wps_restapi import swagger_definitions as sd +if TYPE_CHECKING: + from pyramid.config import Configurator + LOGGER = logging.getLogger(__name__) def includeme(config): + # type: (Configurator) -> None settings = get_settings(config) if asbool(settings.get("weaver.vault", True)): LOGGER.info("Adding file vault...") diff --git a/weaver/wps_restapi/examples/deploy_process_cwl.json b/weaver/wps_restapi/examples/deploy_process_cwl.json new file mode 100644 index 000000000..3f1458def --- /dev/null +++ b/weaver/wps_restapi/examples/deploy_process_cwl.json @@ -0,0 +1,19 @@ +{ + "cwlVersion": "v1.0", + "class": "CommandLineTool", + "requirements": { + "DockerRequirement": { + "dockerPull": "python:3.7-alpine" + } + }, + "baseCommand": ["python3", "-V"], + "inputs": {}, + "outputs": { + "output": { + "type": "File", + "outputBinding": { + "glob": "stdout.log" + } + } + } +} diff --git a/weaver/wps_restapi/examples/deploy_process_ogcapppkg.json b/weaver/wps_restapi/examples/deploy_process_ogcapppkg.json new file mode 100644 index 000000000..1219180df --- /dev/null +++ b/weaver/wps_restapi/examples/deploy_process_ogcapppkg.json @@ -0,0 +1,42 @@ +{ + "processDescription": { + "id": "Echo", + "title": "Example Echo", + "version": "1.0", + "description": "Example process that simply echo's back the input message for testing purposes.", + "keywords": [ + "test" + ], + "inputs": { + "message": { + "description": "Message to echo.", + "minOccurs": 1, + "maxOccurs": 1, + "schema": { + "type": "string" + } + } + }, + "outputs": { + "output": { + "description": "Output file with echo message.", + "schema": { + "type": "string", + "contentMediaType": "text/plain" + } + } + } + }, + "jobControlOptions": [ + "async-execute" + ], + "outputTransmission": [ + "reference" + ], + "executionUnit": [ + { + "href": "https://raw.githubusercontent.com/crim-ca/weaver/master/tests/functional/application-packages/Echo/echo.cwl" + } + ], + "deploymentProfileName": "http://www.opengis.net/profiles/eoc/dockerizedApplication" +} diff --git a/weaver/wps_restapi/examples/deploy_process_wps1.json b/weaver/wps_restapi/examples/deploy_process_wps1.json new file mode 100644 index 000000000..557c37739 --- /dev/null +++ b/weaver/wps_restapi/examples/deploy_process_wps1.json @@ -0,0 +1,13 @@ +{ + "processDescription": { + "process": { + "id": "remote-process-wps1" + } + }, + "executionUnit": [ + { + "href": "https://example.com?service=WPS&request=DescribeProcess&identifier=remote-process-wps1&version=1.0.0" + } + ], + "deploymentProfileName": "http://www.opengis.net/profiles/eoc/wpsApplication" +} diff --git a/weaver/wps_restapi/jobs/__init__.py b/weaver/wps_restapi/jobs/__init__.py index f76d00129..c2f5b8b83 100644 --- a/weaver/wps_restapi/jobs/__init__.py +++ b/weaver/wps_restapi/jobs/__init__.py @@ -1,13 +1,18 @@ import logging +from typing import TYPE_CHECKING from weaver.formats import OutputFormat from weaver.wps_restapi import swagger_definitions as sd from weaver.wps_restapi.jobs import jobs as j +if TYPE_CHECKING: + from pyramid.config import Configurator + LOGGER = logging.getLogger(__name__) def includeme(config): + # type: (Configurator) -> None LOGGER.info("Adding WPS REST API jobs...") settings = config.registry.settings config.add_route(**sd.service_api_route_info(sd.jobs_service, settings)) diff --git a/weaver/wps_restapi/processes/__init__.py b/weaver/wps_restapi/processes/__init__.py index d1650600c..aef31a735 100644 --- a/weaver/wps_restapi/processes/__init__.py +++ b/weaver/wps_restapi/processes/__init__.py @@ -1,9 +1,15 @@ import logging +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from pyramid.config import Configurator LOGGER = logging.getLogger(__name__) def includeme(config): + # type: (Configurator) -> None + from weaver.formats import OutputFormat from weaver.wps_restapi import swagger_definitions as sd from weaver.wps_restapi.processes import processes as p diff --git a/weaver/wps_restapi/processes/processes.py b/weaver/wps_restapi/processes/processes.py index 50036df81..521e68c00 100644 --- a/weaver/wps_restapi/processes/processes.py +++ b/weaver/wps_restapi/processes/processes.py @@ -28,7 +28,7 @@ from weaver.wps_restapi.providers.utils import get_provider_services if TYPE_CHECKING: - from weaver.typedefs import JSON + from weaver.typedefs import JSON, AnyViewResponse, PyramidRequest LOGGER = logging.getLogger(__name__) @@ -37,6 +37,7 @@ response_schemas=sd.get_processes_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_processes(request): + # type: (PyramidRequest) -> AnyViewResponse """ List registered processes (GetCapabilities). @@ -136,16 +137,18 @@ def get_processes(request): schema=sd.PostProcessesEndpoint(), response_schemas=sd.post_processes_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def add_local_process(request): + # type: (PyramidRequest) -> AnyViewResponse """ Register a local process. """ - return deploy_process_from_payload(request.json, request) + return deploy_process_from_payload(request.text, request) # use text to allow parsing as JSON or YAML @sd.process_service.get(tags=[sd.TAG_PROCESSES, sd.TAG_DESCRIBEPROCESS], renderer=OutputFormat.JSON, schema=sd.ProcessEndpoint(), response_schemas=sd.get_process_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_local_process(request): + # type: (PyramidRequest) -> AnyViewResponse """ Get a registered local process information (DescribeProcess). """ @@ -164,6 +167,7 @@ def get_local_process(request): schema=sd.ProcessPackageEndpoint(), response_schemas=sd.get_process_package_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_local_process_package(request): + # type: (PyramidRequest) -> AnyViewResponse """ Get a registered local process package definition. """ @@ -175,6 +179,7 @@ def get_local_process_package(request): schema=sd.ProcessPayloadEndpoint(), response_schemas=sd.get_process_payload_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_local_process_payload(request): + # type: (PyramidRequest) -> AnyViewResponse """ Get a registered local process payload definition. """ @@ -187,6 +192,7 @@ def get_local_process_payload(request): response_schemas=sd.get_process_visibility_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_process_visibility(request): + # type: (PyramidRequest) -> AnyViewResponse """ Get the visibility of a registered local process. """ @@ -199,6 +205,7 @@ def get_process_visibility(request): response_schemas=sd.put_process_visibility_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def set_process_visibility(request): + # type: (PyramidRequest) -> AnyViewResponse """ Set the visibility of a registered local process. """ @@ -228,6 +235,7 @@ def set_process_visibility(request): schema=sd.ProcessEndpoint(), response_schemas=sd.delete_process_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def delete_local_process(request): + # type: (PyramidRequest) -> AnyViewResponse """ Unregister a local process. """ @@ -269,6 +277,7 @@ def delete_local_process(request): schema=sd.PostProcessJobsEndpoint(), response_schemas=sd.post_process_jobs_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def submit_local_job(request): + # type: (PyramidRequest) -> AnyViewResponse """ Execute a process registered locally. diff --git a/weaver/wps_restapi/providers/__init__.py b/weaver/wps_restapi/providers/__init__.py index 2c71c1797..eb2e1bd58 100644 --- a/weaver/wps_restapi/providers/__init__.py +++ b/weaver/wps_restapi/providers/__init__.py @@ -1,13 +1,18 @@ import logging +from typing import TYPE_CHECKING from weaver.formats import OutputFormat from weaver.wps_restapi import swagger_definitions as sd from weaver.wps_restapi.providers import providers as p +if TYPE_CHECKING: + from pyramid.config import Configurator + LOGGER = logging.getLogger(__name__) def includeme(config): + # type: (Configurator) -> None LOGGER.info("Adding WPS REST API providers...") settings = config.registry.settings config.add_route(**sd.service_api_route_info(sd.providers_service, settings)) diff --git a/weaver/wps_restapi/providers/providers.py b/weaver/wps_restapi/providers/providers.py index bd7f5e0ca..c338850f3 100644 --- a/weaver/wps_restapi/providers/providers.py +++ b/weaver/wps_restapi/providers/providers.py @@ -26,9 +26,7 @@ from weaver.wps_restapi.utils import get_schema_ref, handle_schema_validation if TYPE_CHECKING: - from pyramid.request import Request - - from weaver.typedefs import AnyResponseType + from weaver.typedefs import AnyViewResponse, PyramidRequest LOGGER = logging.getLogger(__name__) @@ -38,7 +36,7 @@ @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) @check_provider_requirements def get_providers(request): - # type: (Request) -> AnyResponseType + # type: (PyramidRequest) -> AnyViewResponse """ Lists registered providers. """ @@ -60,7 +58,7 @@ def get_providers(request): @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) @check_provider_requirements def add_provider(request): - # type: (Request) -> AnyResponseType + # type: (PyramidRequest) -> AnyViewResponse """ Register a new service provider. """ @@ -121,7 +119,7 @@ def add_provider(request): @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) @check_provider_requirements def remove_provider(request): - # type: (Request) -> AnyResponseType + # type: (PyramidRequest) -> AnyViewResponse """ Remove an existing service provider. """ @@ -140,7 +138,7 @@ def remove_provider(request): @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) @check_provider_requirements def get_provider(request): - # type: (Request) -> AnyResponseType + # type: (PyramidRequest) -> AnyViewResponse """ Get a provider definition (GetCapabilities). """ @@ -154,7 +152,7 @@ def get_provider(request): @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) @check_provider_requirements def get_provider_processes(request): - # type: (Request) -> AnyResponseType + # type: (PyramidRequest) -> AnyViewResponse """ Retrieve available provider processes (GetCapabilities). """ @@ -170,7 +168,7 @@ def get_provider_processes(request): @check_provider_requirements def describe_provider_process(request): - # type: (Request) -> Process + # type: (PyramidRequest) -> Process """ Obtains a remote service process description in a compatible local process format. @@ -193,7 +191,7 @@ def describe_provider_process(request): @handle_schema_validation() @check_provider_requirements def get_provider_process(request): - # type: (Request) -> AnyResponseType + # type: (PyramidRequest) -> AnyViewResponse """ Retrieve a process description (DescribeProcess). """ @@ -212,7 +210,7 @@ def get_provider_process(request): @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) @check_provider_requirements def submit_provider_job(request): - # type: (Request) -> AnyResponseType + # type: (PyramidRequest) -> AnyViewResponse """ Execute a remote provider process. """ diff --git a/weaver/wps_restapi/quotation/__init__.py b/weaver/wps_restapi/quotation/__init__.py index d2700ea93..a8082f0be 100644 --- a/weaver/wps_restapi/quotation/__init__.py +++ b/weaver/wps_restapi/quotation/__init__.py @@ -1,13 +1,18 @@ import logging +from typing import TYPE_CHECKING from weaver.formats import OutputFormat from weaver.wps_restapi import swagger_definitions as sd from weaver.wps_restapi.quotation import bills as b, quotes as q +if TYPE_CHECKING: + from pyramid.config import Configurator + LOGGER = logging.getLogger(__name__) def includeme(config): + # type: (Configurator) -> None LOGGER.info("Adding WPS REST API quotation...") settings = config.registry.settings config.add_route(**sd.service_api_route_info(sd.process_quotes_service, settings)) diff --git a/weaver/wps_restapi/quotation/bills.py b/weaver/wps_restapi/quotation/bills.py index cb8b5e5ac..017d4a9c1 100644 --- a/weaver/wps_restapi/quotation/bills.py +++ b/weaver/wps_restapi/quotation/bills.py @@ -1,4 +1,5 @@ import logging +from typing import TYPE_CHECKING from pyramid.httpexceptions import HTTPNotFound, HTTPOk @@ -8,6 +9,9 @@ from weaver.store.base import StoreBills from weaver.wps_restapi import swagger_definitions as sd +if TYPE_CHECKING: + from weaver.typedefs import AnyViewResponse, PyramidRequest + LOGGER = logging.getLogger(__name__) @@ -15,6 +19,7 @@ schema=sd.BillsEndpoint(), response_schemas=sd.get_bill_list_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_bill_list(request): + # type: (PyramidRequest) -> AnyViewResponse """ Get list of bills IDs. """ @@ -27,6 +32,7 @@ def get_bill_list(request): schema=sd.BillEndpoint(), response_schemas=sd.get_bill_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_bill_info(request): + # type: (PyramidRequest) -> AnyViewResponse """ Get bill information. """ diff --git a/weaver/wps_restapi/quotation/quotes.py b/weaver/wps_restapi/quotation/quotes.py index 0dd8fa937..e9074d0fe 100644 --- a/weaver/wps_restapi/quotation/quotes.py +++ b/weaver/wps_restapi/quotation/quotes.py @@ -23,7 +23,7 @@ if TYPE_CHECKING: from weaver.datatype import Process - from weaver.typedefs import AnyResponseType, PyramidRequest + from weaver.typedefs import AnyViewResponse, PyramidRequest LOGGER = logging.getLogger(__name__) @@ -32,7 +32,7 @@ schema=sd.PostProcessQuoteRequestEndpoint(), response_schemas=sd.post_quotes_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def request_quote(request): - # type: (PyramidRequest) -> AnyResponseType + # type: (PyramidRequest) -> AnyViewResponse """ Request a quotation for a process. """ @@ -124,7 +124,7 @@ def request_quote(request): schema=sd.QuotesEndpoint(), response_schemas=sd.get_quote_list_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_quote_list(request): - # type: (PyramidRequest) -> AnyResponseType + # type: (PyramidRequest) -> AnyViewResponse """ Get list of quotes IDs. """ @@ -153,7 +153,7 @@ def get_quote_list(request): schema=sd.QuoteEndpoint(), response_schemas=sd.get_quote_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_quote_info(request): - # type: (PyramidRequest) -> AnyResponseType + # type: (PyramidRequest) -> AnyViewResponse """ Get quote information. """ @@ -172,7 +172,7 @@ def get_quote_info(request): schema=sd.PostQuote(), response_schemas=sd.post_quote_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def execute_quote(request): - # type: (PyramidRequest) -> AnyResponseType + # type: (PyramidRequest) -> AnyViewResponse """ Execute a quoted process. """ diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index 162afd434..e8aec8601 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -20,7 +20,7 @@ import duration import yaml -from colander import DateTime, Email, Money, OneOf, Range, Regex, drop, null, required +from colander import DateTime, Email, Length, Money, OneOf, Range, Regex, drop, null, required from dateutil import parser as date_parser from weaver import __meta__ @@ -3715,10 +3715,15 @@ class CWLInputList(ExtendedSequenceSchema): input = CWLInputItem(title="Input", description="Input specification. " + CWL_DOC_MESSAGE) +class CWLInputEmpty(EmptyMappingSchema): + pass + + class CWLInputsDefinition(OneOfKeywordSchema): _one_of = [ CWLInputList(description="Package inputs defined as items."), CWLInputMap(description="Package inputs defined as mapping."), + CWLInputEmpty(description="Package inputs as empty mapping when it takes no arguments."), ] @@ -3783,9 +3788,20 @@ class CWLVersion(Version): validator = SemanticVersion(v_prefix=True, rc_suffix=False) -class CWL(PermissiveMappingSchema): +class CWLIdentifier(ProcessIdentifier): + description = ( + "Reference to the process identifier. If CWL is provided within a process deployment payload, this can be " + "omitted. If used in a deployment with only CWL details, this information is required." + ) + + +class CWLBase(ExtendedMappingSchema): cwlVersion = CWLVersion() + + +class CWLApp(PermissiveMappingSchema): _class = CWLClass() + id = CWLIdentifier(missing=drop) # can be omitted only if within a process deployment that also includes it requirements = CWLRequirements(description="Explicit requirement to execute the application package.", missing=drop) hints = CWLHints(description="Non-failing additional hints that can help resolve extra requirements.", missing=drop) baseCommand = CWLCommand(description="Command called in the docker image or on shell according to requirements " @@ -3796,6 +3812,10 @@ class CWL(PermissiveMappingSchema): outputs = CWLOutputsDefinition(description="All outputs produced by the Application Package.") +class CWL(CWLBase, CWLApp): + _sort_first = ["cwlVersion", "id", "class"] + + class Unit(ExtendedMappingSchema): unit = CWL(description="Execution unit definition as CWL package specification. " + CWL_DOC_MESSAGE) @@ -4102,20 +4122,33 @@ class DeployProcessOffering(ProcessControl): processVersion = Version(title="processVersion", missing=drop) -class DeployProcessDescription(ProcessDeployment, ProcessControl): +class DeployProcessDescription(NotKeywordSchema, ProcessDeployment, ProcessControl): + _not = [ + Reference() # avoid conflict with deploy by href + ] schema_ref = f"{OGC_API_SCHEMA_URL}/{OGC_API_SCHEMA_VERSION}/core/openapi/schemas/process.yaml" description = "Process description fields directly provided." +class DeployReference(Reference): + id = ProcessIdentifier(missing=drop, description=( + "Optional identifier of the specific process to obtain the description from in case the reference URL " + "corresponds to an endpoint that can refer to multiple process definitions (e.g.: GetCapabilities)." + )) + + class ProcessDescriptionChoiceType(OneOfKeywordSchema): _one_of = [ - Reference(), + DeployReference(), DeployProcessOffering(), - DeployProcessDescription() + DeployProcessDescription(), ] -class Deploy(ExtendedMappingSchema): +class DeployOGCAppPackage(NotKeywordSchema, ExtendedMappingSchema): + _not = [ + CWLBase() + ] processDescription = ProcessDescriptionChoiceType() executionUnit = ExecutionUnitList() immediateDeployment = ExtendedSchemaNode(Boolean(), missing=drop, default=True) @@ -4123,13 +4156,86 @@ class Deploy(ExtendedMappingSchema): owsContext = OWSContext(missing=drop) +class CWLGraphItem(CWLApp): # no 'cwlVersion', only one at the top + id = CWLIdentifier() # required in this case + + +class CWLGraphList(ExtendedSequenceSchema): + cwl = CWLGraphItem() + + +# FIXME: supported nested and $graph multi-deployment (https://github.com/crim-ca/weaver/issues/56) +class CWLGraphBase(ExtendedMappingSchema): + graph = CWLGraphList( + name="$graph", description=( + "Graph definition that defines *exactly one* CWL application package represented as list. " + "Multiple definitions simultaneously deployed is NOT supported at the moment." + # "Graph definition that combines one or many CWL application packages within a single payload. " + # "If an single application is given (list of one item), it will be deployed as normal CWL by itself. " + # "If multiple applications are defined, the first MUST be the top-most Workflow process. " + # "Deployment of other items will be performed, and the full deployment will be persisted only if all are " + # "valid. The resulting Workflow will be registered as a package by itself (i.e: not as a graph)." + ), + validator=Length(min=1, max=1) + ) + + +class DeployCWLGraph(CWLBase, CWLGraphBase): + _sort_first = ["cwlVersion", "$graph"] + + +class DeployCWL(NotKeywordSchema, CWL): + _sort_first = ["cwlVersion", "id", "class"] + _not = [ + CWLGraphBase() + ] + id = CWLIdentifier() # required in this case + + +class Deploy(OneOfKeywordSchema): + _one_of = [ + DeployOGCAppPackage(), + DeployCWL(), + DeployCWLGraph(), + ] + + +class DeployContentType(ContentTypeHeader): + example = ContentType.APP_JSON + default = ContentType.APP_JSON + validator = OneOf([ + ContentType.APP_JSON, + ContentType.APP_CWL, + ContentType.APP_CWL_JSON, + ContentType.APP_CWL_YAML, + ContentType.APP_CWL_X, + ContentType.APP_OGC_PKG_JSON, + ContentType.APP_OGC_PKG_YAML, + ContentType.APP_YAML, + ]) + + class DeployHeaders(RequestHeaders): x_auth_docker = XAuthDockerHeader() + content_type = DeployContentType() class PostProcessesEndpoint(ExtendedMappingSchema): header = DeployHeaders(description="Headers employed for process deployment.") - body = Deploy(title="Deploy") + body = Deploy(title="Deploy", examples={ + "DeployCWL": { + "summary": "Deploy a process from a CWL definition.", + "value": EXAMPLES["deploy_process_cwl.json"], + }, + "DeployOGC": { + "summary": "Deploy a process from an OGC Application Package definition.", + "value": EXAMPLES["deploy_process_ogcapppkg.json"], + }, + "DeployWPS": { + "summary": "Deploy a process from a remote WPS-1 reference URL.", + "value": EXAMPLES["deploy_process_wps1.json"], + } + }) class WpsOutputContextHeader(ExtendedSchemaNode):