From e0b406f8228bcd9c6f67002704c042320d5dd04a Mon Sep 17 00:00:00 2001 From: Francis Charette-Migneault Date: Thu, 18 Aug 2022 23:52:57 -0400 Subject: [PATCH] =?UTF-8?q?[WIP]=C2=A0workflows=20ScatterFeatureRequiremen?= =?UTF-8?q?t=20(relates=20to=20#105)=20+=20func=20workflow=20tests=20(rela?= =?UTF-8?q?tes=20to=20#11)=20+=20OGC=20Media-Types=20format=20for=20CWL?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGES.rst | 5 ++ .../DockerNetCDF2Text/package.cwl | 3 ++ tests/functional/test_workflow.py | 47 +++++++++++++++++-- tests/wps_restapi/test_processes.py | 34 +++++++++++++- weaver/formats.py | 24 +++++++--- weaver/processes/builtin/jsonarray2netcdf.cwl | 4 +- weaver/processes/builtin/jsonarray2netcdf.py | 2 +- weaver/processes/constants.py | 2 +- 8 files changed, 105 insertions(+), 16 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 7aaeae8f4..818df2937 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -14,12 +14,17 @@ Changes: -------- - Adjust OpenAPI schema definitions for `Process` deployment to allow ``owsContext`` by itself without duplicated information that was required by mandatory ``executionUnit`` definition. +- Support `CWL` definition for ``ScatterFeatureRequirement`` + (resolves `#105 `_). +- Add `OGC` Media-Type ontology for ``File`` format references within `CWL` definition. +- Adjust ``builtin`` process ``jsonarray2netcdf`` (version ``2.0``) to employ `OGC` Media-Type for NetCDF. Fixes: ------ - Fix implementation of functional ``DockerRequirement`` test cases for `Process` deployment when references are provided by ``href`` within the ``executionUnit`` or ``owsContext`` (relates to `#11 `_). +- Fix implementation of various functional test cases for `Workflow` execution. .. _changes_4.22.0: diff --git a/tests/functional/application-packages/DockerNetCDF2Text/package.cwl b/tests/functional/application-packages/DockerNetCDF2Text/package.cwl index 6781fd9df..6c48aeb81 100644 --- a/tests/functional/application-packages/DockerNetCDF2Text/package.cwl +++ b/tests/functional/application-packages/DockerNetCDF2Text/package.cwl @@ -22,8 +22,11 @@ inputs: type: File inputBinding: position: 1 + format: ogc:netcdf outputs: output_txt: type: File outputBinding: glob: "*.txt" +$namespaces: + ogc: "http://www.opengis.net/def/media-type/ogc/1.0/" diff --git a/tests/functional/test_workflow.py b/tests/functional/test_workflow.py index e61ce93f6..11dff2e06 100644 --- a/tests/functional/test_workflow.py +++ b/tests/functional/test_workflow.py @@ -970,9 +970,9 @@ def mock_tmp_input(requests_mock): "workflow of different process types.") # FIXME: implement + re-enable 'CWL_REQUIREMENT_SCATTER' - @pytest.mark.xfail( - reason="ScatterFeatureRequirement not yet supported (https://github.com/crim-ca/weaver/issues/105)" - ) + #@pytest.mark.xfail( + # reason="ScatterFeatureRequirement not yet supported (https://github.com/crim-ca/weaver/issues/105)" + #) def test_workflow_mixed_rest_builtin_wps1_docker_scatter_requirements(self): """ Test the use of multiple applications of different :term:`Process` type in a :term:`Workflow`. @@ -981,6 +981,44 @@ def test_workflow_mixed_rest_builtin_wps1_docker_scatter_requirements(self): 1. Convert JSON array of NetCDF references to corresponding NetCDF files (process registered with ``WPS1Requirement`` using WPS-1 interface of builtin ``jsonarray2netcdf``). 2. Convert NetCDF file to raw text data dumps (using scattered applications per-file). + + .. seealso:: + Inverse :term:`WPS-1` / :term:`OGC API - Processes` process references from + :meth:`test_workflow_mixed_wps1_builtin_rest_docker_scatter_requirements`. + """ + + with contextlib.ExitStack() as stack: + tmp_host = "https://mocked-file-server.com" # must match in 'Execute_WorkflowScatterCopyNestedOutDir.json' + tmp_dir = stack.enter_context(tempfile.TemporaryDirectory()) + nc_refs = [] + for i in range(3): + nc_name = f"test-file-{i}.nc" + nc_refs.append(os.path.join(tmp_host, nc_name)) + with open(os.path.join(tmp_dir, nc_name), mode="w", encoding="utf-8") as tmp_file: + tmp_file.write(f"DUMMY NETCDF DATA #{i}") + with open(os.path.join(tmp_dir, "netcdf-array.json"), mode="w", encoding="utf-8") as tmp_file: + json.dump(nc_refs, tmp_file) # must match execution body + + def mock_tmp_input(requests_mock): + mocked_file_server(tmp_dir, tmp_host, self.settings, requests_mock=requests_mock) + + self.workflow_runner(WorkflowProcesses.WORKFLOW_WPS1_SCATTER_COPY_NETCDF, + [WorkflowProcesses.APP_DOCKER_NETCDF_2_TEXT, # required for reference by WPS below + WorkflowProcesses.APP_WPS1_DOCKER_NETCDF_2_TEXT], + log_full_trace=True, requests_mock_callback=mock_tmp_input) + + def test_workflow_mixed_wps1_builtin_rest_docker_scatter_requirements(self): + """ + Test the use of multiple applications of different :term:`Process` type in a :term:`Workflow`. + + Steps: + 1. Convert JSON array of NetCDF references to corresponding NetCDF files + (process registered with ``WPS1Requirement`` using WPS-1 interface of builtin ``jsonarray2netcdf``). + 2. Convert NetCDF file to raw text data dumps (using scattered applications per-file). + + .. seealso:: + Inverse :term:`WPS-1` / :term:`OGC API - Processes` process references from + :meth:`test_workflow_mixed_rest_builtin_wps1_docker_scatter_requirements`. """ with contextlib.ExitStack() as stack: @@ -999,7 +1037,8 @@ def mock_tmp_input(requests_mock): mocked_file_server(tmp_dir, tmp_host, self.settings, requests_mock=requests_mock) self.workflow_runner(WorkflowProcesses.WORKFLOW_REST_SCATTER_COPY_NETCDF, - [WorkflowProcesses.APP_WPS1_DOCKER_NETCDF_2_TEXT], + [WorkflowProcesses.APP_WPS1_JSON_ARRAY_2_NETCDF, # no need to register its builtin ref + WorkflowProcesses.APP_DOCKER_NETCDF_2_TEXT], log_full_trace=True, requests_mock_callback=mock_tmp_input) def test_workflow_docker_applications(self): diff --git a/tests/wps_restapi/test_processes.py b/tests/wps_restapi/test_processes.py index d703ab15b..d8aa85676 100644 --- a/tests/wps_restapi/test_processes.py +++ b/tests/wps_restapi/test_processes.py @@ -1260,9 +1260,39 @@ def test_deploy_process_WPS1_GetCapabilities_executionUnit(self): self.deploy_process_make_visible_and_fetch_deployed(body, resources.TEST_REMOTE_SERVER_WPS1_PROCESS_ID) # FIXME: implement - @pytest.mark.skip(reason="not implemented") + @pytest.mark.skip(reason="not implemented - experimental") def test_deploy_process_WPS3_DescribeProcess_href(self): - raise NotImplementedError + path = f"{self.url}/processes/jsonarray2netcdf" # use builtin, re-deploy as "remote process" + p_id = "new-test-wps3" + body = { + "processDescription": {"process": {"id": p_id}}, + "executionUnit": [{"href": path}], + } + desc = self.deploy_process_make_visible_and_fetch_deployed(body, p_id, assert_io=False) + assert desc["deploymentProfile"] == "http://www.opengis.net/profiles/eoc/ogcapiApplication" + + # process description should have been generated with relevant I/O + proc = desc["process"] + assert proc["id"] == p_id + assert proc["inputs"] == [] + assert proc["outputs"] == [{ + "id": "output", + "title": "output", + "schema": {"type": "string", "contentMediaType": "text/plain"}, + "formats": [{"default": True, "mediaType": "text/plain"}] + }] + + # package should have been generated with corresponding I/O from "remote process" + ref = self.get_application_package("jsonarray2netcdf") + pkg = self.get_application_package(p_id) + # add the missing remote reference to the local definition to compare them + ref["hints"] = { # expected to be defined in + "OGCAPIRequirement": { # FIXME: implement, aka 'Wps3Process' dispatched step + "process": "jsonarray2netcdf", + "provider": self.url + } + } + assert pkg == ref # FIXME: implement @pytest.mark.skip(reason="not implemented") diff --git a/weaver/formats.py b/weaver/formats.py index 77f211036..cded16333 100644 --- a/weaver/formats.py +++ b/weaver/formats.py @@ -64,6 +64,7 @@ class ContentType(Constants): APP_YAML = "application/x-yaml" APP_ZIP = "application/zip" IMAGE_GEOTIFF = "image/tiff; subtype=geotiff" + IMAGE_OGC_GEOTIFF = "mage/tiff; application=geotiff" IMAGE_JPEG = "image/jpeg" IMAGE_GIF = "image/gif" IMAGE_PNG = "image/png" @@ -348,23 +349,32 @@ class SchemaRole(Constants): ContentType.IMAGE_JPEG: "format_3579", ContentType.APP_HDF5: "format_3590", ContentType.APP_JSON: "format_3464", - ContentType.APP_NETCDF: "format_3650", ContentType.APP_YAML: "format_3750", ContentType.TEXT_PLAIN: "format_1964", } # Official links to be employed in definitions must be formed as: -# http://www.opengis.net/def/glossary/... +# http://www.opengis.net/def/... # But they should be redirected to full definitions as: -# https://defs.opengis.net/vocprez/object?uri=http://www.opengis.net/def/glossary/... +# https://defs.opengis.net/vocprez/object?uri=http://www.opengis.net/def/... +# See common locations: +# https://www.opengis.net/def/media-type OPENGIS_NAMESPACE = "opengis" OPENGIS_NAMESPACE_URL = "http://www.opengis.net/" OPENGIS_NAMESPACE_DEFINITION = {OPENGIS_NAMESPACE: OPENGIS_NAMESPACE_URL} -OPENGIS_MAPPING = { - ContentType.IMAGE_GEOTIFF: "def/glossary/term/Geotiff" +OPENGIS_MAPPING = {} +# shorthand notation directly scoped under OGC Media-Types to allow: 'ogc:' +OGC_NAMESPACE = "ogc" +OGC_NAMESPACE_URL = f"{OPENGIS_NAMESPACE_URL}def/media-type/ogc/1.0/" +OGC_NAMESPACE_DEFINITION = {OGC_NAMESPACE: OGC_NAMESPACE_URL} +OGC_MAPPING = { + ContentType.IMAGE_GEOTIFF: "geotiff", + ContentType.IMAGE_OGC_GEOTIFF: "geotiff", + ContentType.APP_NETCDF: "netcdf", } FORMAT_NAMESPACE_DEFINITIONS = { **IANA_NAMESPACE_DEFINITION, **EDAM_NAMESPACE_DEFINITION, + **OGC_NAMESPACE_DEFINITION, **OPENGIS_NAMESPACE_DEFINITION } FORMAT_NAMESPACES = frozenset(FORMAT_NAMESPACE_DEFINITIONS) @@ -537,6 +547,8 @@ def _search_explicit_mappings(_mime_type): return _make_if_ref(IANA_NAMESPACE_DEFINITION, IANA_NAMESPACE, IANA_MAPPING[_mime_type]) if _mime_type in EDAM_MAPPING: # prefer real reference if available return _make_if_ref(EDAM_NAMESPACE_DEFINITION, EDAM_NAMESPACE, EDAM_MAPPING[_mime_type]) + if _mime_type in OGC_MAPPING: # prefer real reference if available + return _make_if_ref(OGC_NAMESPACE_DEFINITION, OGC_NAMESPACE, OGC_MAPPING[_mime_type]) if _mime_type in OPENGIS_MAPPING: # prefer real reference if available return _make_if_ref(OPENGIS_NAMESPACE_DEFINITION, OPENGIS_NAMESPACE, OPENGIS_MAPPING[_mime_type]) return None @@ -651,7 +663,7 @@ def clean_mime_type_format(mime_type, suffix_subtype=False, strip_parameters=Fal mime_type = mime_type.replace(v + ":", "") break search = True - for _map in [EDAM_MAPPING, OPENGIS_MAPPING]: + for _map in [EDAM_MAPPING, OGC_MAPPING, OPENGIS_MAPPING]: if not search: break for v in _map.values(): diff --git a/weaver/processes/builtin/jsonarray2netcdf.cwl b/weaver/processes/builtin/jsonarray2netcdf.cwl index a4ad3308f..fb334df23 100644 --- a/weaver/processes/builtin/jsonarray2netcdf.cwl +++ b/weaver/processes/builtin/jsonarray2netcdf.cwl @@ -16,7 +16,7 @@ inputs: prefix: "-i" outputs: output: - format: edam:format_3650 + format: ogc:netcdf type: type: array items: File @@ -24,4 +24,4 @@ outputs: glob: "*.nc" $namespaces: iana: "https://www.iana.org/assignments/media-types/" - edam: "http://edamontology.org/" + ogc: "http://www.opengis.net/def/media-type/ogc/1.0/" diff --git a/weaver/processes/builtin/jsonarray2netcdf.py b/weaver/processes/builtin/jsonarray2netcdf.py index b46fffdfc..bd1c15c18 100644 --- a/weaver/processes/builtin/jsonarray2netcdf.py +++ b/weaver/processes/builtin/jsonarray2netcdf.py @@ -28,7 +28,7 @@ LOGGER.setLevel(logging.INFO) # process details -__version__ = "1.3" +__version__ = "2.0" __title__ = "JSON array to NetCDF" __abstract__ = __doc__ # NOTE: '__doc__' is fetched directly, this is mostly to be informative diff --git a/weaver/processes/constants.py b/weaver/processes/constants.py index b89a373a1..e7cbe9b25 100644 --- a/weaver/processes/constants.py +++ b/weaver/processes/constants.py @@ -81,7 +81,7 @@ class OpenSearchField(Constants): CWL_REQUIREMENT_ENV_VAR, CWL_REQUIREMENT_INIT_WORKDIR, CWL_REQUIREMENT_RESOURCE, # FIXME: perform pre-check on job submit? (https://github.com/crim-ca/weaver/issues/138) - # CWL_REQUIREMENT_SCATTER, # FIXME: see workflow test + fix https://github.com/crim-ca/weaver/issues/105 + CWL_REQUIREMENT_SCATTER, ]) """ Set of :term:`CWL` requirements that corresponds to extra functionalities not completely defining