diff --git a/tests/functional/application-packages/DockerCopyNestedOutDir/deploy.yml b/tests/functional/application-packages/DockerCopyNestedOutDir/deploy.yml index 0aa59f1a9..6f3f884c7 100644 --- a/tests/functional/application-packages/DockerCopyNestedOutDir/deploy.yml +++ b/tests/functional/application-packages/DockerCopyNestedOutDir/deploy.yml @@ -10,6 +10,7 @@ processDescription: - mimeType: text/plain default: true minOccurs: 1 + maxOccurs: "unbounded" outputs: - id: output_files formats: diff --git a/tests/functional/application-packages/DockerCopyNestedOutDir/package.cwl b/tests/functional/application-packages/DockerCopyNestedOutDir/package.cwl index e38f54517..78f3cd0f1 100644 --- a/tests/functional/application-packages/DockerCopyNestedOutDir/package.cwl +++ b/tests/functional/application-packages/DockerCopyNestedOutDir/package.cwl @@ -32,6 +32,7 @@ inputs: position: 1 outputs: output_files: + # NOTE: always one, but using array to allow chaining itself any amount of times type: type: array items: File diff --git a/tests/processes/test_wps_package.py b/tests/processes/test_wps_package.py index d2e3d737c..a83a3a799 100644 --- a/tests/processes/test_wps_package.py +++ b/tests/processes/test_wps_package.py @@ -4,8 +4,6 @@ .. seealso:: - :mod:`tests.functional.wps_package`. """ -import warnings - import contextlib import copy import io @@ -16,6 +14,7 @@ import shutil import sys import tempfile +import warnings from typing import TYPE_CHECKING import cwltool.process diff --git a/weaver/processes/convert.py b/weaver/processes/convert.py index 6298fd71e..fd5fb9ec1 100644 --- a/weaver/processes/convert.py +++ b/weaver/processes/convert.py @@ -513,6 +513,10 @@ def _convert_any2cwl_io_complex(cwl_io, cwl_ns, wps_io, io_select): """ Converts the :term:`WPS`-like I/O definition and defines them inplace into the :term:`CWL` containers. + .. 
seealso:: + See :meth:`weaver.processes.wps_process_base.WpsProcessInterface.stage_results` which tightly interacts + with the produced ``outputBinding.glob`` patterns generated here. Methodology should align between them. + :param cwl_io: Basic :term:`CWL` I/O container (only ID needed) where to write conversion results. :param cwl_ns: Namespaces to gradually update when encountering new format Media-Type definitions. :param wps_io: Original :term:`WPS`-like I/O to be converted. @@ -583,12 +587,21 @@ def _convert_any2cwl_io_complex(cwl_io, cwl_ns, wps_io, io_select): cwl_io_ext = [cwl_io_any] # Method 'weaver.processes.wps_process_base.WpsProcessInterface.stage_results' uses the produced glob - # pattern below of generated output definitions from WPS items that don't offer any hint about the expected - # file naming format or specification. - # Require that the file is nested in a directory named as the output ID (to isolate against conflict by - # other outputs) and has the expected extension based on the file format/schema/media-type. - # Any character can be employed for the file name within the sub-dir as generated by the remote process. - cwl_glob = [f"{cwl_id}/*{ext}" for ext in cwl_io_ext] + # pattern(s) below of generated output definitions from WPS items that don't offer any hint about the + # expected file naming format or specification (because we cannot guess what will be produced as output + # from the remote process definitions alone). We can only provide expected extension based on the file + # format/schema/media-type of the output definition. + # To avoid potential naming clashes or conflicting matching from generic patterns when CWL tries to resolve + # paths, that staging operation stages outputs and adjusts each glob pattern under a directory named by the + # respective output ID. 
+ # However, it is very important **NOT** to add the output ID directory nesting approach here, otherwise it + # will confuse the staging process between Workflow steps, since it won't be able to distinguish whether the + # nesting was already applied by Weaver (here), or provided by a user-provided CWL Application Package, since + # WPS-based, OGC-based, CWL-based (or any future implementation) can be combined within the same Workflow. + cwl_glob = [ + f"*{ext}" if ext != "/" else "./" # handle special case of "extension" for 'Directory' type + for ext in cwl_io_ext + ] cwl_io["outputBinding"] = { "glob": cwl_glob[0] if len(cwl_glob) == 1 else cwl_glob } diff --git a/weaver/processes/wps1_process.py b/weaver/processes/wps1_process.py index 26c7fc928..49562b1a0 100644 --- a/weaver/processes/wps1_process.py +++ b/weaver/processes/wps1_process.py @@ -68,7 +68,6 @@ def __init__(self, _message, _progress, _status, self.provider, *args, **kwargs ) ) - self.stage_output_id_nested = True # set after __init__ to avoid reset by base class def format_inputs(self, workflow_inputs): # type: (CWL_RuntimeInputList) -> OWS_InputDataValues diff --git a/weaver/processes/wps_process_base.py b/weaver/processes/wps_process_base.py index 921128081..addcc5606 100644 --- a/weaver/processes/wps_process_base.py +++ b/weaver/processes/wps_process_base.py @@ -96,7 +96,7 @@ def __init__(self, request, update_status): self.settings = get_settings() self.update_status = update_status # type: UpdateStatusPartialFunction self.temp_staging = set() - self.stage_output_id_nested = False + self.stage_output_id_nested = True def execute(self, workflow_inputs, out_dir, expected_outputs): # type: (CWL_RuntimeInputsMap, str, CWL_ExpectedOutputs) -> None @@ -329,8 +329,8 @@ def stage_results(self, results, expected_outputs, out_dir): We cannot rely on specific file names to be mapped, since glob can match many (eg: ``"*.txt"``). .. 
seealso:: - Function :func:`weaver.processes.convert.any2cwl_io` defines a generic glob pattern using the output ID - and expected file extension based on Content-Type format. Since the remote :term:`WPS` :term:`Process` + Function :func:`weaver.processes.convert._convert_any2cwl_io_complex` defines a generic glob pattern from + the expected file extension based on Content-Type format. Since the remote :term:`WPS` :term:`Process` doesn't necessarily produce file names with the output ID as expected to find them (could be anything), staging must patch locations to let :term:`CWL` runtime resolve the files according to glob definitions. diff --git a/weaver/processes/wps_workflow.py b/weaver/processes/wps_workflow.py index bd893bef9..415707766 100644 --- a/weaver/processes/wps_workflow.py +++ b/weaver/processes/wps_workflow.py @@ -1,5 +1,6 @@ import collections.abc import logging +import os import tempfile from functools import partial from typing import TYPE_CHECKING, cast # these are actually used in the code @@ -186,11 +187,18 @@ def collect_output( ignoring any nested dirs where the modified *outputBindings* definition will be able to match as if each step :term:`Process` outputs were generated locally. """ - # if "outputBinding" in schema and "glob" in schema["outputBinding"]: - # # in case of Directory collection with '/', use '.' because cwltool replaces it by the outdir - # glob = schema["outputBinding"]["glob"] - # glob = os.path.split(glob)[-1] or "." - # schema["outputBinding"]["glob"] = glob + if "outputBinding" in schema and "glob" in schema["outputBinding"]: + glob = schema["outputBinding"]["glob"] + glob_list = isinstance(glob, list) + glob = glob if isinstance(glob, list) else [glob] + out_id = schema["id"].rsplit("#", 1)[-1] + glob_spec = [] + for glob_item in glob: + if glob_item.startswith(outdir): + # if equal -> '.', which is identical to what CWL '/.' 
expects for a dir entry + glob_item = os.path.relpath(glob_item, outdir) + glob_spec.append(os.path.join(out_id, glob_item)) + schema["outputBinding"]["glob"] = glob_spec if glob_list else glob_spec[0] output = super(WpsWorkflow, self).collect_output( schema, builder, @@ -198,7 +206,7 @@ def collect_output( fs_access, compute_checksum=compute_checksum, ) - return output or {} + return output class WpsWorkflowJob(CommandLineJob):