Skip to content

Commit

Permalink
[wip] setup multipart function
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Sep 20, 2024
1 parent 46cccee commit 8624a90
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 22 deletions.
6 changes: 2 additions & 4 deletions weaver/datatype.py
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ def type(self):
return "provider"

def _get_inputs(self):
# type: () -> Optional[ExecutionInputs]
# type: () -> ExecutionInputs
if self.get("inputs") is None:
return {}
return dict.__getitem__(self, "inputs")
Expand All @@ -912,9 +912,7 @@ def _set_inputs(self, inputs):

def _get_outputs(self):
# type: () -> Optional[ExecutionOutputs]
if self.get("outputs") is None:
return {}
return dict.__getitem__(self, "outputs")
return self.get("outputs")

def _set_outputs(self, outputs):
# type: (Optional[ExecutionOutputs]) -> None
Expand Down
1 change: 1 addition & 0 deletions weaver/formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ class ContentType(Constants):
}
ANY_CWL = {APP_CWL, APP_CWL_JSON, APP_CWL_YAML, APP_CWL_X}
ANY_XML = {APP_XML, TEXT_XML}
ANY_MULTIPART = {MULTIPART_ANY, MULTIPART_FORM, MULTIPART_MIXED, MULTIPART_RELATED}
ANY = "*/*"


Expand Down
11 changes: 7 additions & 4 deletions weaver/processes/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
ExecutionOutputsList,
ExecutionOutputsMap,
JobValueFile,
JobValueItem,
JSON,
OpenAPISchema,
OpenAPISchemaArray,
Expand Down Expand Up @@ -1865,7 +1866,7 @@ def convert_input_values_schema(inputs, schema):
raise ValueError(f"Unknown conversion method to schema [{schema}] for inputs of type [{name}]: {inputs}")
if schema == JobInputsOutputsSchema.OGC:
input_dict = {}
for input_item in inputs:
for input_item in inputs: # type: JobValueItem
input_id = get_any_id(input_item, pop=True)
input_val = get_any_value(input_item)
input_key = get_any_value(input_item, key=True, data=True, file=False)
Expand Down Expand Up @@ -1909,18 +1910,18 @@ def convert_input_values_schema(inputs, schema):

@overload
def convert_output_params_schema(inputs, schema):
# type: (ExecutionOutputs, JobInputsOutputsSchema.OGC) -> ExecutionOutputsMap
# type: (Optional[ExecutionOutputs], JobInputsOutputsSchema.OGC) -> Optional[ExecutionOutputsMap]
...


@overload
def convert_output_params_schema(inputs, schema):
# type: (ExecutionOutputs, JobInputsOutputsSchema.OLD) -> ExecutionOutputsList
# type: (Optional[ExecutionOutputs], JobInputsOutputsSchema.OLD) -> Optional[ExecutionOutputsList]
...


def convert_output_params_schema(outputs, schema):
# type: (ExecutionOutputs, JobInputsOutputsSchemaType) -> ExecutionOutputs
# type: (Optional[ExecutionOutputs], JobInputsOutputsSchemaType) -> Optional[ExecutionOutputs]
"""
Convert execution output parameters between equivalent formats.
Expand All @@ -1936,6 +1937,8 @@ def convert_output_params_schema(outputs, schema):
:param schema: Desired schema.
:return: Converted outputs.
"""
if outputs is None:
return outputs
if isinstance(schema, str):
schema = schema.lower().split("+")[0]
if (
Expand Down
70 changes: 56 additions & 14 deletions weaver/wps_restapi/jobs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import os
import shutil
from copy import deepcopy
from email.message import MIMEPart
from email.mime.multipart import MIMEMultipart
from typing import TYPE_CHECKING, cast

import colander
Expand All @@ -13,11 +15,11 @@
HTTPInternalServerError,
HTTPNoContent,
HTTPNotFound,
HTTPNotImplemented,
HTTPOk
)
from pyramid.response import FileResponse
from pyramid_celery import celery_app
from requests.structures import CaseInsensitiveDict

