Skip to content

Commit

Permalink
[WIP] workflows ScatterFeatureRequirement (relates to #105) + func wo…
Browse files Browse the repository at this point in the history
…rkflow tests (relates to #11) + OGC Media-Types format for CWL
  • Loading branch information
fmigneault committed Sep 12, 2022
1 parent 07110ff commit 52720e2
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 18 deletions.
7 changes: 5 additions & 2 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ Changes

Changes:
--------
- No change.
- Support `CWL` definition for ``ScatterFeatureRequirement``
(resolves `#105 <https://github.com/crim-ca/weaver/issues/105>`_).
- Add `OGC` Media-Type ontology for ``File`` format references within `CWL` definition.
- Adjust ``builtin`` process ``jsonarray2netcdf`` (version ``2.0``) to employ `OGC` Media-Type for NetCDF.

Fixes:
------
- No change.
- Fix implementation of various functional test cases for `Workflow` execution.

.. _changes_4.23.0:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ inputs:
type: File
inputBinding:
position: 1
format: ogc:netcdf
outputs:
output_txt:
type: File
outputBinding:
glob: "*.txt"
$namespaces:
ogc: "http://www.opengis.net/def/media-type/ogc/1.0/"
47 changes: 43 additions & 4 deletions tests/functional/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -970,9 +970,9 @@ def mock_tmp_input(requests_mock):
"workflow of different process types.")

# FIXME: implement + re-enable 'CWL_REQUIREMENT_SCATTER'
@pytest.mark.xfail(
reason="ScatterFeatureRequirement not yet supported (https://github.com/crim-ca/weaver/issues/105)"
)
#@pytest.mark.xfail(
# reason="ScatterFeatureRequirement not yet supported (https://github.com/crim-ca/weaver/issues/105)"
#)
def test_workflow_mixed_rest_builtin_wps1_docker_scatter_requirements(self):
"""
Test the use of multiple applications of different :term:`Process` type in a :term:`Workflow`.
Expand All @@ -981,6 +981,44 @@ def test_workflow_mixed_rest_builtin_wps1_docker_scatter_requirements(self):
1. Convert JSON array of NetCDF references to corresponding NetCDF files
(process registered with ``WPS1Requirement`` using WPS-1 interface of builtin ``jsonarray2netcdf``).
2. Convert NetCDF file to raw text data dumps (using scattered applications per-file).
.. seealso::
Inverse :term:`WPS-1` / :term:`OGC API - Processes` process references from
:meth:`test_workflow_mixed_wps1_builtin_rest_docker_scatter_requirements`.
"""

with contextlib.ExitStack() as stack:
tmp_host = "https://mocked-file-server.com" # must match in 'Execute_WorkflowScatterCopyNestedOutDir.json'
tmp_dir = stack.enter_context(tempfile.TemporaryDirectory())
nc_refs = []
for i in range(3):
nc_name = f"test-file-{i}.nc"
nc_refs.append(os.path.join(tmp_host, nc_name))
with open(os.path.join(tmp_dir, nc_name), mode="w", encoding="utf-8") as tmp_file:
tmp_file.write(f"DUMMY NETCDF DATA #{i}")
with open(os.path.join(tmp_dir, "netcdf-array.json"), mode="w", encoding="utf-8") as tmp_file:
json.dump(nc_refs, tmp_file) # must match execution body

def mock_tmp_input(requests_mock):
mocked_file_server(tmp_dir, tmp_host, self.settings, requests_mock=requests_mock)

self.workflow_runner(WorkflowProcesses.WORKFLOW_WPS1_SCATTER_COPY_NETCDF,
[WorkflowProcesses.APP_DOCKER_NETCDF_2_TEXT, # required for reference by WPS below
WorkflowProcesses.APP_WPS1_DOCKER_NETCDF_2_TEXT],
log_full_trace=True, requests_mock_callback=mock_tmp_input)

def test_workflow_mixed_wps1_builtin_rest_docker_scatter_requirements(self):
"""
Test the use of multiple applications of different :term:`Process` type in a :term:`Workflow`.
Steps:
1. Convert JSON array of NetCDF references to corresponding NetCDF files
(process registered with ``WPS1Requirement`` using WPS-1 interface of builtin ``jsonarray2netcdf``).
2. Convert NetCDF file to raw text data dumps (using scattered applications per-file).
.. seealso::
Inverse :term:`WPS-1` / :term:`OGC API - Processes` process references from
:meth:`test_workflow_mixed_rest_builtin_wps1_docker_scatter_requirements`.
"""

with contextlib.ExitStack() as stack:
Expand All @@ -999,7 +1037,8 @@ def mock_tmp_input(requests_mock):
mocked_file_server(tmp_dir, tmp_host, self.settings, requests_mock=requests_mock)

self.workflow_runner(WorkflowProcesses.WORKFLOW_REST_SCATTER_COPY_NETCDF,
[WorkflowProcesses.APP_WPS1_DOCKER_NETCDF_2_TEXT],
[WorkflowProcesses.APP_WPS1_JSON_ARRAY_2_NETCDF, # no need to register its builtin ref
WorkflowProcesses.APP_DOCKER_NETCDF_2_TEXT],
log_full_trace=True, requests_mock_callback=mock_tmp_input)

def test_workflow_docker_applications(self):
Expand Down
34 changes: 32 additions & 2 deletions tests/wps_restapi/test_processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1260,9 +1260,39 @@ def test_deploy_process_WPS1_GetCapabilities_executionUnit(self):
self.deploy_process_make_visible_and_fetch_deployed(body, resources.TEST_REMOTE_SERVER_WPS1_PROCESS_ID)

# FIXME: implement
@pytest.mark.skip(reason="not implemented")
@pytest.mark.skip(reason="not implemented - experimental")
def test_deploy_process_WPS3_DescribeProcess_href(self):
raise NotImplementedError
path = f"{self.url}/processes/jsonarray2netcdf" # use builtin, re-deploy as "remote process"
p_id = "new-test-wps3"
body = {
"processDescription": {"process": {"id": p_id}},
"executionUnit": [{"href": path}],
}
desc = self.deploy_process_make_visible_and_fetch_deployed(body, p_id, assert_io=False)
assert desc["deploymentProfile"] == "http://www.opengis.net/profiles/eoc/ogcapiApplication"

# process description should have been generated with relevant I/O
proc = desc["process"]
assert proc["id"] == p_id
assert proc["inputs"] == []
assert proc["outputs"] == [{
"id": "output",
"title": "output",
"schema": {"type": "string", "contentMediaType": "text/plain"},
"formats": [{"default": True, "mediaType": "text/plain"}]
}]

# package should have been generated with corresponding I/O from "remote process"
ref = self.get_application_package("jsonarray2netcdf")
pkg = self.get_application_package(p_id)
# add the missing remote reference to the local definition to compare them
# The local builtin package ('ref') has no remote reference, whereas the package generated for the
# newly deployed process ('pkg') is expected to define one; inject it here so both can be compared.
ref["hints"] = {
"OGCAPIRequirement": { # FIXME: implement, aka 'Wps3Process' dispatched step
"process": "jsonarray2netcdf",
"provider": self.url
}
}
assert pkg == ref

# FIXME: implement
@pytest.mark.skip(reason="not implemented")
Expand Down
24 changes: 18 additions & 6 deletions weaver/formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class ContentType(Constants):
APP_YAML = "application/x-yaml"
APP_ZIP = "application/zip"
IMAGE_GEOTIFF = "image/tiff; subtype=geotiff"
# GeoTIFF media-type variant using the 'application=geotiff' parameter (key of 'OGC_MAPPING' for 'geotiff').
# Must start with the 'image/' top-level type, otherwise it never matches a real Content-Type value.
IMAGE_OGC_GEOTIFF = "image/tiff; application=geotiff"
IMAGE_JPEG = "image/jpeg"
IMAGE_GIF = "image/gif"
IMAGE_PNG = "image/png"
Expand Down Expand Up @@ -348,23 +349,32 @@ class SchemaRole(Constants):
ContentType.IMAGE_JPEG: "format_3579",
ContentType.APP_HDF5: "format_3590",
ContentType.APP_JSON: "format_3464",
ContentType.APP_NETCDF: "format_3650",
ContentType.APP_YAML: "format_3750",
ContentType.TEXT_PLAIN: "format_1964",
}
# Official links to be employed in definitions must be formed as:
# http://www.opengis.net/def/glossary/...
# http://www.opengis.net/def/...
# But they should be redirected to full definitions as:
# https://defs.opengis.net/vocprez/object?uri=http://www.opengis.net/def/glossary/...
# https://defs.opengis.net/vocprez/object?uri=http://www.opengis.net/def/...
# See common locations:
# https://www.opengis.net/def/media-type
OPENGIS_NAMESPACE = "opengis"
OPENGIS_NAMESPACE_URL = "http://www.opengis.net/"
OPENGIS_NAMESPACE_DEFINITION = {OPENGIS_NAMESPACE: OPENGIS_NAMESPACE_URL}
OPENGIS_MAPPING = {
ContentType.IMAGE_GEOTIFF: "def/glossary/term/Geotiff"
OPENGIS_MAPPING = {}
# shorthand notation directly scoped under OGC Media-Types to allow: 'ogc:<media-type-id>'
OGC_NAMESPACE = "ogc"
OGC_NAMESPACE_URL = f"{OPENGIS_NAMESPACE_URL}def/media-type/ogc/1.0/"
OGC_NAMESPACE_DEFINITION = {OGC_NAMESPACE: OGC_NAMESPACE_URL}
OGC_MAPPING = {
ContentType.IMAGE_GEOTIFF: "geotiff",
ContentType.IMAGE_OGC_GEOTIFF: "geotiff",
ContentType.APP_NETCDF: "netcdf",
}
FORMAT_NAMESPACE_DEFINITIONS = {
**IANA_NAMESPACE_DEFINITION,
**EDAM_NAMESPACE_DEFINITION,
**OGC_NAMESPACE_DEFINITION,
**OPENGIS_NAMESPACE_DEFINITION
}
FORMAT_NAMESPACES = frozenset(FORMAT_NAMESPACE_DEFINITIONS)
Expand Down Expand Up @@ -537,6 +547,8 @@ def _search_explicit_mappings(_mime_type):
return _make_if_ref(IANA_NAMESPACE_DEFINITION, IANA_NAMESPACE, IANA_MAPPING[_mime_type])
if _mime_type in EDAM_MAPPING: # prefer real reference if available
return _make_if_ref(EDAM_NAMESPACE_DEFINITION, EDAM_NAMESPACE, EDAM_MAPPING[_mime_type])
if _mime_type in OGC_MAPPING: # prefer real reference if available
return _make_if_ref(OGC_NAMESPACE_DEFINITION, OGC_NAMESPACE, OGC_MAPPING[_mime_type])
if _mime_type in OPENGIS_MAPPING: # prefer real reference if available
return _make_if_ref(OPENGIS_NAMESPACE_DEFINITION, OPENGIS_NAMESPACE, OPENGIS_MAPPING[_mime_type])
return None
Expand Down Expand Up @@ -651,7 +663,7 @@ def clean_mime_type_format(mime_type, suffix_subtype=False, strip_parameters=Fal
mime_type = mime_type.replace(v + ":", "")
break
search = True
for _map in [EDAM_MAPPING, OPENGIS_MAPPING]:
for _map in [EDAM_MAPPING, OGC_MAPPING, OPENGIS_MAPPING]:
if not search:
break
for v in _map.values():
Expand Down
4 changes: 2 additions & 2 deletions weaver/processes/builtin/jsonarray2netcdf.cwl
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ inputs:
prefix: "-i"
outputs:
output:
format: edam:format_3650
format: ogc:netcdf
type:
type: array
items: File
outputBinding:
glob: "*.nc"
$namespaces:
iana: "https://www.iana.org/assignments/media-types/"
edam: "http://edamontology.org/"
ogc: "http://www.opengis.net/def/media-type/ogc/1.0/"
2 changes: 1 addition & 1 deletion weaver/processes/builtin/jsonarray2netcdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
LOGGER.setLevel(logging.INFO)

# process details
__version__ = "1.3"
__version__ = "2.0"
__title__ = "JSON array to NetCDF"
__abstract__ = __doc__ # NOTE: '__doc__' is fetched directly, this is mostly to be informative

Expand Down
2 changes: 1 addition & 1 deletion weaver/processes/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class OpenSearchField(Constants):
CWL_REQUIREMENT_ENV_VAR,
CWL_REQUIREMENT_INIT_WORKDIR,
CWL_REQUIREMENT_RESOURCE, # FIXME: perform pre-check on job submit? (https://github.com/crim-ca/weaver/issues/138)
# CWL_REQUIREMENT_SCATTER, # FIXME: see workflow test + fix https://github.com/crim-ca/weaver/issues/105
CWL_REQUIREMENT_SCATTER,
])
"""
Set of :term:`CWL` requirements that corresponds to extra functionalities not completely defining
Expand Down

0 comments on commit 52720e2

Please sign in to comment.