Skip to content

Commit

Permalink
[wip] more setup of job results with multipart
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Sep 21, 2024
1 parent 884abf1 commit fddb3b3
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 79 deletions.
67 changes: 34 additions & 33 deletions tests/functional/test_wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -3486,7 +3486,7 @@ def test_execute_cwl_enum_schema_combined_type_single_array_from_wps(self, mock_
}
status_path = os.path.join(resources.FUNCTIONAL_APP_PKG, "Finch_EnsembleGridPointWetdays/status.xml")
status_url = f"{resources.TEST_REMOTE_SERVER_URL}/status.xml"
output_log_url = f"{resources.TEST_REMOTE_SERVER_URL}/output.txt"
output_log_url = f"{resources.TEST_REMOTE_SERVER_URL}/result.txt"
output_zip_url = f"{resources.TEST_REMOTE_SERVER_URL}/output.zip"
with open(status_path, mode="r", encoding="utf-8") as status_file:
status_body = status_file.read().format(
Expand Down Expand Up @@ -3640,7 +3640,7 @@ def test_execute_single_output_prefer_header_return_representation_complex(self)
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json == {
"output_json": {
"href": f"{out_url}/{job_id}/output_json/output.json",
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
},
}
Expand Down Expand Up @@ -3723,12 +3723,12 @@ def test_execute_single_output_prefer_header_return_minimal_complex(self):
assert results.status_code == 204, "No contents expected for minimal reference result."
assert results.body == b""
assert results.content_type.startswith(ContentType.APP_JSON)
assert results.headers["Content-Location"] == f"{out_url}/{job_id}/output_json/output.json"
assert results.headers["Content-Location"] == f"{out_url}/{job_id}/output_json/result.json"
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json == {
"output_json": {
"href": f"{out_url}/{job_id}/output_json/output.json",
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
},
}
Expand Down Expand Up @@ -3859,7 +3859,7 @@ def test_execute_single_output_response_raw_reference_literal(self):
assert results.status_code == 204, "No contents expected for single reference result."
assert results.body == b""
assert results.content_type.startswith(ContentType.TEXT_PLAIN)
assert results.headers["Content-Location"] == f"{out_url}/{job_id}/output_data/output.txt"
assert results.headers["Content-Location"] == f"{out_url}/{job_id}/output_data/result.txt"
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json["outputs"] == {
Expand Down Expand Up @@ -3904,12 +3904,12 @@ def test_execute_single_output_response_raw_reference_complex(self):
assert results.status_code == 204, "No contents expected for single reference result."
assert results.body == b""
assert results.content_type.startswith(ContentType.APP_JSON)
assert results.headers["Content-Location"] == f"{out_url}/{job_id}/output_json/output.json"
assert results.headers["Content-Location"] == f"{out_url}/{job_id}/output_json/result.json"
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json["outputs"] == {
"output_json": {
"href": f"{out_url}/{job_id}/output_json/output.json",
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
},
}
Expand Down Expand Up @@ -3963,6 +3963,7 @@ def test_execute_single_output_multipart_accept_data(self):
)
job_id = results_url.rsplit("/results")[0].rsplit("/jobs/")[-1]
assert is_uuid(job_id), f"Failed to retrieve the job ID: [{job_id}] is not a UUID"
out_url = get_wps_output_url(self.settings)

# validate the results based on original execution request
results = resp
Expand All @@ -3982,7 +3983,7 @@ def test_execute_single_output_multipart_accept_data(self):
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json["outputs"] == {
"output_json": {
"href": f"{out_url}/{job_id}/output_json/output.json",
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
},
}
Expand Down Expand Up @@ -4044,15 +4045,15 @@ def test_execute_single_output_multipart_accept_link(self):
--{boundary}
Content-Type: {ContentType.APP_JSON}
Content-ID: <output_json@{job_id}>
Content-Location: {out_url}/{job_id}/output_json/output.json
Content-Location: {out_url}/{job_id}/output_json/result.json
--{boundary}--
""")
assert results.text == results_body
outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT})
assert outputs.content_type.startswith(ContentType.APP_JSON)
assert outputs.json["outputs"] == {
"output_json": {
"href": f"{out_url}/{job_id}/output_json/output.json",
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
},
}
Expand Down Expand Up @@ -4181,7 +4182,7 @@ def test_execute_multi_output_multipart_accept(self, multipart_header):
path = f"/processes/{p_id}/execution"
resp = mocked_sub_requests(self.app, "post_json", path, timeout=5,
data=exec_content, headers=exec_headers, only_local=True)
assert resp.status_code == 201, f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}"
assert resp.status_code == 200, f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}"

# request status instead of results since not expecting 'document' JSON in this case
status_url = resp.json["location"]
Expand All @@ -4201,7 +4202,7 @@ def test_execute_multi_output_multipart_accept(self, multipart_header):
--{boundary}
Content-Type: {ContentType.APP_JSON}
Content-ID: <output_json@{job_id}>
Content-Location: {out_url}/{job_id}/output_json/output.json
Content-Location: {out_url}/{job_id}/output_json/result.json
--{boundary}--
""")
assert results.content_type.startswith(ContentType.MULTIPART_MIXED)
Expand All @@ -4211,7 +4212,7 @@ def test_execute_multi_output_multipart_accept(self, multipart_header):
assert outputs.json["outputs"] == {
"output_data": "test",
"output_json": {
"href": f"{out_url}/{job_id}/output_json/output.json",
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
},
}
Expand Down Expand Up @@ -4355,7 +4356,7 @@ def test_execute_multi_output_prefer_header_return_representation(self):
assert outputs.json["outputs"] == {
"output_data": "test",
"output_json": {
"href": f"{out_url}/{job_id}/output_json/output.json",
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
},
}
Expand Down Expand Up @@ -4418,7 +4419,7 @@ def test_execute_multi_output_response_raw_value(self):
assert outputs.json["outputs"] == {
"output_data": "test",
"output_json": {
"href": f"{out_url}/{job_id}/output_json/output.json",
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
},
}
Expand Down Expand Up @@ -4464,11 +4465,11 @@ def test_execute_multi_output_response_raw_reference(self):
--{boundary}
Content-Type: {ContentType.TEXT_PLAIN}
Content-ID: <output_data@{job_id}>
Content-Location: {out_url}/{job_id}/output_data/output.txt
Content-Location: {out_url}/{job_id}/output_data/result.txt
--{boundary}
Content-Type: {ContentType.APP_JSON}
Content-ID: <output_json@{job_id}>
Content-Location: {out_url}/{job_id}/output_json/output.json
Content-Location: {out_url}/{job_id}/output_json/result.json
--{boundary}--
""")
assert results.content_type.startswith(ContentType.MULTIPART_MIXED)
Expand All @@ -4478,7 +4479,7 @@ def test_execute_multi_output_response_raw_reference(self):
assert outputs.json["outputs"] == {
"output_data": "test",
"output_json": {
"href": f"{out_url}/{job_id}/output_json/output.json",
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
},
}
Expand Down Expand Up @@ -4531,7 +4532,7 @@ def test_execute_multi_output_response_raw_mixed(self):
--{boundary}
Content-Type: {ContentType.TEXT_PLAIN}
Content-ID: <output_text@{job_id}>
Content-Location: {out_url}/{job_id}/output_text/output.txt
Content-Location: {out_url}/{job_id}/output_text/result.txt
--{boundary}
Content-Type: {ContentType.APP_JSON}
Content-ID: <output_json@{job_id}>
Expand All @@ -4546,11 +4547,11 @@ def test_execute_multi_output_response_raw_mixed(self):
assert outputs.json["outputs"] == {
"output_data": "test",
"output_text": {
"href": f"{out_url}/{job_id}/output_text/output.txt",
"href": f"{out_url}/{job_id}/output_text/result.txt",
"type": ContentType.TEXT_PLAIN,
},
"output_json": {
"href": f"{out_url}/{job_id}/output_json/output.json",
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
},
}
Expand Down Expand Up @@ -4600,7 +4601,7 @@ def test_execute_multi_output_prefer_header_return_minimal_defaults(self):
assert results_json == {
"output_data": "test",
"output_json": {
"href": f"{out_url}/{job_id}/output_json/output.json",
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
},
}
Expand All @@ -4609,7 +4610,7 @@ def test_execute_multi_output_prefer_header_return_minimal_defaults(self):
assert outputs.json["outputs"] == {
"output_data": "test",
"output_json": {
"href": f"{out_url}/{job_id}/output_json/output.json",
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
},
}
Expand Down Expand Up @@ -4664,15 +4665,15 @@ def test_execute_multi_output_prefer_header_return_minimal_override_transmission
assert results.content_type.startswith(ContentType.APP_JSON)
assert results_json == {
"output_data": {
"href": f"{out_url}/{job_id}/output_text/output.txt",
"href": f"{out_url}/{job_id}/output_text/result.txt",
"type": ContentType.TEXT_PLAIN,
},
"output_json": {
"value": output_json,
"mediaType": ContentType.APP_JSON,
},
"output_text": {
"href": f"{out_url}/{job_id}/output_text/output.txt",
"href": f"{out_url}/{job_id}/output_text/result.txt",
"type": ContentType.TEXT_PLAIN,
},
}
Expand All @@ -4681,11 +4682,11 @@ def test_execute_multi_output_prefer_header_return_minimal_override_transmission
assert outputs.json["outputs"] == {
"output_data": "test",
"output_json": {
"href": f"{out_url}/{job_id}/output_json/output.json",
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
},
"output_text": {
"href": f"{out_url}/{job_id}/output_text/output.txt",
"href": f"{out_url}/{job_id}/output_text/result.txt",
"type": ContentType.TEXT_PLAIN,
},
}
Expand Down Expand Up @@ -4735,7 +4736,7 @@ def test_execute_multi_output_response_document_defaults(self):
assert results_json == {
"output_data": "test",
"output_json": {
"href": f"{out_url}/{job_id}/output_json/output.json",
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
},
}
Expand All @@ -4744,7 +4745,7 @@ def test_execute_multi_output_response_document_defaults(self):
assert outputs.json["outputs"] == {
"output_data": "test",
"output_json": {
"href": f"{out_url}/{job_id}/output_json/output.json",
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
},
}
Expand Down Expand Up @@ -4796,15 +4797,15 @@ def test_execute_multi_output_response_document_mixed(self):
assert results.content_type.startswith(ContentType.APP_JSON)
assert results_json == {
"output_data": {
"href": f"{out_url}/{job_id}/output_text/output.txt",
"href": f"{out_url}/{job_id}/output_text/result.txt",
"type": ContentType.TEXT_PLAIN,
},
"output_json": {
"value": output_json,
"mediaType": ContentType.APP_JSON,
},
"output_text": {
"href": f"{out_url}/{job_id}/output_text/output.txt",
"href": f"{out_url}/{job_id}/output_text/result.txt",
"type": ContentType.TEXT_PLAIN,
},
}
Expand All @@ -4813,11 +4814,11 @@ def test_execute_multi_output_response_document_mixed(self):
assert outputs.json["outputs"] == {
"output_data": "test",
"output_json": {
"href": f"{out_url}/{job_id}/output_json/output.json",
"href": f"{out_url}/{job_id}/output_json/result.json",
"type": ContentType.APP_JSON,
},
"output_text": {
"href": f"{out_url}/{job_id}/output_text/output.txt",
"href": f"{out_url}/{job_id}/output_text/result.txt",
"type": ContentType.TEXT_PLAIN,
},
}
Expand Down
25 changes: 21 additions & 4 deletions weaver/datatype.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from weaver import xml_util
from weaver.exceptions import ProcessInstanceError, ServiceParsingError
from weaver.execute import ExecuteControlOption, ExecuteMode, ExecuteResponse, ExecuteTransmissionMode
from weaver.formats import AcceptLanguage, ContentType, repr_json
from weaver.formats import AcceptLanguage, ContentType, OutputFormat, repr_json
from weaver.processes.constants import (
CWL_NAMESPACE_WEAVER_ID,
CWL_REQUIREMENT_APP_DOCKER,
Expand All @@ -63,6 +63,7 @@
get_job_log_msg,
get_log_date_fmt,
get_log_fmt,
get_path_kvp,
get_settings,
now,
request_extra
Expand Down Expand Up @@ -1363,6 +1364,7 @@ def links(self, container=None, self_link=None):
:param self_link: name of a section that represents the current link that will be returned.
"""
settings = get_settings(container)
html_on = settings.get("weaver.wps_restapi_html", True)
base_url = get_wps_restapi_base_url(settings)
job_url = self._job_url(base_url) # full URL
job_path = base_url + sd.job_service.path.format(job_id=self.id)
Expand All @@ -1371,15 +1373,28 @@ def links(self, container=None, self_link=None):
job_links = [
{"href": job_url, "rel": "status", "title": "Job status."}, # OGC
{"href": job_url, "rel": "monitor", "title": "Job monitoring location."}, # IANA
{"href": job_path, "rel": "alternate", "title": "Job status generic endpoint."}, # IANA
{"href": job_list, "rel": "collection", "title": "List of submitted jobs."}, # IANA
{"href": get_path_kvp(job_path, f=OutputFormat.JSON), "type": ContentType.APP_JSON,
"rel": "alternate", "title": "Job status generic endpoint."}, # IANA
{"href": job_list, "rel": "http://www.opengis.net/def/rel/ogc/1.0/job-list", # OGC
"title": "List of submitted jobs."},
{"href": job_exec, "rel": "http://www.opengis.net/def/rel/ogc/1.0/execute",
"title": "New job submission endpoint for the corresponding process."},
{"href": f"{job_url}/inputs", "rel": "inputs", # unofficial
"title": "Submitted job inputs for process execution."}
]
if html_on:
job_links.append({
"href": get_path_kvp(job_path, f=OutputFormat.HTML),
"rel": "alternate",
"title": "HTML Job Status",
"type": ContentType.TEXT_HTML,
})
if self_link in ["status", None]:
job_links.extend([
{"href": job_list, "rel": "collection", "title": "List of submitted jobs."}, # IANA

])

