Skip to content

Commit

Permalink
better handling of WPS-REST JSON multiple-output file (relates to #25)
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed May 11, 2020
1 parent 39a87cf commit 4837745
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 20 deletions.
15 changes: 15 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@ Changes
`Unreleased <https://github.com/crim-ca/weaver/tree/master>`_ (latest)
========================================================================

Changes:
--------

- Improve handling of `WPS-REST` HREF output matching a JSON file with array of URL references corresponding to process
pseudo multiple-output format (relates to `#25 <https://github.com/crim-ca/weaver/issues/25>`_). The output is now
expanded to contain both the original JSON file reference and the extended contained URL references so that either
variation can be retrieved as result.

Fixes:
------

- Fix handling of WPS-REST output matching a JSON file for multiple-output format specified with a relative local path
as specified by job output location. Only remote HTTP references where correctly parsed. Also avoid failing the job if
the reference JSON parsing fails. It will simply return the original reference URL in this case without expanded data.

`1.6.0 <https://github.com/crim-ca/weaver/tree/1.6.0>`_ (2020-05-07)
========================================================================

Expand Down
57 changes: 38 additions & 19 deletions weaver/processes/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
import os
import warnings
from copy import deepcopy
from distutils.version import LooseVersion
Expand Down Expand Up @@ -44,6 +45,7 @@
from weaver.processes.types import PROCESS_APPLICATION, PROCESS_WORKFLOW
from weaver.store.base import StoreProcesses
from weaver.utils import get_sane_name, get_settings, get_url_without_query
from weaver.wps import get_wps_output_dir
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.utils import get_wps_restapi_base_url

Expand Down Expand Up @@ -84,29 +86,42 @@ def _read_reference(url):
return None


def _get_multi_json_references(output):
# type: (owslib.wps.Output) -> Optional[List[JSON]]
def _get_multi_json_references(output, container):
# type: (owslib.wps.Output, Optional[AnySettingsContainer]) -> Optional[List[JSON]]
"""
Since WPS standard does not allow to return multiple values for a single output,
a lot of process actually return a json array containing references to these outputs.
This function goal is to detect this particular format.
:return: An array of HTTP(S) references if the specified output is effectively a JSON containing that,
``None`` otherwise.
"""
Because the multi-output references are contained within this JSON file, it is not very convenient to retrieve
the list of URLs as one always needs to open and read the file to get them. This function goal is to detect this
particular format and expand the references to make them quickly available in the job output response.
# Check for the json datatype and mimetype
:return:
Array of HTTP(S) references if the specified output is effectively a JSON containing that, ``None`` otherwise.
"""
# Check for the json datatype and mime-type
if output.dataType == WPS_COMPLEX_DATA and output.mimeType == CONTENT_TYPE_APP_JSON:
try:
# If the json data is referenced read it's content
if output.reference:
out_ref = output.reference
if container:
if out_ref.startswith("file://"):
out_ref = out_ref[7:]
if out_ref.startswith("/"):
wps_out_dir = get_wps_output_dir(container)
out_ref = os.path.join(wps_out_dir, out_ref)
if not os.path.isfile(out_ref):
out_ref = output.reference
json_data_str = _read_reference(out_ref)
# Else get the data directly
else:
json_data_str = _get_data(output)

# If the json data is referenced read it's content
if output.reference:
json_data_str = _read_reference(output.reference)
# Else get the data directly
else:
json_data_str = _get_data(output)

# Load the actual json dict
json_data = json.loads(json_data_str)
# Load the actual json dict
json_data = json.loads(json_data_str)
except Exception:
return None

if isinstance(json_data, list):
for data_value in json_data:
Expand All @@ -122,10 +137,14 @@ def map_progress(progress, range_min, range_max):
return max(range_min, min(range_max, range_min + (progress * (range_max - range_min)) / 100))


def jsonify_output(output, process_description):
# type: (owslib.wps.Output, owslib.wps.Process) -> JSON
def jsonify_output(output, process_description, container=None):
# type: (owslib.wps.Output, owslib.wps.Process, Optional[AnySettingsContainer]) -> JSON
"""
Utility method to jsonify an output element from a WPS1 process description.
In the case that a reference JSON output is specified and that it refers to a file that contains an array list of
URL references to simulate a multiple-output, this specific output gets expanded to contain both the original
URL ``reference`` field and the loaded URL list under ``data`` field for easier access from the response body.
"""

if not output.dataType:
Expand All @@ -144,7 +163,7 @@ def jsonify_output(output, process_description):

# Handle special case where we have a reference to a json array containing dataset reference
# Avoid reference to reference by fetching directly the dataset references
json_array = _get_multi_json_references(output)
json_array = _get_multi_json_references(output, container)
if json_array and all(str(ref).startswith("http") for ref in json_array):
json_output["data"] = json_array
else:
Expand Down
3 changes: 2 additions & 1 deletion weaver/wps_restapi/processes/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ def execute_process(self, job_id, url, headers=None, notification_email=None):
job.status_message = "Job succeeded{}.".format(msg_progress)
wps_package.retrieve_package_job_log(execution, job)
job.save_log(logger=task_logger)
job_results = [jsonify_output(output, process) for output in execution.processOutputs]
job_results = [jsonify_output(output, process, settings)
for output in execution.processOutputs]
job.results = make_results_relative(job_results, settings)
else:
task_logger.debug("Job failed.")
Expand Down

0 comments on commit 4837745

Please sign in to comment.