from weaver.database import get_db
from weaver.datatype import Job, Process
Expand Down Expand Up @@ -352,6 +354,15 @@ def get_results( # pylint: disable=R1260
out_ref = convert_output_params_schema(job.outputs, JobInputsOutputsSchema.OGC) if link_references else {}
references = {}
for result in job.results:
# Filter outputs not requested, unless 'all' requested by omitting
out_id = get_any_id(result)
if (
(isinstance(job.outputs, dict) and out_id not in job.outputs) or
(isinstance(job.outputs, list) and not any(get_any_id(out) == out_id for out in job.outputs))
):
LOGGER.debug("Removing [%s] from %s results response because not requested.", out_id, job)
continue

# Complex result could contain both 'data' and a reference (eg: JSON file and its direct representation).
# Literal result is only by itself. Therefore, find applicable field by non 'data' match.
rtype = "href" if get_any_value(result, key=True, file=True, data=False) else "data"
Expand All @@ -375,7 +386,6 @@ def get_results( # pylint: disable=R1260
rtype = "href" if get_any_value(val_item, key=True, file=True, data=False) else "data"
val_data = get_any_value(val_item, file=True, data=False)
out_key = rtype
out_id = get_any_id(result)
out_mode = out_ref.get(out_id, {}).get("transmissionMode")
as_ref = link_references and out_mode == ExecuteTransmissionMode.REFERENCE
if rtype == "href" and isinstance(val_data, str):
Expand Down Expand Up @@ -516,9 +526,11 @@ def get_job_results_response(
results, refs = get_results(job, container, value_key="value",
schema=JobInputsOutputsSchema.OGC, # not strict to provide more format details
link_references=is_raw)
headers = headers or {}
if "location" not in headers:
headers["Location"] = job.status_url(container)

headers = CaseInsensitiveDict(headers or {})
if "Location" in headers:
headers.setdefault("Content-Location", headers.pop("Location"))
headers.setdefault("Content-Location", job.status_url(container))

if not is_raw:
try:
Expand Down Expand Up @@ -561,15 +573,12 @@ def get_job_results_response(
out_type = get_any_value(out_info, key=True)
out_data = get_any_value(out_info)

# FIXME: https://github.com/crim-ca/weaver/issues/376
# implement multipart, both for multi-output IDs and array-output under same ID
if len(results) > 1 or (isinstance(out_data, list) and len(out_data) > 1):
# https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-raw-value-multi
raise HTTPNotImplemented(json={
"code": "NotImplemented",
"type": "NotImplemented",
"detail": "Multipart results with 'transmissionMode=value' and 'response=raw' not implemented.",
})
if (
len(results) > 1 or
(isinstance(out_data, list) and len(out_data) > 1) or
(isinstance(job.accept_type, str) and any(ctype in job.accept_type for ctype in ContentType.ANY_MULTIPART))
):
return get_job_results_multipart(job, results)

# single value only
out_data = out_data[0] if isinstance(out_data, list) else out_data
Expand All @@ -591,6 +600,39 @@ def get_job_results_response(
return resp


def get_job_results_multipart(job, results):
# type: (Job, ExecutionResults) -> HTTPOk
"""
Generates the :term:`Job` results multipart response from available or requested outputs.
.. seealso::
Function :func:`get_results` should be used to avoid re-processing all output format combinations.
:param job:
:param results: Pre-filtered and pre-processed results in a normalized format structure.
"""
# FIXME: https://github.com/crim-ca/weaver/issues/376
# implement multipart, both for multi-output IDs and array-output under same ID
multi = MIMEMultipart()
for res_id, result in results.items():
part = MIMEPart()
part.add_header() # other ? content-disposition filename from output ID?
# ctype header
part.set_type()
part.set_charset()
part.set_param() # in ctype
# data
part.set_payload()
multi.attach(part)

resp = HTTPOk(
detail=f"Multipart Response for {job}",
headers={"Content-Type": multi.get_content_type()},
)
resp.body = multi.as_bytes()
return resp


def get_job_submission_response(body, headers, error=False):
# type: (JSON, AnyHeadersContainer, bool) -> Union[HTTPOk, HTTPCreated, HTTPBadRequest]
"""
Expand Down

0 comments on commit 8624a90

Please sign in to comment.