if self.status in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]:
job_status = map_status(self.status)
if job_status == Status.SUCCEEDED:
Expand Down Expand Up @@ -1413,7 +1428,8 @@ def links(self, container=None, self_link=None):
job_links.extend([self_link_body, self_link_up])
link_meta = {"type": ContentType.APP_JSON, "hreflang": AcceptLanguage.EN_CA}
for link in job_links:
link.update(link_meta)
for meta, parma in link_meta.items():
link.setdefault(meta, parma)
return job_links

def json(self, container=None): # pylint: disable=W0221,arguments-differ
Expand Down Expand Up @@ -1484,6 +1500,7 @@ def params(self):
"request": self.request,
"response": self.response,
"subscribers": self.subscribers,
"accept_type": self.accept_type,
"accept_language": self.accept_language,
}

Expand Down
5 changes: 3 additions & 2 deletions weaver/processes/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
from pywps.inout.inputs import BoundingBoxInput, ComplexInput

from weaver.datatype import Job
from weaver.execute import AnyExecuteMode
from weaver.processes.convert import OWS_Input_Type, ProcessOWS
from weaver.status import StatusType
from weaver.typedefs import (
Expand Down Expand Up @@ -836,7 +837,7 @@ def submit_job_handler(payload, # type: ProcessExecution
# sync not respected, therefore must drop it
# since both could be provided as alternative preferences, drop only async with limited subset
prefer = get_header("Preference-Applied", headers, pop=True)
_, _, async_applied = parse_prefer_header_execute_mode({"Prefer": prefer}, [ExecuteMode.ASYNC])
_, _, async_applied = parse_prefer_header_execute_mode({"Prefer": prefer}, [ExecuteControlOption.ASYNC])
if async_applied:
resp_headers.update(async_applied)

Expand All @@ -853,7 +854,7 @@ def submit_job_handler(payload, # type: ProcessExecution


def validate_job_accept_header(headers, execution_mode):
# type: (AnyHeadersContainer, ExecuteMode) -> Optional[str]
# type: (AnyHeadersContainer, AnyExecuteMode) -> Optional[str]
"""
Validate that the submitted ``Accept`` header is permitted.
"""
Expand Down
Loading

0 comments on commit fddb3b3

Please sign in to comment.