From fd0e83e42ff0e624bdcba0878c9cc5b40e81a342 Mon Sep 17 00:00:00 2001 From: Francis Charette-Migneault Date: Tue, 22 Mar 2022 23:49:15 -0400 Subject: [PATCH] adjust sync result returned directly + support response=raw for single outputs or multi-by-ref (relates to #376) --- CHANGES.rst | 4 + tests/functional/test_builtin.py | 296 ++++++--- tests/functional/utils.py | 2 +- weaver/datatype.py | 46 +- weaver/execute.py | 11 + weaver/processes/execution.py | 64 +- weaver/processes/utils.py | 26 - weaver/store/base.py | 2 + weaver/store/mongodb.py | 3 + weaver/typedefs.py | 8 +- weaver/wps/service.py | 9 +- weaver/wps_restapi/api.py | 12 +- weaver/wps_restapi/jobs/jobs.py | 596 +----------------- weaver/wps_restapi/jobs/utils.py | 696 ++++++++++++++++++++++ weaver/wps_restapi/processes/processes.py | 5 +- weaver/wps_restapi/providers/providers.py | 7 +- weaver/wps_restapi/swagger_definitions.py | 14 +- 17 files changed, 1070 insertions(+), 731 deletions(-) create mode 100644 weaver/wps_restapi/jobs/utils.py diff --git a/CHANGES.rst b/CHANGES.rst index fd447dc9a..40d9da52a 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -34,6 +34,10 @@ Changes: requested this way (resolves `#377 `_). - Updated every ``Process`` to report that they support ``outputTransmission`` both as ``reference`` and ``value``, since handling of results is accomplished by `Weaver` itself, regardless of the application being executed. +- Add partial support of ``response=raw`` parameter for execution request submission in order to handle results to + be returned accordingly to specified ``outputTransmission`` by ``reference`` or ``value``. + Multipart contents for multi-output results are not yet supported + (relates to `#376 `_). - Add `CLI` option ``-R/--ref/--reference`` for ``execute`` operation allowing to request corresponding ``outputs`` by ID to be returned using the ``transmissionMode: reference`` method, producing HTTP ``Link`` headers for those entries rather than inserting values in the response content body. diff --git a/tests/functional/test_builtin.py b/tests/functional/test_builtin.py index 4230a9485..1ee5a69b1 100644 --- a/tests/functional/test_builtin.py +++ b/tests/functional/test_builtin.py @@ -86,30 +86,80 @@ def test_jsonarray2netcdf_describe_ogc_schema(self): assert body["jobControlOptions"] == [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC] assert body["outputTransmission"] == [ExecuteTransmissionMode.REFERENCE, ExecuteTransmissionMode.VALUE] - def test_jsonarray2netcdf_execute_async(self): + def setup_inputs(self, stack): dirname = tempfile.gettempdir() nc_data = "Hello NetCDF!" 
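As context for the ``response=raw`` support described in the changelog entry above, an execute request submitted to the path used by these tests could look like the sketch below. The host and input ``href`` are placeholders, and the plain string values stand in for the ``ExecuteResponse`` and ``ExecuteTransmissionMode`` constants defined in ``weaver/execute.py``::

    # POST /processes/jsonarray2netcdf/jobs  (the path used by the tests in this module)
    execute_body = {
        "response": "raw",  # ExecuteResponse.RAW: return data or Link headers instead of a results document
        "inputs": [{"id": "input", "href": "https://example.com/array.json"}],  # placeholder input
        "outputs": [
            # a single output by "reference" yields HTTP 204 with a 'Link' header on /results;
            # "value" instead returns the NetCDF content directly as the response body
            {"id": "output", "transmissionMode": "reference"},
        ],
    }

With ``response: document``, the requested ``transmissionMode`` has no effect on the response contents, as one of the tests further below verifies.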
+ tmp_ncdf = tempfile.NamedTemporaryFile(dir=dirname, mode="w", suffix=".nc") + tmp_json = tempfile.NamedTemporaryFile(dir=dirname, mode="w", suffix=".json") + tmp_ncdf = stack.enter_context(tmp_ncdf) # noqa + tmp_json = stack.enter_context(tmp_json) # noqa + tmp_ncdf.write(nc_data) + tmp_ncdf.seek(0) + tmp_json.write(json.dumps(["file://{}".format(os.path.join(dirname, tmp_ncdf.name))])) + tmp_json.seek(0) + body = {"inputs": [{"id": "input", "href": os.path.join(dirname, tmp_json.name)}]} + return body, nc_data + + def validate_results(self, results, outputs, data, links): + # first validate format of OGC-API results + if results is not None: + assert isinstance(results, dict) + assert "output" in results, "Expected result ID 'output' in response body" + assert isinstance(results["output"], dict), "Container of result ID 'output' should be a dict" + assert "href" in results["output"] + assert "format" in results["output"] + fmt = results["output"]["format"] # type: JSON + assert isinstance(fmt, dict), "Result format should be provided with content details" + assert "mediaType" in fmt + assert isinstance(fmt["mediaType"], str), "Result format Content-Type should be a single string definition" + assert fmt["mediaType"] == ContentType.APP_NETCDF, "Result 'output' format expected to be NetCDF file" + nc_href = results["output"]["href"] + assert isinstance(nc_href, str) and len(nc_href) + elif links: + assert isinstance(links, list) and len(links) == 1 and isinstance(links[0], tuple) + assert "rel=\"output\"" in links[0][1] + assert f"type={ContentType.APP_NETCDF}" in links[0][1] + nc_link = links[0][1].split(" ")[0] + assert nc_link.startswith("<") and nc_link.startswith(">") + nc_href = nc_link[1:-1] + else: + nc_href = None + + settings = get_settings_from_testapp(self.app) + wps_path = settings.get("weaver.wps_output_path") + wps_dir = settings.get("weaver.wps_output_dir") + wps_out = "{}{}".format(settings.get("weaver.url"), wps_path) + + # validate results if applicable + if nc_href is not None: + nc_real_path = nc_href.replace(wps_out, wps_dir) + assert nc_href.startswith(wps_out) + assert os.path.split(nc_real_path)[-1] == os.path.split(nc_href)[-1] + assert os.path.isfile(nc_real_path) + with open(nc_real_path, "r") as f: + assert f.read() == data + + # if everything was valid for results, validate equivalent but differently formatted outputs response + assert outputs["outputs"][0]["id"] == "output" + nc_href = outputs["outputs"][0]["href"] + assert isinstance(nc_href, str) and len(nc_href) + assert nc_href.startswith(wps_out) + nc_real_path = nc_href.replace(wps_out, wps_dir) + assert os.path.split(nc_real_path)[-1] == os.path.split(nc_href)[-1] + + def test_jsonarray2netcdf_execute_async(self): with contextlib.ExitStack() as stack_exec: - tmp_ncdf = tempfile.NamedTemporaryFile(dir=dirname, mode="w", suffix=".nc") - tmp_json = tempfile.NamedTemporaryFile(dir=dirname, mode="w", suffix=".json") - tmp_ncdf = stack_exec.enter_context(tmp_ncdf) # noqa - tmp_json = stack_exec.enter_context(tmp_json) # noqa - tmp_ncdf.write(nc_data) - tmp_ncdf.seek(0) - tmp_json.write(json.dumps(["file://{}".format(os.path.join(dirname, tmp_ncdf.name))])) - tmp_json.seek(0) - data = { + body, nc_data = self.setup_inputs(stack_exec) + body.update({ "mode": ExecuteMode.ASYNC, "response": ExecuteResponse.DOCUMENT, - "inputs": [{"id": "input", "href": os.path.join(dirname, tmp_json.name)}], "outputs": [{"id": "output", "transmissionMode": ExecuteTransmissionMode.VALUE}], - } - + }) for mock_exec in 
mocked_execute_celery(): stack_exec.enter_context(mock_exec) path = "/processes/jsonarray2netcdf/jobs" resp = mocked_sub_requests(self.app, "post_json", path, - data=data, headers=self.json_headers, only_local=True) + data=body, headers=self.json_headers, only_local=True) assert resp.status_code == 201, "Error: {}".format(resp.json) assert resp.content_type in ContentType.APP_JSON @@ -127,48 +177,180 @@ def test_jsonarray2netcdf_execute_async(self): assert resp.status_code == 200, "Error job outputs:\n{}".format(resp.json) outputs = resp.json - self.validate_results(results, outputs, nc_data) + self.validate_results(results, outputs, nc_data, None) - def test_jsonarray2netcdf_execute_sync(self): - dirname = tempfile.gettempdir() - nc_data = "Hello NetCDF!" + def test_jsonarray2netcdf_execute_async_output_by_reference_dontcare_response_document(self): + """ + Jobs submitted with ``response=document`` are not impacted by ``transmissionMode``. + + The results schema should always be returned when document is requested. + + .. seealso:: + https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-document + """ with contextlib.ExitStack() as stack_exec: - tmp_ncdf = tempfile.NamedTemporaryFile(dir=dirname, mode="w", suffix=".nc") - tmp_json = tempfile.NamedTemporaryFile(dir=dirname, mode="w", suffix=".json") - tmp_ncdf = stack_exec.enter_context(tmp_ncdf) # noqa - tmp_json = stack_exec.enter_context(tmp_json) # noqa - tmp_ncdf.write(nc_data) - tmp_ncdf.seek(0) - tmp_json.write(json.dumps(["file://{}".format(os.path.join(dirname, tmp_ncdf.name))])) - tmp_json.seek(0) - data = { - "inputs": [{"id": "input", "href": os.path.join(dirname, tmp_json.name)}], - "outputs": [{"id": "output", "transmissionMode": ExecuteTransmissionMode.VALUE}], - } - headers = {"Prefer": "wait=10"} - headers.update(self.json_headers) + body, nc_data = self.setup_inputs(stack_exec) + body.update({ + "response": ExecuteResponse.DOCUMENT, # by value/reference don't care because of this + "outputs": [{"id": "output", "transmissionMode": ExecuteTransmissionMode.REFERENCE}], + }) + for mock_exec in mocked_execute_celery(): + stack_exec.enter_context(mock_exec) + path = "/processes/jsonarray2netcdf/jobs" + resp = mocked_sub_requests(self.app, "post_json", path, + data=body, headers=self.json_headers, only_local=True) + + assert resp.status_code == 201, "Error: {}".format(resp.json) + assert resp.content_type in ContentType.APP_JSON + job_url = resp.json["location"] + self.monitor_job(job_url, return_status=True) # don't fetch results automatically + + resp = self.app.get("{}/results".format(job_url), headers=self.json_headers) + assert resp.status_code == 200, "Error: {}".format(resp.text) + assert resp.content_type == ContentType.APP_JSON + result_links = [hdr for hdr in resp.headers if hdr[0].lower() == "link"] + assert len(result_links) == 0 + results = resp.json + + # even though results are requested by Link reference, + # Weaver still offers them with document on outputs endpoint + output_url = job_url + "/outputs" + resp = self.app.get(output_url, headers=self.json_headers) + assert resp.status_code == 200, "Error job outputs:\n{}".format(resp.text) + outputs = resp.json + + self.validate_results(results, outputs, nc_data, result_links) + + def test_jsonarray2netcdf_execute_async_output_by_value_response_raw(self): + """ + Jobs submitted with ``response=raw`` and single output as ``transmissionMode=value`` must return its raw data. + + .. 
seealso:: + https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-raw-value-one + """ + with contextlib.ExitStack() as stack_exec: + body, nc_data = self.setup_inputs(stack_exec) + body.update({ + "response": ExecuteResponse.RAW, # by value/reference important here + # NOTE: quantity of outputs important as well + # since single output, content-type is directly that output (otherwise should be multipart) + "outputs": [{"id": "output", "transmissionMode": ExecuteTransmissionMode.VALUE}], # data dump + }) + for mock_exec in mocked_execute_celery(): + stack_exec.enter_context(mock_exec) + path = "/processes/jsonarray2netcdf/jobs" + resp = mocked_sub_requests(self.app, "post_json", path, + data=body, headers=self.json_headers, only_local=True) + + assert resp.status_code == 201, "Error: {}".format(resp.text) + assert resp.content_type in ContentType.APP_JSON + job_url = resp.json["location"] + self.monitor_job(job_url, return_status=True) # don't fetch results automatically + + resp = self.app.get("{}/results".format(job_url), headers=self.json_headers) + assert resp.status_code < 400, "Error: {}".format(resp.text) + assert resp.status_code == 200, "Body should contain literal raw data dump" + assert resp.content_type in ContentType.APP_NETCDF, "raw result by value should be directly the content-type" + assert resp.text == nc_data, "raw result by value should be directly the data content" + assert resp.headers + result_links = [hdr for hdr in resp.headers if hdr[0].lower() == "link"] + assert len(result_links) == 0 + + # even though results are requested by raw data, + # Weaver still offers them with document on outputs endpoint + output_url = job_url + "/outputs" + resp = self.app.get(output_url, headers=self.json_headers) + assert resp.status_code == 200, "Error job outputs:\n{}".format(resp.text) + outputs = resp.json + self.validate_results(None, outputs, nc_data, result_links) + + def test_jsonarray2netcdf_execute_async_output_by_reference_response_raw(self): + """ + Jobs submitted with ``response=raw`` and single output as ``transmissionMode=reference`` must a link. + + Contents should be empty, and the reference should be provided with HTTP ``Link`` header. + + .. 
seealso:: + https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-raw-ref + """ + with contextlib.ExitStack() as stack_exec: + body, nc_data = self.setup_inputs(stack_exec) + body.update({ + "response": ExecuteResponse.RAW, # by value/reference important here + "outputs": [{"id": "output", "transmissionMode": ExecuteTransmissionMode.REFERENCE}], # Link header + }) for mock_exec in mocked_execute_celery(): stack_exec.enter_context(mock_exec) path = "/processes/jsonarray2netcdf/jobs" resp = mocked_sub_requests(self.app, "post_json", path, - data=data, headers=headers, only_local=True) + data=body, headers=self.json_headers, only_local=True) + + assert resp.status_code == 201, "Error: {}".format(resp.json) + assert resp.content_type in ContentType.APP_JSON + job_url = resp.json["location"] + self.monitor_job(job_url, return_status=True) # don't fetch results automatically + + resp = self.app.get("{}/results".format(job_url), headers=self.json_headers) + assert resp.status_code < 400, "Error: {}".format(resp.json) + assert resp.status_code == 204, "Body should be empty since all outputs requested by reference (Link header)" + assert resp.content_type is None + assert resp.headers + result_links = [hdr for hdr in resp.headers if hdr[0].lower() == "link"] + + # even though results are requested by Link reference, + # Weaver still offers them with document on outputs endpoint + output_url = job_url + "/outputs" + resp = self.app.get(output_url, headers=self.json_headers) + assert resp.status_code == 200, "Error job outputs:\n{}".format(resp.json) + outputs = resp.json + + self.validate_results(None, outputs, nc_data, result_links) + + def test_jsonarray2netcdf_execute_sync(self): + """ + Job submitted with ``mode=sync`` or ``Prefer`` header for sync should respond directly with the results schema. + + .. seealso:: + https://docs.ogc.org/is/18-062r2/18-062r2.html#sc_execute_response + """ + with contextlib.ExitStack() as stack_exec: + body, nc_data = self.setup_inputs(stack_exec) + body.update({ + "response": ExecuteResponse.DOCUMENT, + "outputs": [{"id": "output", "transmissionMode": ExecuteTransmissionMode.VALUE}] + }) + for mock_exec in mocked_execute_celery(): + stack_exec.enter_context(mock_exec) + headers = {"Prefer": "wait=10"} + headers.update(self.json_headers) + path = "/processes/jsonarray2netcdf/jobs" + resp = mocked_sub_requests(self.app, "post_json", path, + data=body, headers=headers, only_local=True) assert resp.status_code == 200, "Error: {}".format(resp.json) assert resp.content_type in ContentType.APP_JSON - # since sync, all status details are already available! 
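The synchronous path exercised here relies on the ``Prefer: wait=10`` header rather than an explicit ``mode``. With the ``submit_job_handler`` change later in this patch, a successful synchronous execution answers with the results document directly (HTTP 200) while an asynchronous submission still answers HTTP 201 with a ``Location`` header. A minimal client-side sketch, assuming the ``requests`` package and placeholder URLs::

    import requests

    url = "https://weaver.example.com/processes/jsonarray2netcdf/jobs"  # placeholder host
    headers = {"Accept": "application/json", "Prefer": "wait=10"}
    body = {
        "response": "document",
        "inputs": [{"id": "input", "href": "https://example.com/array.json"}],
        "outputs": [{"id": "output", "transmissionMode": "value"}],
    }
    resp = requests.post(url, json=body, headers=headers)
    if resp.status_code == 200:       # sync honored: body is already the results document
        results = resp.json()
        job_url = resp.headers["Location"]   # job status remains available for later reference
    elif resp.status_code == 201:     # async fallback: monitor the job, then fetch its results
        job_url = resp.headers["Location"]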
- assert resp.json["status"] == Status.SUCCEEDED + # since sync, results are directly available instead of job status + # even if results are returned directly (instead of status), + # status location link is available for reference as needed assert "Location" in resp.headers - # validate indeed sync + # validate sync was indeed applied (in normal situation, not considering mock test that runs in sync) assert resp.headers["Preference-Applied"] == headers["Prefer"] - # following details not available yet in async, but are in sync - assert isinstance(resp.json["created"], str) and resp.json["created"] - assert isinstance(resp.json["finished"], str) and resp.json["finished"] - assert isinstance(resp.json["duration"], str) and resp.json["duration"] - assert isinstance(resp.json["progress"], int) and resp.json["progress"] == 100 + # following details should not be available since results are returned in sync instead of async job status + for field in ["status", "created", "finished", "duration", "progress"]: + assert field not in resp.json + # validate that job can still be found and its metadata are defined although executed in sync job_url = resp.headers["Location"] + resp = self.app.get(job_url, headers=self.json_headers) + assert resp.status_code == 200 + assert resp.content_type == ContentType.APP_JSON + for field in ["status", "created", "finished", "duration", "progress"]: + assert field in resp.json + assert resp.json["status"] == Status.SUCCEEDED + assert resp.json["progress"] == 100 + out_url = f"{job_url}/results" resp = self.app.get(out_url, headers=self.json_headers) assert resp.status_code == 200 @@ -180,34 +362,4 @@ def test_jsonarray2netcdf_execute_sync(self): assert resp.status_code == 200, "Error job outputs:\n{}".format(resp.json) outputs = resp.json - self.validate_results(results, outputs, nc_data) - - def validate_results(self, results, outputs, data): - - # first validate format of OGC-API results - assert "output" in results, "Expected result ID 'output' in response body" - assert isinstance(results["output"], dict), "Container of result ID 'output' should be a dict" - assert "href" in results["output"] - assert "format" in results["output"] - fmt = results["output"]["format"] # type: JSON - assert isinstance(fmt, dict), "Result format should be provided with content details" - assert "mediaType" in fmt - assert isinstance(fmt["mediaType"], str), "Result format Content-Type should be a single string definition" - assert fmt["mediaType"] == ContentType.APP_NETCDF, "Result 'output' format expected to be NetCDF file" - nc_path = results["output"]["href"] - assert isinstance(nc_path, str) and len(nc_path) - settings = get_settings_from_testapp(self.app) - wps_out = "{}{}".format(settings.get("weaver.url"), settings.get("weaver.wps_output_path")) - nc_real_path = nc_path.replace(wps_out, settings.get("weaver.wps_output_dir")) - assert nc_path.startswith(wps_out) - assert os.path.split(nc_real_path)[-1] == os.path.split(nc_path)[-1] - assert os.path.isfile(nc_real_path) - with open(nc_real_path, "r") as f: - assert f.read() == data - - # if everything was valid for results, validate equivalent but differently formatted outputs response - assert outputs["outputs"][0]["id"] == "output" - nc_path = outputs["outputs"][0]["href"] - assert isinstance(nc_path, str) and len(nc_path) - assert nc_path.startswith(wps_out) - assert os.path.split(nc_real_path)[-1] == os.path.split(nc_path)[-1] + self.validate_results(results, outputs, nc_data, None) diff --git a/tests/functional/utils.py 
b/tests/functional/utils.py index c807b69cb..6b459fde4 100644 --- a/tests/functional/utils.py +++ b/tests/functional/utils.py @@ -284,7 +284,7 @@ def check_job_status(_resp, running=False): if return_status or expect_failed: return resp.json resp = self.app.get("{}/results".format(status_url), headers=self.json_headers) - assert resp.status_code == 200, "Error job info:\n{}".format(resp.json) + assert resp.status_code == 200, "Error job info:\n{}".format(resp.text) return resp.json def get_outputs(self, status_url): diff --git a/weaver/datatype.py b/weaver/datatype.py index d07ea3c7b..add6a9210 100644 --- a/weaver/datatype.py +++ b/weaver/datatype.py @@ -30,7 +30,7 @@ from weaver import xml_util from weaver.exceptions import ProcessInstanceError, ServiceParsingError -from weaver.execute import ExecuteControlOption, ExecuteMode, ExecuteTransmissionMode +from weaver.execute import ExecuteControlOption, ExecuteMode, ExecuteResponse, ExecuteTransmissionMode from weaver.formats import AcceptLanguage, ContentType, repr_json from weaver.processes.constants import ProcessSchema from weaver.processes.convert import get_field, null, ows2json, wps2json_io @@ -58,7 +58,7 @@ from owslib.wps import WebProcessingService - from weaver.execute import AnyExecuteControlOption, AnyExecuteTransmissionMode + from weaver.execute import AnyExecuteControlOption, AnyExecuteMode, AnyExecuteResponse, AnyExecuteTransmissionMode from weaver.processes.constants import ProcessSchemaType from weaver.processes.types import AnyProcessType from weaver.quotation.status import AnyQuoteStatus @@ -837,6 +837,21 @@ def status_location(self, location_url): raise TypeError(f"Type 'str' is required for '{self.__name__}.status_location'") self["status_location"] = location_url + def status_url(self, container=None): + # type: (Optional[AnySettingsContainer]) -> str + """ + Obtain the resolved endpoint where the :term:`Job` status information can be obtained. + """ + settings = get_settings(container) + location_base = "/providers/{provider_id}".format(provider_id=self.service) if self.service else "" + location_url = "{base_url}{location_base}/processes/{process_id}/jobs/{job_id}".format( + base_url=get_wps_restapi_base_url(settings), + location_base=location_base, + process_id=self.process, + job_id=self.id + ) + return location_url + @property def notification_email(self): # type: () -> Optional[str] @@ -873,18 +888,39 @@ def execute_sync(self): @property def execution_mode(self): - # type: () -> ExecuteMode + # type: () -> AnyExecuteMode return ExecuteMode.get(self.get("execution_mode"), ExecuteMode.ASYNC) @execution_mode.setter def execution_mode(self, mode): - # type: (Union[ExecuteMode, str]) -> None + # type: (Union[AnyExecuteMode, str]) -> None exec_mode = ExecuteMode.get(mode) if exec_mode not in ExecuteMode: modes = list(ExecuteMode.values()) raise ValueError(f"Invalid value for '{self.__name__}.execution_mode'. 
Must be one of {modes}") self["execution_mode"] = mode + @property + def execution_response(self): + # type: () -> AnyExecuteResponse + out = self.setdefault("execution_response", ExecuteResponse.DOCUMENT) + if out not in ExecuteResponse.values(): + out = ExecuteResponse.DOCUMENT + self["execution_response"] = out + return out + + @execution_response.setter + def execution_response(self, response): + # type: (Optional[Union[AnyExecuteResponse, str]]) -> None + if response is None: + exec_resp = ExecuteResponse.DOCUMENT + else: + exec_resp = ExecuteResponse.get(response) + if exec_resp not in ExecuteResponse: + resp = list(ExecuteResponse.values()) + raise ValueError(f"Invalid value for '{self.__name__}.execution_response'. Must be one of {resp}") + self["execution_response"] = exec_resp + @property def is_local(self): # type: () -> bool @@ -1216,10 +1252,12 @@ def params(self): "service": self.service, "process": self.process, "inputs": self.inputs, + "outputs": self.outputs, "user_id": self.user_id, "status": self.status, "status_message": self.status_message, "status_location": self.status_location, + "execution_response": self.execution_response, "execution_mode": self.execution_mode, "is_workflow": self.is_workflow, "created": self.created, diff --git a/weaver/execute.py b/weaver/execute.py index 986640de8..a51787447 100644 --- a/weaver/execute.py +++ b/weaver/execute.py @@ -2,6 +2,9 @@ from weaver.base import Constants +if TYPE_CHECKING: + from typing import List + class ExecuteMode(Constants): AUTO = "auto" @@ -13,6 +16,14 @@ class ExecuteControlOption(Constants): ASYNC = "async-execute" SYNC = "sync-execute" + @classmethod + def values(cls): + # type: () -> List[AnyExecuteControlOption] + """ + Return default control options in specific order according to preferred modes for execution by `Weaver`. 
+ """ + return [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC] + class ExecuteResponse(Constants): RAW = "raw" diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index e089f8ac5..f8b34a4cf 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -8,12 +8,12 @@ from celery.exceptions import TimeoutError as CeleryTaskTimeoutError from owslib.util import clean_ows_url from owslib.wps import ComplexDataInput -from pyramid.httpexceptions import HTTPBadRequest, HTTPNotAcceptable, HTTPNotImplemented +from pyramid.httpexceptions import HTTPBadRequest, HTTPNotAcceptable from pyramid_celery import celery_app as app from weaver.database import get_db from weaver.datatype import Process, Service -from weaver.execute import ExecuteControlOption, ExecuteMode, ExecuteResponse, ExecuteTransmissionMode +from weaver.execute import ExecuteControlOption, ExecuteMode from weaver.formats import AcceptLanguage, ContentType from weaver.notify import encrypt_email, notify_job_complete from weaver.owsexceptions import OWSNoApplicableCode @@ -46,7 +46,7 @@ load_pywps_config ) from weaver.wps_restapi import swagger_definitions as sd -from weaver.wps_restapi.utils import get_wps_restapi_base_url +from weaver.wps_restapi.jobs.utils import get_job_results_response, get_job_submission_response LOGGER = logging.getLogger(__name__) if TYPE_CHECKING: @@ -60,7 +60,7 @@ from weaver.datatype import Job from weaver.processes.convert import OWS_Input_Type, ProcessOWS from weaver.status import StatusType - from weaver.typedefs import CeleryResult, HeadersType, HeaderCookiesType, JSON, SettingsType + from weaver.typedefs import AnyResponseType, CeleryResult, HeadersType, HeaderCookiesType, JSON, SettingsType from weaver.visibility import AnyVisibility @@ -442,7 +442,7 @@ def map_locations(job, settings): def submit_job(request, reference, tags=None): - # type: (Request, Union[Service, Process], Optional[List[str]]) -> Tuple[JSON, HeadersType] + # type: (Request, Union[Service, Process], Optional[List[str]]) -> AnyResponseType """ Generates the job submission from details retrieved in the request. @@ -498,28 +498,6 @@ def submit_job(request, reference, tags=None): visibility, language=lang, headers=headers, tags=tags, user=user, context=context) -def _validate_job_parameters(json_body): - # type: (JSON) -> None - """ - Tests supported parameters not automatically validated by colander deserialize since they are optional. - """ - exec_mode = json_body.get("mode") - if exec_mode not in [None, ExecuteMode.ASYNC, ExecuteMode.AUTO]: - raise HTTPNotImplemented(detail=f"Execution mode '{exec_mode}' not supported.") - - resp_mode = json_body.get("response") - if resp_mode not in [None, ExecuteResponse.DOCUMENT]: - raise HTTPNotImplemented(detail=f"Execution response type '{resp_mode}' not supported.") - - outputs = json_body.get("outputs", []) - if isinstance(outputs, dict): - outputs = [dict(id=out, **keys) for out, keys in outputs.items()] - for job_output in outputs: - mode = job_output["transmissionMode"] - if mode not in ExecuteTransmissionMode.values(): - raise HTTPNotImplemented(detail=f"Execute transmissionMode '{mode}' not supported.") - - def submit_job_handler(payload, # type: JSON settings, # type: SettingsType service_url, # type: str @@ -533,7 +511,7 @@ def submit_job_handler(payload, # type: JSON tags=None, # type: Optional[List[str]] user=None, # type: Optional[int] context=None, # type: Optional[str] - ): # type: (...) -> Tuple[JSON, HeadersType] + ): # type: (...) 
-> AnyResponseType """ Submits the job to the Celery worker with provided parameters. @@ -544,11 +522,6 @@ def submit_job_handler(payload, # type: JSON except colander.Invalid as ex: raise HTTPBadRequest("Invalid schema: [{}]".format(str(ex))) - # TODO: remove when all parameter variations are supported - # FIXME: - # - support 'response: raw' (https://github.com/crim-ca/weaver/issues/376) - # - allow omitting 'outputs' (https://github.com/crim-ca/weaver/issues/375) - _validate_job_parameters(json_body) db = get_db(settings) headers = headers or {} if is_local: @@ -566,6 +539,7 @@ def submit_job_handler(payload, # type: JSON # as per https://datatracker.ietf.org/doc/html/rfc7240#section-2 # Prefer header not resolve as valid still proces is_execute_async = mode != ExecuteMode.SYNC + exec_resp = json_body.get("response") notification_email = json_body.get("notification_email") encrypted_email = encrypt_email(notification_email, settings) if notification_email else None @@ -573,17 +547,12 @@ def submit_job_handler(payload, # type: JSON store = db.get_store(StoreJobs) # type: StoreJobs job = store.save_job(task_id=Status.ACCEPTED, process=process_id, service=provider_id, inputs=json_body.get("inputs"), is_local=is_local, is_workflow=is_workflow, - access=visibility, user_id=user, execute_async=is_execute_async, custom_tags=tags, - notification_email=encrypted_email, accept_language=language, context=context) + access=visibility, user_id=user, context=context, + execute_async=is_execute_async, execute_response=exec_resp, + custom_tags=tags, notification_email=encrypted_email, accept_language=language) job.save_log(logger=LOGGER, message="Job task submitted for execution.", status=Status.ACCEPTED, progress=0) job = store.update_job(job) - location_base = "/providers/{provider_id}".format(provider_id=provider_id) if provider_id else "" - location_url = "{base_url}{location_base}/processes/{process_id}/jobs/{job_id}".format( - base_url=get_wps_restapi_base_url(settings), - location_base=location_base, - process_id=process_id, - job_id=job.id - ) + location_url = job.status_url(settings) resp_headers = {"Location": location_url} resp_headers.update(applied) @@ -598,9 +567,15 @@ def submit_job_handler(payload, # type: JSON pass if result.ready(): job = store.fetch_by_id(job.id) + # when sync is successful, it must return the results direct instead of status info + # see: https://docs.ogc.org/is/18-062r2/18-062r2.html#sc_execute_response + if job.status == Status.SUCCEEDED: + return get_job_results_response(job, settings, headers=resp_headers) + # otherwise return the error status body = job.json(container=settings, self_link="status") body["location"] = location_url - return body, resp_headers + resp = get_job_submission_response(body, resp_headers, error=True) + return resp else: LOGGER.debug("Celery task requested as sync took too long to complete (wait=%ss). 
Continue in async.", wait) # sync not respected, therefore must drop it @@ -618,4 +593,5 @@ def submit_job_handler(payload, # type: JSON "status": map_status(Status.ACCEPTED), "location": location_url } - return body, resp_headers + resp = get_job_submission_response(body, resp_headers) + return resp diff --git a/weaver/processes/utils.py b/weaver/processes/utils.py index d3cfe58b9..fdd536640 100644 --- a/weaver/processes/utils.py +++ b/weaver/processes/utils.py @@ -103,32 +103,6 @@ def get_process(process_id=None, request=None, settings=None, store=None): raise HTTPBadRequest("Invalid schema:\n[{0!r}].".format(ex)) -def get_job_submission_response(body, headers): - # type: (JSON, AnyHeadersContainer) -> Union[HTTPOk, HTTPCreated] - """ - Generates the successful response from contents returned by :term:`Job` submission process. - - If :term:`Job` already finished processing within requested ``Prefer: wait=X`` seconds delay (and if allowed by - the :term:`Process` ``jobControlOptions``), return the successful status immediately instead of created status. - - Otherwise, return the status monitoring location of the created :term:`Job` to be monitored asynchronously. - - .. seealso:: - :func:`weaver.processes.execution.submit_job` - :func:`weaver.processes.execution.submit_job_handler` - """ - status = map_status(body.get("status")) - location = get_header("location", headers) - if status in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]: - body["description"] = sd.CompletedJobResponse.description - body = sd.CompletedJobStatusSchema().deserialize(body) - return HTTPOk(location=location, json=body, headers=headers) - - body["description"] = sd.CreatedLaunchJobResponse.description - body = sd.CreatedJobStatusSchema().deserialize(body) - return HTTPCreated(location=location, json=body, headers=headers) - - def map_progress(progress, range_min, range_max): # type: (Number, Number, Number) -> Number """ diff --git a/weaver/store/base.py b/weaver/store/base.py index 5634172e2..c2cc1f423 100644 --- a/weaver/store/base.py +++ b/weaver/store/base.py @@ -9,6 +9,7 @@ from pywps import Process as ProcessWPS from weaver.datatype import Bill, Job, Process, Quote, Service, VaultFile + from weaver.execute import AnyExecuteResponse from weaver.typedefs import ( AnyUUID, ExecutionInputs, @@ -125,6 +126,7 @@ def save_job(self, is_workflow=False, # type: bool is_local=False, # type: bool execute_async=True, # type: bool + execute_response=None, # type: Optional[AnyExecuteResponse] custom_tags=None, # type: Optional[List[str]] user_id=None, # type: Optional[int] access=None, # type: Optional[str] diff --git a/weaver/store/mongodb.py b/weaver/store/mongodb.py index 052c41398..fe8e8dd29 100644 --- a/weaver/store/mongodb.py +++ b/weaver/store/mongodb.py @@ -52,6 +52,7 @@ from typing import Any, Callable, Dict, List, Optional, Tuple, Union from pymongo.collection import Collection + from weaver.execute import AnyExecuteResponse from weaver.processes.types import AnyProcessType from weaver.store.base import DatetimeIntervalType, JobGroupCategory, JobSearchResult from weaver.typedefs import AnyProcess, AnyProcessClass, AnyUUID, AnyValueType, ExecutionInputs, ExecutionOutputs @@ -577,6 +578,7 @@ def save_job(self, is_workflow=False, # type: bool is_local=False, # type: bool execute_async=True, # type: bool + execute_response=None, # type: Optional[AnyExecuteResponse] custom_tags=None, # type: Optional[List[str]] user_id=None, # type: Optional[int] access=None, # type: Optional[str] @@ -610,6 +612,7 @@ def 
save_job(self, "inputs": inputs, "status": map_status(Status.ACCEPTED), "execute_async": execute_async, + "execution_response": execute_response, "is_workflow": is_workflow, "is_local": is_local, "created": created if created else now(), diff --git a/weaver/typedefs.py b/weaver/typedefs.py index 49d25862e..5f3cf3421 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -42,7 +42,6 @@ from webtest.response import TestResponse from werkzeug.wrappers import Request as WerkzeugRequest - from weaver.execute import AnyExecuteTransmissionMode from weaver.processes.wps_process_base import WpsProcessInterface from weaver.datatype import Process from weaver.status import AnyStatusType @@ -302,6 +301,13 @@ def __call__(self, message: str, progress: Number, status: AnyStatusType, *args: ExecutionOutputsList = List[ExecutionOutputItem] ExecutionOutputsMap = Dict[str, ExecutionOutputObject] ExecutionOutputs = Union[ExecutionOutputsList, ExecutionOutputsMap] + ExecutionResultObject = TypedDict("ExecutionResultObject", { + "value": Optional[AnyValueType], + "href": Optional[str], + "type": Optional[str], + }, total=False) + ExecutionResultArray = List[ExecutionResultObject] + ExecutionResults = Dict[str, Union[ExecutionResultObject, ExecutionResultArray]] # reference employed as 'JobMonitorReference' by 'WPS1Process' JobExecution = TypedDict("JobExecution", {"execution": WPSExecution}) diff --git a/weaver/wps/service.py b/weaver/wps/service.py index cdcd81e1a..42b726820 100644 --- a/weaver/wps/service.py +++ b/weaver/wps/service.py @@ -21,7 +21,7 @@ from weaver.processes.convert import wps2json_job_payload from weaver.processes.execution import submit_job_handler from weaver.processes.types import ProcessType -from weaver.processes.utils import get_job_submission_response, get_process +from weaver.processes.utils import get_process from weaver.store.base import StoreProcesses from weaver.utils import get_header, get_registry, get_settings, get_weaver_url from weaver.visibility import Visibility @@ -34,6 +34,7 @@ load_pywps_config ) from weaver.wps_restapi import swagger_definitions as sd +from weaver.wps_restapi.jobs.utils import get_job_submission_response LOGGER = logging.getLogger(__name__) if TYPE_CHECKING: @@ -236,7 +237,7 @@ def _submit_job(self, wps_request): is_workflow = proc.type == ProcessType.WORKFLOW tags = req.args.get("tags", "").split(",") + ["xml", "wps-{}".format(wps_request.version)] data = wps2json_job_payload(wps_request, wps_process) - body, headers = submit_job_handler( + resp = submit_job_handler( data, self.settings, proc.processEndpointWPS1, process_id=pid, is_local=True, is_workflow=is_workflow, visibility=Visibility.PUBLIC, language=wps_request.language, tags=tags, headers=dict(req.headers), context=ctx @@ -249,11 +250,11 @@ def _submit_job(self, wps_request): # way to provide explicitly Accept header. Even our Wps1Process as Workflow step depends on this behaviour. 
accept_type = get_header("Accept", req.headers) if accept_type == ContentType.APP_JSON: - resp = get_job_submission_response(body, headers) + resp = get_job_submission_response(resp.body, resp.headers) setattr(resp, "_update_status", lambda *_, **__: None) # patch to avoid pywps server raising return resp - return body + return resp.body @handle_known_exceptions def prepare_process_for_execution(self, identifier): diff --git a/weaver/wps_restapi/api.py b/weaver/wps_restapi/api.py index 97db3c9cd..f06dfa35c 100644 --- a/weaver/wps_restapi/api.py +++ b/weaver/wps_restapi/api.py @@ -380,11 +380,12 @@ def api_conformance(request): # noqa: F811 ogcapi_proc_core + "/req/core/job-results-failed", ogcapi_proc_core + "/req/core/job-results", ogcapi_proc_core + "/req/core/job-results-async-document", + # FIXME: support raw multipart (https://github.com/crim-ca/weaver/issues/376) # ogcapi_proc_core + "/req/core/job-results-async-raw-mixed-multi", - # ogcapi_proc_core + "/req/core/job-results-async-raw-ref", + ogcapi_proc_core + "/req/core/job-results-async-raw-ref", # ogcapi_proc_core + "/req/core/job-results-async-raw-value-multi", - # ogcapi_proc_core + "/req/core/job-results-async-raw-value-one", - # ogcapi_proc_core + "/req/core/job-results-success-sync", + ogcapi_proc_core + "/req/core/job-results-async-raw-value-one", + ogcapi_proc_core + "/req/core/job-results-success-sync", ogcapi_proc_core + "/req/core/job-success", ogcapi_proc_core + "/req/core/landingpage-op", ogcapi_proc_core + "/req/core/landingpage-success", @@ -409,9 +410,10 @@ def api_conformance(request): # noqa: F811 ogcapi_proc_core + "/req/core/process-execute-success-async", ogcapi_proc_core + "/req/core/process-execute-sync-document", # ogcapi_proc_core + "/req/core/process-execute-sync-raw-mixed-multi", - # ogcapi_proc_core + "/req/core/process-execute-sync-raw-ref", + ogcapi_proc_core + "/req/core/process-execute-sync-raw-ref", + # FIXME: support raw multipart (https://github.com/crim-ca/weaver/issues/376) # ogcapi_proc_core + "/req/core/process-execute-sync-raw-value-multi", - # ogcapi_proc_core + "/req/core/process-execute-sync-raw-value-one", + ogcapi_proc_core + "/req/core/process-execute-sync-raw-value-one", ogcapi_proc_core + "/req/core/pl-limit-definition", ogcapi_proc_core + "/req/core/pl-limit-response", ogcapi_proc_core + "/req/core/process-list", diff --git a/weaver/wps_restapi/jobs/jobs.py b/weaver/wps_restapi/jobs/jobs.py index 7e7d87a21..9111a0825 100644 --- a/weaver/wps_restapi/jobs/jobs.py +++ b/weaver/wps_restapi/jobs/jobs.py @@ -1,434 +1,39 @@ -import math -import os -import shutil -from copy import deepcopy from typing import TYPE_CHECKING from celery.utils.log import get_task_logger from colander import Invalid -from pyramid.httpexceptions import ( - HTTPBadRequest, - HTTPNoContent, - HTTPNotFound, - HTTPOk, - HTTPPermanentRedirect, - HTTPUnauthorized, - HTTPUnprocessableEntity -) -from pyramid.request import Request -from pyramid_celery import celery_app +from pyramid.httpexceptions import HTTPBadRequest, HTTPOk, HTTPPermanentRedirect, HTTPUnprocessableEntity from notify import encrypt_email from weaver.database import get_db from weaver.datatype import Job -from weaver.exceptions import ( - InvalidIdentifierValue, - JobGone, - JobInvalidParameter, - JobNotFound, - ProcessNotAccessible, - ProcessNotFound, - ServiceNotAccessible, - ServiceNotFound, - log_unhandled_exceptions -) -from weaver.execute import ExecuteTransmissionMode -from weaver.formats import ContentType, OutputFormat, get_format, repr_json 
-from weaver.owsexceptions import OWSNoApplicableCode, OWSNotFound -from weaver.processes.convert import ( - any2wps_literal_datatype, - convert_input_values_schema, - convert_output_params_schema, - get_field -) -from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status -from weaver.store.base import StoreJobs, StoreProcesses, StoreServices -from weaver.utils import get_any_id, get_any_value, get_path_kvp, get_settings, get_weaver_url, is_uuid -from weaver.visibility import Visibility -from weaver.wps.utils import get_wps_output_dir, get_wps_output_url +from weaver.exceptions import JobNotFound, log_unhandled_exceptions +from weaver.formats import OutputFormat, repr_json +from weaver.processes.convert import convert_input_values_schema, convert_output_params_schema +from weaver.store.base import StoreJobs +from weaver.utils import get_settings from weaver.wps_restapi import swagger_definitions as sd -from weaver.wps_restapi.constants import JobInputsOutputsSchema -from weaver.wps_restapi.providers.utils import forbid_local_only +from weaver.wps_restapi.jobs.utils import ( + dismiss_job_task, + get_job, + get_job_list_links, + get_job_results_response, + get_results, + get_schema_query, + raise_job_bad_status, + raise_job_dismissed, + validate_service_process +) from weaver.wps_restapi.swagger_definitions import datetime_interval_parser if TYPE_CHECKING: - from typing import Dict, Iterable, List, Optional, Tuple, Union + from typing import Iterable, List - from pyramid.httpexceptions import HTTPException - - from weaver.typedefs import AnySettingsContainer, AnyUUID, AnyValueType, HeadersTupleType, JSON, SettingsType - from weaver.wps_restapi.constants import JobInputsOutputsSchemaType + from weaver.typedefs import JSON, AnyResponseType, PyramidRequest LOGGER = get_task_logger(__name__) -def get_job(request): - # type: (Request) -> Job - """ - Obtain a job from request parameters. - - :returns: Job information if found. - :raise HTTPNotFound: with JSON body details on missing/non-matching job, process, provider IDs. - """ - job_id = request.matchdict.get("job_id") - try: - if not is_uuid(job_id): - raise JobInvalidParameter - store = get_db(request).get_store(StoreJobs) - job = store.fetch_by_id(job_id) - except (JobInvalidParameter, JobNotFound) as exc: - exception = type(exc) - if exception is JobInvalidParameter: - desc = "Invalid job reference is not a valid UUID." - else: - desc = "Could not find job with specified reference." - title = "NoSuchJob" - raise exception( - # new format: https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_job-exception-no-such-job - json={ - "title": title, - "type": "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/no-such-job", - "detail": desc, - "status": exception.code, - "cause": str(job_id) - }, - code=title, locator="JobID", description=desc # old format - ) - - provider_id = request.matchdict.get("provider_id", job.service) - process_id = request.matchdict.get("process_id", job.process) - if provider_id: - forbid_local_only(request) - - if job.service != provider_id: - title = "NoSuchProvider" - desc = "Could not find job reference corresponding to specified provider reference." 
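The ``get_job`` helper above reports lookup failures using the OGC API - Processes exception format, with a ``type`` URI ending in ``no-such-job``. A short sketch of how a client could rely on that field, assuming ``requests`` and a placeholder deployment URL and job identifier::

    import requests

    job_url = "https://weaver.example.com/processes/jsonarray2netcdf/jobs/00000000-0000-0000-0000-000000000000"
    resp = requests.get(job_url, headers={"Accept": "application/json"})
    if resp.status_code == 404:
        error = resp.json()
        if error.get("type", "").endswith("no-such-job"):
            print("job not found:", error.get("cause"), "-", error.get("detail"))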
- raise OWSNotFound( - # new format: https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_job-exception-no-such-job - json={ - "title": title, - "type": "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/no-such-job", - "detail": desc, - "status": OWSNotFound.code, - "cause": str(process_id) - }, - code=title, locator="provider", description=desc # old format - ) - if job.process != process_id: - title = "NoSuchProcess" - desc = "Could not find job reference corresponding to specified process reference." - raise OWSNotFound( - # new format: https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_job-exception-no-such-job - # note: although 'no-such-process' error, return 'no-such-job' because process could exist, only mismatches - json={ - "title": title, - "type": "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/no-such-job", - "detail": desc, - "status": OWSNotFound.code, - "cause": str(process_id) - }, - code=title, locator="process", description=desc # old format - ) - return job - - -def get_job_list_links(job_total, filters, request): - # type: (int, Dict[str, AnyValueType], Request) -> List[JSON] - """ - Obtains a list of all relevant links for the corresponding job listing defined by query parameter filters. - - :raises IndexError: if the paging values are out of bounds compared to available total :term:`Job` matching search. - """ - base_url = get_weaver_url(request) - - # reapply queries that must be given to obtain the same result in case of subsequent requests (sort, limits, etc.) - kvp_params = {param: value for param, value in request.params.items() if param != "page"} - # patch datetime that have some extra character manipulation (reapply '+' auto-converted to ' ' by params parser) - if "datetime" in kvp_params: - kvp_params["datetime"] = kvp_params["datetime"].replace(" ", "+") - alt_kvp = deepcopy(kvp_params) - - # request job uses general endpoint, obtain the full path if any service/process was given as alternate location - if request.path.startswith(sd.jobs_service.path): - job_path = base_url + sd.jobs_service.path - alt_path = None - parent_url = None - # cannot generate full path apply for 'service' by itself - if filters["process"] and filters["service"]: - alt_path = base_url + sd.provider_jobs_service.path.format( - provider_id=filters["service"], process_id=filters["process"] - ) - parent_url = alt_path.rsplit("/", 1)[0] - elif filters["process"]: - alt_path = base_url + sd.process_jobs_service.path.format(process_id=filters["process"]) - parent_url = alt_path.rsplit("/", 1)[0] - for param in ["service", "provider", "process"]: - alt_kvp.pop(param, None) - # path is whichever specific service/process endpoint, jobs are pre-filtered by them - # transform sub-endpoints into matching query parameters and use generic path as alternate location - else: - job_path = base_url + request.path - alt_path = base_url + sd.jobs_service.path - alt_kvp["process"] = filters["process"] - if filters["service"]: - alt_kvp["provider"] = filters["service"] - parent_url = job_path.rsplit("/", 1)[0] - - cur_page = filters["page"] - per_page = filters["limit"] - max_page = max(math.ceil(job_total / per_page) - 1, 0) - if cur_page < 0 or cur_page > max_page: - raise IndexError(f"Page index {cur_page} is out of range from [0,{max_page}].") - - alt_links = [] - if alt_path: - alt_links = [{ - "href": get_path_kvp(alt_path, page=cur_page, **alt_kvp), "rel": "alternate", - "type": ContentType.APP_JSON, "title": "Alternate endpoint with equivalent set of filtered 
jobs." - }] - - links = alt_links + [ - {"href": job_path, "rel": "collection", - "type": ContentType.APP_JSON, "title": "Complete job listing (no filtering queries applied)."}, - {"href": base_url + sd.jobs_service.path, "rel": "search", - "type": ContentType.APP_JSON, "title": "Generic query endpoint to search for jobs."}, - {"href": job_path + "?detail=false", "rel": "preview", - "type": ContentType.APP_JSON, "title": "Job listing summary (UUID and count only)."}, - {"href": job_path, "rel": "http://www.opengis.net/def/rel/ogc/1.0/job-list", - "type": ContentType.APP_JSON, "title": "List of registered jobs."}, - {"href": get_path_kvp(job_path, page=cur_page, **kvp_params), "rel": "current", - "type": ContentType.APP_JSON, "title": "Current page of job query listing."}, - {"href": get_path_kvp(job_path, page=0, **kvp_params), "rel": "first", - "type": ContentType.APP_JSON, "title": "First page of job query listing."}, - {"href": get_path_kvp(job_path, page=max_page, **kvp_params), "rel": "last", - "type": ContentType.APP_JSON, "title": "Last page of job query listing."}, - ] - if cur_page > 0: - links.append({ - "href": get_path_kvp(job_path, page=cur_page - 1, **kvp_params), "rel": "prev", - "type": ContentType.APP_JSON, "title": "Previous page of job query listing." - }) - if cur_page < max_page: - links.append({ - "href": get_path_kvp(job_path, page=cur_page + 1, **kvp_params), "rel": "next", - "type": ContentType.APP_JSON, "title": "Next page of job query listing." - }) - if parent_url: - links.append({ - "href": parent_url, "rel": "up", - "type": ContentType.APP_JSON, "title": "Parent collection for which listed jobs apply." - }) - return links - - -def get_schema_query(schema, strict=True): - # type: (Optional[JobInputsOutputsSchemaType], bool) -> Optional[JobInputsOutputsSchemaType] - if not schema: - return None - # unescape query (eg: "OGC+strict" becomes "OGC string" from URL parsing) - schema_checked = str(schema).replace(" ", "+").lower() - if JobInputsOutputsSchema.get(schema_checked) is None: - raise HTTPBadRequest(json={ - "type": "InvalidParameterValue", - "detail": "Query parameter 'schema' value is invalid.", - "status": HTTPBadRequest.code, - "locator": "query", - "value": str(schema), - }) - if not strict: - return schema_checked.split("+")[0] - return schema_checked - - -def make_result_link(result_id, result, job_id, settings): - # type: (str, Union[JSON, List[JSON]], AnyUUID, SettingsType) -> List[str] - """ - Convert a result definition as ``value`` into the corresponding ``reference`` for output transmission. - - .. seealso:: - :rfc:`8288`: HTTP ``Link`` header specification. 
- """ - values = result if isinstance(result, list) else [result] - suffixes = list(f".{idx}" for idx in range(len(values))) if isinstance(result, list) else [""] - wps_url = get_wps_output_url(settings).strip("/") - links = [] - for suffix, value in zip(suffixes, values): - key = get_any_value(result, key=True) - if key != "href": - # literal data to be converted to link - # plain text file must be created containing the raw literal data - typ = ContentType.TEXT_PLAIN # as per '/rec/core/process-execute-sync-document-ref' - enc = "UTF-8" - out = get_wps_output_dir(settings) - val = get_any_value(value, data=True, file=False) - loc = os.path.join(job_id, result_id + suffix + ".txt") - url = f"{wps_url}/{loc}" - path = os.path.join(out, loc) - with open(path, mode="w", encoding=enc) as out_file: - out_file.write(val) - else: - fmt = get_field(result, "format", default={"mediaType": ContentType.TEXT_PLAIN}) - typ = get_field(fmt, "mime_type", search_variations=True, default=ContentType.TEXT_PLAIN) - enc = get_field(fmt, "encoding", search_variations=True, default=None) - url = get_any_value(value, data=False, file=True) # should already include full path - links.append(f"<{url}>; rel=\"{result_id}{suffix}\"; type={typ}; charset={enc}") - return links - - -def get_results(job, # type: Job - container, # type: AnySettingsContainer - value_key=None, # type: Optional[str] - schema=JobInputsOutputsSchema.OLD, # type: JobInputsOutputsSchemaType - link_references=False, # type: bool - ): # type: (...) -> Tuple[Union[List[JSON], JSON], HeadersTupleType] - """ - Obtains the job results with extended full WPS output URL as applicable and according to configuration settings. - - :param job: job from which to retrieve results. - :param container: any container giving access to instance settings (to resolve reference output location). - :param value_key: - If not specified, the returned values will have the appropriate ``data``/``href`` key according to the content. - Otherwise, all values will have the specified key. - :param schema: - Selects which schema to employ for representing the output results (listing or mapping). - :param link_references: - If enabled, an output that was requested by reference instead of value will be returned as ``Link`` reference. - :returns: - Tuple with: - - List or mapping of all outputs each with minimally an ID and value under the requested key. - - List of ``Link`` headers for reference outputs when requested. Empty otherwise. - """ - settings = get_settings(container) - wps_url = get_wps_output_url(settings) - if not wps_url.endswith("/"): - wps_url = wps_url + "/" - schema = JobInputsOutputsSchema.get(str(schema).lower(), default=JobInputsOutputsSchema.OLD) - strict = schema.endswith("+strict") - schema = schema.split("+")[0] - ogc_api = schema == JobInputsOutputsSchema.OGC - outputs = {} if ogc_api else [] - fmt_key = "mediaType" if ogc_api else "mimeType" - out_ref = convert_output_params_schema(job.outputs, JobInputsOutputsSchema.OGC) if link_references else {} - references = {} - for result in job.results: - rtype = "data" if any(k in result for k in ["data", "value"]) else "href" - value = get_any_value(result) - out_key = rtype - out_id = get_any_id(result) - out_mode = out_ref.get(out_id, {}).get("transmissionMode") - as_ref = link_references and out_mode == ExecuteTransmissionMode.REFERENCE - if rtype == "href": - # fix paths relative to instance endpoint, but leave explicit links as is (eg: S3 bucket, remote HTTP, etc.) 
- if value.startswith("/"): - value = str(value).lstrip("/") - if "://" not in value: - value = wps_url + value - elif ogc_api: - out_key = "value" - elif value_key: - out_key = value_key - output = {out_key: value} - if rtype == "href": # required for the rest to be there, other fields optional - if "mimeType" not in result: - result["mimeType"] = get_format(value, default=ContentType.TEXT_PLAIN).mime_type - if ogc_api or not strict: - output["type"] = result["mimeType"] - if not ogc_api or not strict or as_ref: - output["format"] = {fmt_key: result["mimeType"]} - for field in ["encoding", "schema"]: - if field in result: - output["format"][field] = result[field] - elif rtype != "href": - # literal data - # FIXME: BoundingBox not implemented (https://github.com/crim-ca/weaver/issues/51) - dtype = result.get("dataType", any2wps_literal_datatype(value, is_value=True) or "string") - if ogc_api: - output["dataType"] = {"name": dtype} - else: - output["dataType"] = dtype - - if ogc_api or as_ref: - mapping = references if as_ref else outputs - if out_id in mapping: - output_list = mapping[out_id] - if not isinstance(output_list, list): - output_list = [output_list] - output_list.append(output) - mapping[out_id] = output_list - else: - mapping[out_id] = output - else: - # if ordered insert supported by python version, insert ID first - output = dict([("id", out_id)] + list(output.items())) # noqa - outputs.append(output) - - # needed to collect and aggregate outputs of same ID first in case of array - # convert any requested link references using indices if needed - headers = [] - for out_id, output in references.items(): - res_links = make_result_link(out_id, output, job.id, settings) - headers.extend([("Link", link) for link in res_links]) - - return outputs, headers - - -def validate_service_process(request): - # type: (Request) -> Tuple[Optional[str], Optional[str]] - """ - Verifies that service or process specified by path or query will raise the appropriate error if applicable. 
- """ - service_name = ( - request.matchdict.get("provider_id", None) or - request.params.get("provider", None) or - request.params.get("service", None) # backward compatibility - ) - process_name = ( - request.matchdict.get("process_id", None) or - request.params.get("process", None) or - request.params.get("processID", None) # OGC-API conformance - ) - item_test = None - item_type = None - - try: - service = None - if service_name: - forbid_local_only(request) - item_type = "Service" - item_test = service_name - store = get_db(request).get_store(StoreServices) - service = store.fetch_by_name(service_name, visibility=Visibility.PUBLIC) - if process_name: - item_type = "Process" - item_test = process_name - # local process - if not service: - store = get_db(request).get_store(StoreProcesses) - store.fetch_by_id(process_name, visibility=Visibility.PUBLIC) - # remote process - else: - processes = service.processes(request) - if process_name not in [p.id for p in processes]: - raise ProcessNotFound - except (ServiceNotFound, ProcessNotFound): - raise HTTPNotFound(json={ - "code": "NoSuch{}".format(item_type), - "description": "{} of id '{}' cannot be found.".format(item_type, item_test) - }) - except (ServiceNotAccessible, ProcessNotAccessible): - raise HTTPUnauthorized(json={ - "code": "Unauthorized{}".format(item_type), - "description": "{} of id '{}' is not accessible.".format(item_type, item_test) - }) - except InvalidIdentifierValue as ex: - raise HTTPBadRequest(json={ - "code": InvalidIdentifierValue.__name__, - "description": str(ex) - }) - - return service_name, process_name - - @sd.provider_jobs_service.get(tags=[sd.TAG_JOBS, sd.TAG_PROVIDERS], renderer=OutputFormat.JSON, schema=sd.GetProviderJobsEndpoint(), response_schemas=sd.get_prov_all_jobs_responses) @sd.process_jobs_service.get(tags=[sd.TAG_PROCESSES, sd.TAG_JOBS], renderer=OutputFormat.JSON, @@ -437,7 +42,7 @@ def validate_service_process(request): schema=sd.GetJobsEndpoint(), response_schemas=sd.get_all_jobs_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_queried_jobs(request): - # type: (Request) -> HTTPOk + # type: (PyramidRequest) -> HTTPOk """ Retrieve the list of jobs which can be filtered, sorted, paged and categorized using query parameters. """ @@ -526,7 +131,7 @@ def _job_list(jobs): # type: (Iterable[Job]) -> List[JSON] schema=sd.JobEndpoint(), response_schemas=sd.get_single_job_status_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_job_status(request): - # type: (Request) -> HTTPOk + # type: (PyramidRequest) -> HTTPOk """ Retrieve the status of a job. """ @@ -535,133 +140,6 @@ def get_job_status(request): return HTTPOk(json=job_status) -def raise_job_bad_status(job, container=None): - # type: (Job, Optional[AnySettingsContainer]) -> None - """ - Raise the appropriate message for :term:`Job` not ready or unable to retrieve output results due to status. 
- """ - if job.status != Status.SUCCEEDED: - links = job.links(container=container) - if job.status == Status.FAILED: - err_code = None - err_info = None - err_known_modules = [ - "pywps.exceptions", - "owslib.wps", - "weaver.exceptions", - "weaver.owsexceptions", - ] - # try to infer the cause, fallback to generic error otherwise - for error in job.exceptions: - try: - if isinstance(error, dict): - err_code = error.get("Code") - err_info = error.get("Text") - elif isinstance(error, str) and any(error.startswith(mod) for mod in err_known_modules): - err_code, err_info = error.split(":", 1) - err_code = err_code.split(".")[-1].strip() - err_info = err_info.strip() - except Exception: - err_code = None - if err_code: - break - if not err_code: # default - err_code = OWSNoApplicableCode.code - err_info = "unknown" - # /req/core/job-results-failed - raise HTTPBadRequest(json={ - "title": "JobResultsFailed", - "type": err_code, - "detail": "Job results not available because execution failed.", - "status": HTTPBadRequest.code, - "cause": err_info, - "links": links - }) - - # /req/core/job-results-exception/results-not-ready - raise HTTPBadRequest(json={ - "title": "JobResultsNotReady", - "type": "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/result-not-ready", - "detail": "Job is not ready to obtain results.", - "status": HTTPBadRequest.code, - "cause": {"status": job.status}, - "links": links - }) - - -def raise_job_dismissed(job, container=None): - # type: (Job, Optional[AnySettingsContainer]) -> None - """ - Raise the appropriate messages for dismissed :term:`Job` status. - """ - if job.status == Status.DISMISSED: - # provide the job status links since it is still available for reference - settings = get_settings(container) - job_links = job.links(settings) - job_links = [link for link in job_links if link["rel"] in ["status", "alternate", "collection", "up"]] - raise JobGone( - json={ - "title": "JobDismissed", - "type": "JobDismissed", - "status": JobGone.code, - "detail": "Job was dismissed and artifacts have been removed.", - "cause": {"status": job.status}, - "value": str(job.id), - "links": job_links - } - ) - - -def dismiss_job_task(job, container): - # type: (Job, AnySettingsContainer) -> Job - """ - Cancels any pending or running :mod:`Celery` task and removes completed job artifacts. - - .. note:: - The :term:`Job` object itself is not deleted, only its artifacts. - Therefore, its inputs, outputs, logs, exceptions, etc. are still available in the database, - but corresponding files that would be exposed by ``weaver.wps_output`` configurations are removed. - - :param job: Job to cancel or cleanup. - :param container: Application settings. - :return: Updated and dismissed job. - """ - raise_job_dismissed(job, container) - if job.status in JOB_STATUS_CATEGORIES[StatusCategory.RUNNING]: - # signal to stop celery task. Up to it to terminate remote if any. 
- LOGGER.debug("Job [%s] dismiss operation: Canceling task [%s]", job.id, job.task_id) - celery_app.control.revoke(job.task_id, terminate=True) - - wps_out_dir = get_wps_output_dir(container) - job_out_dir = os.path.join(wps_out_dir, str(job.id)) - job_out_log = os.path.join(wps_out_dir, str(job.id) + ".log") - job_out_xml = os.path.join(wps_out_dir, str(job.id) + ".xml") - if os.path.isdir(job_out_dir): - LOGGER.debug("Job [%s] dismiss operation: Removing output results.", job.id) - shutil.rmtree(job_out_dir, onerror=lambda func, path, _exc: LOGGER.warning( - "Job [%s] dismiss operation: Failed to delete [%s] due to [%s]", job.id, job_out_dir, _exc - )) - if os.path.isfile(job_out_log): - LOGGER.debug("Job [%s] dismiss operation: Removing output logs.", job.id) - try: - os.remove(job_out_log) - except OSError as exc: - LOGGER.warning("Job [%s] dismiss operation: Failed to delete [%s] due to [%s]", job.id, job_out_log, exc) - if os.path.isfile(job_out_xml): - LOGGER.debug("Job [%s] dismiss operation: Removing output WPS status.", job.id) - try: - os.remove(job_out_xml) - except OSError as exc: - LOGGER.warning("Job [%s] dismiss operation: Failed to delete [%s] due to [%s]", job.id, job_out_xml, exc) - - LOGGER.debug("Job [%s] dismiss operation: Updating job status.") - store = get_db(container).get_store(StoreJobs) - job.status_message = "Job {}.".format(Status.DISMISSED) - job.status = map_status(Status.DISMISSED) - job = store.update_job(job) - return job - - @sd.provider_job_service.delete(tags=[sd.TAG_JOBS, sd.TAG_DISMISS, sd.TAG_PROVIDERS], renderer=OutputFormat.JSON, schema=sd.ProviderJobEndpoint(), response_schemas=sd.delete_prov_job_responses) @sd.process_job_service.delete(tags=[sd.TAG_JOBS, sd.TAG_DISMISS, sd.TAG_PROCESSES], renderer=OutputFormat.JSON, @@ -670,6 +148,7 @@ def dismiss_job_task(job, container): schema=sd.JobEndpoint(), response_schemas=sd.delete_job_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def cancel_job(request): + # type: (PyramidRequest) -> AnyResponseType """ Dismiss a planned or running job execution, or remove result artifacts of a completed job. @@ -695,6 +174,7 @@ def cancel_job(request): schema=sd.DeleteJobsEndpoint(), response_schemas=sd.delete_jobs_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def cancel_job_batch(request): + # type: (PyramidRequest) -> AnyResponseType """ Dismiss operation for multiple jobs. @@ -736,7 +216,7 @@ def cancel_job_batch(request): schema=sd.JobInputsEndpoint(), response_schemas=sd.get_job_inputs_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_job_inputs(request): - # type: (Request) -> HTTPException + # type: (PyramidRequest) -> AnyResponseType """ Retrieve the inputs values and outputs definitions of a job. """ @@ -761,7 +241,7 @@ def get_job_inputs(request): schema=sd.JobOutputsEndpoint(), response_schemas=sd.get_job_outputs_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_job_outputs(request): - # type: (Request) -> HTTPException + # type: (PyramidRequest) -> AnyResponseType """ Retrieve the output values resulting from a job execution. 
""" @@ -784,30 +264,13 @@ def get_job_outputs(request): schema=sd.JobResultsEndpoint(), response_schemas=sd.get_job_results_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_job_results(request): - # type: (Request) -> HTTPException + # type: (PyramidRequest) -> AnyResponseType """ Retrieve the results of a job. """ job = get_job(request) - raise_job_dismissed(job, request) - raise_job_bad_status(job, request) - job_status = map_status(job.status) - if job_status in JOB_STATUS_CATEGORIES[StatusCategory.RUNNING]: - raise HTTPNotFound(json={ - "code": "ResultsNotReady", - "description": "Job status is '{}'. Results are not yet available.".format(job_status) - }) - - results, refs = get_results(job, request, value_key="value", - schema=JobInputsOutputsSchema.OGC, link_references=True) - # note: - # Cannot add "links" field in response body because variable Output ID keys are directly at the root - # Possible conflict with an output that would be named "links". - - if results: # avoid error if all by reference - results = sd.Result().deserialize(results) - HTTPOk(json=results, headers=refs) - return HTTPNoContent(headers=refs) + resp = get_job_results_response(job, request) + return resp @sd.provider_exceptions_service.get(tags=[sd.TAG_JOBS, sd.TAG_EXCEPTIONS, sd.TAG_PROVIDERS], @@ -819,6 +282,7 @@ def get_job_results(request): schema=sd.ProcessExceptionsEndpoint(), response_schemas=sd.get_exceptions_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_job_exceptions(request): + # type: (PyramidRequest) -> AnyResponseType """ Retrieve the exceptions of a job. """ @@ -836,6 +300,7 @@ def get_job_exceptions(request): schema=sd.ProcessLogsEndpoint(), response_schemas=sd.get_logs_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_job_logs(request): + # type: (PyramidRequest) -> AnyResponseType """ Retrieve the logs of a job. """ @@ -856,6 +321,7 @@ def get_job_logs(request): response_schemas=sd.get_result_redirect_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def redirect_job_result(request): + # type: (PyramidRequest) -> AnyResponseType """ Deprecated job result endpoint that is now returned by corresponding outputs path with added links. 
""" diff --git a/weaver/wps_restapi/jobs/utils.py b/weaver/wps_restapi/jobs/utils.py new file mode 100644 index 000000000..eb56cec8f --- /dev/null +++ b/weaver/wps_restapi/jobs/utils.py @@ -0,0 +1,696 @@ +import math +import os +import shutil +from copy import deepcopy +from typing import TYPE_CHECKING + +from celery.utils.log import get_task_logger +from pyramid.httpexceptions import ( + HTTPBadRequest, + HTTPCreated, + HTTPNoContent, + HTTPNotFound, + HTTPNotImplemented, + HTTPOk, + HTTPUnauthorized +) +from pyramid.response import FileResponse +from pyramid_celery import celery_app + +from weaver.database import get_db +from weaver.datatype import Job +from weaver.exceptions import ( + InvalidIdentifierValue, + JobGone, + JobInvalidParameter, + JobNotFound, + ProcessNotAccessible, + ProcessNotFound, + ServiceNotAccessible, + ServiceNotFound +) +from weaver.execute import ExecuteResponse, ExecuteTransmissionMode +from weaver.formats import ContentType, get_format +from weaver.owsexceptions import OWSNoApplicableCode, OWSNotFound +from weaver.processes.convert import any2wps_literal_datatype, convert_output_params_schema, get_field +from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status +from weaver.store.base import StoreJobs, StoreProcesses, StoreServices +from weaver.utils import ( + get_any_id, + get_any_value, + get_file_headers, + get_header, + get_path_kvp, + get_settings, + get_weaver_url, + is_uuid +) +from weaver.visibility import Visibility +from weaver.wps.utils import get_wps_output_dir, get_wps_output_url, map_wps_output_location +from weaver.wps_restapi import swagger_definitions as sd +from weaver.wps_restapi.constants import JobInputsOutputsSchema +from weaver.wps_restapi.providers.utils import forbid_local_only + +if TYPE_CHECKING: + from typing import Dict, List, Optional, Tuple, Union + + from weaver.typedefs import ( + AnyHeadersContainer, + AnyRequestType, + AnyResponseType, + AnySettingsContainer, + AnyUUID, + AnyValueType, + ExecutionResultArray, + ExecutionResultObject, + ExecutionResults, + HeadersTupleType, + JSON, + PyramidRequest, + SettingsType + ) + from weaver.wps_restapi.constants import JobInputsOutputsSchemaType + +LOGGER = get_task_logger(__name__) + + +def get_job(request): + # type: (PyramidRequest) -> Job + """ + Obtain a job from request parameters. + + :returns: Job information if found. + :raise HTTPNotFound: with JSON body details on missing/non-matching job, process, provider IDs. + """ + job_id = request.matchdict.get("job_id") + try: + if not is_uuid(job_id): + raise JobInvalidParameter + store = get_db(request).get_store(StoreJobs) + job = store.fetch_by_id(job_id) + except (JobInvalidParameter, JobNotFound) as exc: + exception = type(exc) + if exception is JobInvalidParameter: + desc = "Invalid job reference is not a valid UUID." + else: + desc = "Could not find job with specified reference." 
+ title = "NoSuchJob" + raise exception( + # new format: https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_job-exception-no-such-job + json={ + "title": title, + "type": "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/no-such-job", + "detail": desc, + "status": exception.code, + "cause": str(job_id) + }, + code=title, locator="JobID", description=desc # old format + ) + + provider_id = request.matchdict.get("provider_id", job.service) + process_id = request.matchdict.get("process_id", job.process) + if provider_id: + forbid_local_only(request) + + if job.service != provider_id: + title = "NoSuchProvider" + desc = "Could not find job reference corresponding to specified provider reference." + raise OWSNotFound( + # new format: https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_job-exception-no-such-job + json={ + "title": title, + "type": "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/no-such-job", + "detail": desc, + "status": OWSNotFound.code, + "cause": str(process_id) + }, + code=title, locator="provider", description=desc # old format + ) + if job.process != process_id: + title = "NoSuchProcess" + desc = "Could not find job reference corresponding to specified process reference." + raise OWSNotFound( + # new format: https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_job-exception-no-such-job + # note: although 'no-such-process' error, return 'no-such-job' because process could exist, only mismatches + json={ + "title": title, + "type": "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/no-such-job", + "detail": desc, + "status": OWSNotFound.code, + "cause": str(process_id) + }, + code=title, locator="process", description=desc # old format + ) + return job + + +def get_job_list_links(job_total, filters, request): + # type: (int, Dict[str, AnyValueType], AnyRequestType) -> List[JSON] + """ + Obtains a list of all relevant links for the corresponding job listing defined by query parameter filters. + + :raises IndexError: if the paging values are out of bounds compared to available total :term:`Job` matching search. + """ + base_url = get_weaver_url(request) + + # reapply queries that must be given to obtain the same result in case of subsequent requests (sort, limits, etc.) 
+ kvp_params = {param: value for param, value in request.params.items() if param != "page"} + # patch datetime that have some extra character manipulation (reapply '+' auto-converted to ' ' by params parser) + if "datetime" in kvp_params: + kvp_params["datetime"] = kvp_params["datetime"].replace(" ", "+") + alt_kvp = deepcopy(kvp_params) + + # request job uses general endpoint, obtain the full path if any service/process was given as alternate location + if request.path.startswith(sd.jobs_service.path): + job_path = base_url + sd.jobs_service.path + alt_path = None + parent_url = None + # cannot generate full path apply for 'service' by itself + if filters["process"] and filters["service"]: + alt_path = base_url + sd.provider_jobs_service.path.format( + provider_id=filters["service"], process_id=filters["process"] + ) + parent_url = alt_path.rsplit("/", 1)[0] + elif filters["process"]: + alt_path = base_url + sd.process_jobs_service.path.format(process_id=filters["process"]) + parent_url = alt_path.rsplit("/", 1)[0] + for param in ["service", "provider", "process"]: + alt_kvp.pop(param, None) + # path is whichever specific service/process endpoint, jobs are pre-filtered by them + # transform sub-endpoints into matching query parameters and use generic path as alternate location + else: + job_path = base_url + request.path + alt_path = base_url + sd.jobs_service.path + alt_kvp["process"] = filters["process"] + if filters["service"]: + alt_kvp["provider"] = filters["service"] + parent_url = job_path.rsplit("/", 1)[0] + + cur_page = filters["page"] + per_page = filters["limit"] + max_page = max(math.ceil(job_total / per_page) - 1, 0) + if cur_page < 0 or cur_page > max_page: + raise IndexError(f"Page index {cur_page} is out of range from [0,{max_page}].") + + alt_links = [] + if alt_path: + alt_links = [{ + "href": get_path_kvp(alt_path, page=cur_page, **alt_kvp), "rel": "alternate", + "type": ContentType.APP_JSON, "title": "Alternate endpoint with equivalent set of filtered jobs." + }] + + links = alt_links + [ + {"href": job_path, "rel": "collection", + "type": ContentType.APP_JSON, "title": "Complete job listing (no filtering queries applied)."}, + {"href": base_url + sd.jobs_service.path, "rel": "search", + "type": ContentType.APP_JSON, "title": "Generic query endpoint to search for jobs."}, + {"href": job_path + "?detail=false", "rel": "preview", + "type": ContentType.APP_JSON, "title": "Job listing summary (UUID and count only)."}, + {"href": job_path, "rel": "http://www.opengis.net/def/rel/ogc/1.0/job-list", + "type": ContentType.APP_JSON, "title": "List of registered jobs."}, + {"href": get_path_kvp(job_path, page=cur_page, **kvp_params), "rel": "current", + "type": ContentType.APP_JSON, "title": "Current page of job query listing."}, + {"href": get_path_kvp(job_path, page=0, **kvp_params), "rel": "first", + "type": ContentType.APP_JSON, "title": "First page of job query listing."}, + {"href": get_path_kvp(job_path, page=max_page, **kvp_params), "rel": "last", + "type": ContentType.APP_JSON, "title": "Last page of job query listing."}, + ] + if cur_page > 0: + links.append({ + "href": get_path_kvp(job_path, page=cur_page - 1, **kvp_params), "rel": "prev", + "type": ContentType.APP_JSON, "title": "Previous page of job query listing." + }) + if cur_page < max_page: + links.append({ + "href": get_path_kvp(job_path, page=cur_page + 1, **kvp_params), "rel": "next", + "type": ContentType.APP_JSON, "title": "Next page of job query listing." 
+ }) + if parent_url: + links.append({ + "href": parent_url, "rel": "up", + "type": ContentType.APP_JSON, "title": "Parent collection for which listed jobs apply." + }) + return links + + +def get_schema_query(schema, strict=True): + # type: (Optional[JobInputsOutputsSchemaType], bool) -> Optional[JobInputsOutputsSchemaType] + if not schema: + return None + # unescape query (eg: "OGC+strict" becomes "OGC string" from URL parsing) + schema_checked = str(schema).replace(" ", "+").lower() + if JobInputsOutputsSchema.get(schema_checked) is None: + raise HTTPBadRequest(json={ + "type": "InvalidParameterValue", + "detail": "Query parameter 'schema' value is invalid.", + "status": HTTPBadRequest.code, + "locator": "query", + "value": str(schema), + }) + if not strict: + return schema_checked.split("+")[0] + return schema_checked + + +def make_result_link(result_id, result, job_id, settings): + # type: (str, Union[ExecutionResultObject, ExecutionResultArray], AnyUUID, SettingsType) -> List[str] + """ + Convert a result definition as ``value`` into the corresponding ``reference`` for output transmission. + + .. seealso:: + :rfc:`8288`: HTTP ``Link`` header specification. + """ + values = result if isinstance(result, list) else [result] + suffixes = list(f".{idx}" for idx in range(len(values))) if isinstance(result, list) else [""] + wps_url = get_wps_output_url(settings).strip("/") + links = [] + for suffix, value in zip(suffixes, values): + key = get_any_value(result, key=True) + if key != "href": + # literal data to be converted to link + # plain text file must be created containing the raw literal data + typ = ContentType.TEXT_PLAIN # as per '/rec/core/process-execute-sync-document-ref' + enc = "UTF-8" + out = get_wps_output_dir(settings) + val = get_any_value(value, data=True, file=False) + loc = os.path.join(job_id, result_id + suffix + ".txt") + url = f"{wps_url}/{loc}" + path = os.path.join(out, loc) + with open(path, mode="w", encoding=enc) as out_file: + out_file.write(val) + else: + fmt = get_field(result, "format", default={"mediaType": ContentType.TEXT_PLAIN}) + typ = get_field(fmt, "mime_type", search_variations=True, default=ContentType.TEXT_PLAIN) + enc = get_field(fmt, "encoding", search_variations=True, default=None) + url = get_any_value(value, data=False, file=True) # should already include full path + links.append(f"<{url}>; rel=\"{result_id}{suffix}\"; type={typ}; charset={enc}") + return links + + +def get_results(job, # type: Job + container, # type: AnySettingsContainer + value_key=None, # type: Optional[str] + schema=JobInputsOutputsSchema.OLD, # type: JobInputsOutputsSchemaType + link_references=False, # type: bool + ): # type: (...) -> Tuple[ExecutionResults, HeadersTupleType] + """ + Obtains the job results with extended full WPS output URL as applicable and according to configuration settings. + + :param job: job from which to retrieve results. + :param container: any container giving access to instance settings (to resolve reference output location). + :param value_key: + If not specified, the returned values will have the appropriate ``data``/``href`` key according to the content. + Otherwise, all values will have the specified key. + :param schema: + Selects which schema to employ for representing the output results (listing or mapping). + :param link_references: + If enabled, an output that was requested by reference instead of value will be returned as ``Link`` reference. 
+ :returns: + Tuple with: + - List or mapping of all outputs each with minimally an ID and value under the requested key. + - List of ``Link`` headers for reference outputs when requested. Empty otherwise. + """ + settings = get_settings(container) + wps_url = get_wps_output_url(settings) + if not wps_url.endswith("/"): + wps_url = wps_url + "/" + schema = JobInputsOutputsSchema.get(str(schema).lower(), default=JobInputsOutputsSchema.OLD) + strict = schema.endswith("+strict") + schema = schema.split("+")[0] + ogc_api = schema == JobInputsOutputsSchema.OGC + outputs = {} if ogc_api else [] + fmt_key = "mediaType" if ogc_api else "mimeType" + out_ref = convert_output_params_schema(job.outputs, JobInputsOutputsSchema.OGC) if link_references else {} + references = {} + for result in job.results: + rtype = "data" if any(k in result for k in ["data", "value"]) else "href" + value = get_any_value(result) + out_key = rtype + out_id = get_any_id(result) + out_mode = out_ref.get(out_id, {}).get("transmissionMode") + as_ref = link_references and out_mode == ExecuteTransmissionMode.REFERENCE + if rtype == "href": + # fix paths relative to instance endpoint, but leave explicit links as is (eg: S3 bucket, remote HTTP, etc.) + if value.startswith("/"): + value = str(value).lstrip("/") + if "://" not in value: + value = wps_url + value + elif ogc_api: + out_key = "value" + elif value_key: + out_key = value_key + output = {out_key: value} + if rtype == "href": # required for the rest to be there, other fields optional + if "mimeType" not in result: + result["mimeType"] = get_format(value, default=ContentType.TEXT_PLAIN).mime_type + if ogc_api or not strict: + output["type"] = result["mimeType"] + if not ogc_api or not strict or as_ref: + output["format"] = {fmt_key: result["mimeType"]} + for field in ["encoding", "schema"]: + if field in result: + output["format"][field] = result[field] + elif rtype != "href": + # literal data + # FIXME: BoundingBox not implemented (https://github.com/crim-ca/weaver/issues/51) + dtype = result.get("dataType", any2wps_literal_datatype(value, is_value=True) or "string") + if ogc_api: + output["dataType"] = {"name": dtype} + else: + output["dataType"] = dtype + + if ogc_api or as_ref: + mapping = references if as_ref else outputs + if out_id in mapping: + output_list = mapping[out_id] + if not isinstance(output_list, list): + output_list = [output_list] + output_list.append(output) + mapping[out_id] = output_list + else: + mapping[out_id] = output + else: + # if ordered insert supported by python version, insert ID first + output = dict([("id", out_id)] + list(output.items())) # noqa + outputs.append(output) + + # needed to collect and aggregate outputs of same ID first in case of array + # convert any requested link references using indices if needed + headers = [] + for out_id, output in references.items(): + res_links = make_result_link(out_id, output, job.id, settings) + headers.extend([("Link", link) for link in res_links]) + + return outputs, headers + + +def get_job_results_response(job, container, headers=None): + # type: (Job, AnySettingsContainer, Optional[AnyHeadersContainer]) -> AnyResponseType + """ + Generates the :term:`OGC` compliant :term:`Job` results response according to submitted execution parameters. + + Parameters that impact the format of the response are: + - Amount of outputs to be returned. + - Parameter ``response: raw|document`` + - Parameter ``transmissionMode: value|reference`` per output if ``response: raw``. + + .. 
seealso:: + More details available for each combination: + - https://docs.ogc.org/is/18-062r2/18-062r2.html#sc_execute_response + - https://docs.ogc.org/is/18-062r2/18-062r2.html#_response_7 + + :param job: Job for which to generate the results response. + :param container: Application settings. + :param headers: Additional headers to provide in the response. + """ + raise_job_dismissed(job, container) + raise_job_bad_status(job, container) + job_status = map_status(job.status) + if job_status in JOB_STATUS_CATEGORIES[StatusCategory.RUNNING]: + raise HTTPNotFound(json={ + "code": "ResultsNotReady", + "title": "JobResultsNotReady", + "type": "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/result-not-ready", + "detail": "Job is not ready to obtain results.", + "status": HTTPNotFound.code, + "cause": {"status": job.status}, + }) + + # Document ignores values/references + # See: + # - https://docs.ogc.org/is/18-062r2/18-062r2.html#_response_7 (/req/core/job-results-async-document) + # - https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-document + is_raw = job.execution_response == ExecuteResponse.RAW + results, refs = get_results(job, container, value_key="value", + schema=JobInputsOutputsSchema.OGC, + link_references=is_raw) # type: Union[ExecutionResults, HeadersTupleType] + headers = headers or {} + if "location" not in headers: + headers["Location"] = job.status_url(container) + + if not is_raw: + # note: + # Cannot add "links" field in response body because variable Output ID keys are directly at the root + # Possible conflict with an output that would be named "links". + results = sd.Result().deserialize(results) + return HTTPOk(json=results, headers=headers) + + if not results: # avoid schema validation error if all by reference + # Status code 204 for empty body + # see: + # - https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-raw-ref + refs.extend(headers.items()) + return HTTPNoContent(headers=refs) + + # raw response can be only data value, only link or a mix of them + if results: + # https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-raw-value-one + out_info = list(results.items())[0][-1] + out_type = get_any_value(out_info, key=True) + out_data = get_any_value(out_info) + + # FIXME: https://github.com/crim-ca/weaver/issues/376 + # implement multipart, both for multi-output IDs and array-output under same ID + if len(results) > 1 or (isinstance(out_data, list) and len(out_data) > 1): + # https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-raw-value-multi + raise HTTPNotImplemented(json={ + "code": "NotImplemented", + "type": "NotImplemented", + "detail": "Multipart results with 'transmissionMode=value' and 'response=raw' not implemented.", + }) + + # single value only + out_data = out_data[0] if isinstance(out_data, list) else out_data + if out_type == "href": + out_path = map_wps_output_location(out_data, container, exists=True, url=False) + out_type = out_info.get("type") # noqa + out_headers = get_file_headers(out_path, download_headers=True, content_headers=True, content_type=out_type) + resp = FileResponse(out_path) + resp.headers.update(out_headers) + resp.headers.update(headers) + else: + resp = HTTPOk(body=out_data, charset="UTF-8", content_type=ContentType.TEXT_PLAIN, headers=headers) + else: + resp = HTTPOk(headers=headers) + if refs: + # https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-raw-ref + # 
https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-raw-mixed-multi + resp.headerlist.extend(refs) + return resp + + +def get_job_submission_response(body, headers, error=False): + # type: (JSON, AnyHeadersContainer, bool) -> Union[HTTPOk, HTTPCreated] + """ + Generates the successful response from contents returned by :term:`Job` submission process. + + If :term:`Job` already finished processing within requested ``Prefer: wait=X`` seconds delay (and if allowed by + the :term:`Process` ``jobControlOptions``), return the successful status immediately instead of created status. + + Otherwise, return the status monitoring location of the created :term:`Job` to be monitored asynchronously. + + .. seealso:: + :func:`weaver.processes.execution.submit_job` + :func:`weaver.processes.execution.submit_job_handler` + """ + status = map_status(body.get("status")) + location = get_header("location", headers) + if status in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]: + if error: + http_class = HTTPBadRequest + http_desc = sd.FailedSyncJobResponse.description + else: + http_class = HTTPOk + http_desc = sd.CompletedJobResponse.description + body = sd.CompletedJobStatusSchema().deserialize(body) + + body["description"] = http_desc + return http_class(location=location, json=body, headers=headers) + + body["description"] = sd.CreatedLaunchJobResponse.description + body = sd.CreatedJobStatusSchema().deserialize(body) + return HTTPCreated(location=location, json=body, headers=headers) + + +def validate_service_process(request): + # type: (PyramidRequest) -> Tuple[Optional[str], Optional[str]] + """ + Verifies that service or process specified by path or query will raise the appropriate error if applicable. + """ + service_name = ( + request.matchdict.get("provider_id", None) or + request.params.get("provider", None) or + request.params.get("service", None) # backward compatibility + ) + process_name = ( + request.matchdict.get("process_id", None) or + request.params.get("process", None) or + request.params.get("processID", None) # OGC-API conformance + ) + item_test = None + item_type = None + + try: + service = None + if service_name: + forbid_local_only(request) + item_type = "Service" + item_test = service_name + store = get_db(request).get_store(StoreServices) + service = store.fetch_by_name(service_name, visibility=Visibility.PUBLIC) + if process_name: + item_type = "Process" + item_test = process_name + # local process + if not service: + store = get_db(request).get_store(StoreProcesses) + store.fetch_by_id(process_name, visibility=Visibility.PUBLIC) + # remote process + else: + processes = service.processes(request) + if process_name not in [p.id for p in processes]: + raise ProcessNotFound + except (ServiceNotFound, ProcessNotFound): + raise HTTPNotFound(json={ + "code": "NoSuch{}".format(item_type), + "description": "{} of id '{}' cannot be found.".format(item_type, item_test) + }) + except (ServiceNotAccessible, ProcessNotAccessible): + raise HTTPUnauthorized(json={ + "code": "Unauthorized{}".format(item_type), + "description": "{} of id '{}' is not accessible.".format(item_type, item_test) + }) + except InvalidIdentifierValue as ex: + raise HTTPBadRequest(json={ + "code": InvalidIdentifierValue.__name__, + "description": str(ex) + }) + + return service_name, process_name + + +def raise_job_bad_status(job, container=None): + # type: (Job, Optional[AnySettingsContainer]) -> None + """ + Raise the appropriate message for :term:`Job` not ready or unable to retrieve output 
results due to status. + """ + if job.status != Status.SUCCEEDED: + links = job.links(container=container) + if job.status == Status.FAILED: + err_code = None + err_info = None + err_known_modules = [ + "pywps.exceptions", + "owslib.wps", + "weaver.exceptions", + "weaver.owsexceptions", + ] + # try to infer the cause, fallback to generic error otherwise + for error in job.exceptions: + try: + if isinstance(error, dict): + err_code = error.get("Code") + err_info = error.get("Text") + elif isinstance(error, str) and any(error.startswith(mod) for mod in err_known_modules): + err_code, err_info = error.split(":", 1) + err_code = err_code.split(".")[-1].strip() + err_info = err_info.strip() + except Exception: + err_code = None + if err_code: + break + if not err_code: # default + err_code = OWSNoApplicableCode.code + err_info = "unknown" + # /req/core/job-results-failed + raise HTTPBadRequest(json={ + "title": "JobResultsFailed", + "type": err_code, + "detail": "Job results not available because execution failed.", + "status": HTTPBadRequest.code, + "cause": err_info, + "links": links + }) + + # /req/core/job-results-exception/results-not-ready + raise HTTPNotFound(json={ + "title": "JobResultsNotReady", + "type": "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/result-not-ready", + "detail": "Job is not ready to obtain results.", + "status": HTTPNotFound.code, + "cause": {"status": job.status}, + "links": links + }) + + +def raise_job_dismissed(job, container=None): + # type: (Job, Optional[AnySettingsContainer]) -> None + """ + Raise the appropriate messages for dismissed :term:`Job` status. + """ + if job.status == Status.DISMISSED: + # provide the job status links since it is still available for reference + settings = get_settings(container) + job_links = job.links(settings) + job_links = [link for link in job_links if link["rel"] in ["status", "alternate", "collection", "up"]] + raise JobGone( + json={ + "title": "JobDismissed", + "type": "JobDismissed", + "status": JobGone.code, + "detail": "Job was dismissed and artifacts have been removed.", + "cause": {"status": job.status}, + "value": str(job.id), + "links": job_links + } + ) + + +def dismiss_job_task(job, container): + # type: (Job, AnySettingsContainer) -> Job + """ + Cancels any pending or running :mod:`Celery` task and removes completed job artifacts. + + .. note:: + The :term:`Job` object itself is not deleted, only its artifacts. + Therefore, its inputs, outputs, logs, exceptions, etc. are still available in the database, + but corresponding files that would be exposed by ``weaver.wps_output`` configurations are removed. + + :param job: Job to cancel or cleanup. + :param container: Application settings. + :return: Updated and dismissed job. + """ + raise_job_dismissed(job, container) + if job.status in JOB_STATUS_CATEGORIES[StatusCategory.RUNNING]: + # signal to stop celery task. Up to it to terminate remote if any. 
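+        # note: 'terminate=True' makes Celery workers signal (SIGTERM by default) the process running the task,
+        # i.e. the call below is equivalent to:
+        #   celery_app.control.revoke(job.task_id, terminate=True, signal="SIGTERM")
+        # any remote execution triggered by that task is left to its own cleanup handlers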
+ LOGGER.debug("Job [%s] dismiss operation: Canceling task [%s]", job.id, job.task_id) + celery_app.control.revoke(job.task_id, terminate=True) + + wps_out_dir = get_wps_output_dir(container) + job_out_dir = os.path.join(wps_out_dir, str(job.id)) + job_out_log = os.path.join(wps_out_dir, str(job.id) + ".log") + job_out_xml = os.path.join(wps_out_dir, str(job.id) + ".xml") + if os.path.isdir(job_out_dir): + LOGGER.debug("Job [%s] dismiss operation: Removing output results.", job.id) + shutil.rmtree(job_out_dir, onerror=lambda func, path, _exc: LOGGER.warning( + "Job [%s] dismiss operation: Failed to delete [%s] due to [%s]", job.id, job_out_dir, _exc + )) + if os.path.isfile(job_out_log): + LOGGER.debug("Job [%s] dismiss operation: Removing output logs.", job.id) + try: + os.remove(job_out_log) + except OSError as exc: + LOGGER.warning("Job [%s] dismiss operation: Failed to delete [%s] due to [%s]", job.id, job_out_log, exc) + if os.path.isfile(job_out_xml): + LOGGER.debug("Job [%s] dismiss operation: Removing output WPS status.", job.id) + try: + os.remove(job_out_xml) + except OSError as exc: + LOGGER.warning("Job [%s] dismiss operation: Failed to delete [%s] due to [%s]", job.id, job_out_xml, exc) + + LOGGER.debug("Job [%s] dismiss operation: Updating job status.") + store = get_db(container).get_store(StoreJobs) + job.status_message = "Job {}.".format(Status.DISMISSED) + job.status = map_status(Status.DISMISSED) + job = store.update_job(job) + return job diff --git a/weaver/wps_restapi/processes/processes.py b/weaver/wps_restapi/processes/processes.py index bdaec0e14..0f4659c43 100644 --- a/weaver/wps_restapi/processes/processes.py +++ b/weaver/wps_restapi/processes/processes.py @@ -18,7 +18,7 @@ from weaver.formats import OutputFormat, repr_json from weaver.processes import opensearch from weaver.processes.execution import submit_job -from weaver.processes.utils import deploy_process_from_payload, get_job_submission_response, get_process +from weaver.processes.utils import deploy_process_from_payload, get_process from weaver.status import Status from weaver.store.base import StoreJobs, StoreProcesses from weaver.utils import fully_qualified_name, get_any_id @@ -274,5 +274,4 @@ def submit_local_job(request): Execution location and method is according to deployed Application Package. """ process = get_process(request=request) - body, headers = submit_job(request, process, tags=["wps-rest"]) - return get_job_submission_response(body, headers) + return submit_job(request, process, tags=["wps-rest"]) diff --git a/weaver/wps_restapi/providers/providers.py b/weaver/wps_restapi/providers/providers.py index 256524967..ef8579a80 100644 --- a/weaver/wps_restapi/providers/providers.py +++ b/weaver/wps_restapi/providers/providers.py @@ -17,8 +17,6 @@ from weaver.exceptions import ServiceNotFound, ServiceParsingError, log_unhandled_exceptions from weaver.formats import OutputFormat from weaver.owsexceptions import OWSMissingParameterValue, OWSNotImplemented -from weaver.processes.execution import submit_job -from weaver.processes.utils import get_job_submission_response from weaver.store.base import StoreServices from weaver.utils import get_any_id, get_settings from weaver.wps.utils import get_wps_client @@ -212,8 +210,9 @@ def submit_provider_job(request): """ Execute a remote provider process. 
""" + from weaver.processes.execution import submit_job # isort:skip # noqa: E402 # pylint: disable=C0413 + store = get_db(request).get_store(StoreServices) provider_id = request.matchdict.get("provider_id") service = store.fetch_by_name(provider_id) - body, headers = submit_job(request, service, tags=["wps-rest"]) - return get_job_submission_response(body, headers) + return submit_job(request, service, tags=["wps-rest"]) diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index 3c9e98792..4152f7372 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -3065,7 +3065,8 @@ class Execute(ExecuteInputOutputs): "Desired execution mode specified directly. This is intended for backward compatibility support. " "To obtain more control over execution mode selection, employ the official Prefer header instead " "(see for more details: https://pavics-weaver.readthedocs.io/en/latest/processes.html#execution-mode)." - ) + ), + validator=OneOf(ExecuteMode.values()) ) response = JobResponseOptionsEnum( missing=drop, @@ -3073,7 +3074,8 @@ class Execute(ExecuteInputOutputs): description=( "Indicates the desired representation format of the response. " "(see for more details: https://pavics-weaver.readthedocs.io/en/latest/processes.html#execution-body)." - ) + ), + validator=OneOf(ExecuteResponse.values()) ) notification_email = ExtendedSchemaNode( String(), @@ -4389,6 +4391,10 @@ class CompletedJobResponse(ExtendedMappingSchema): body = CompletedJobStatusSchema() +class FailedSyncJobResponse(CompletedJobResponse): + description = "Job submitted and failed synchronous execution. See server logs for more details." + + class OkDeleteProcessJobResponse(ExtendedMappingSchema): header = ResponseHeaders() body = DismissedJobSchema() @@ -4802,12 +4808,16 @@ class GoneVaultFileDownloadResponse(ExtendedMappingSchema): post_provider_process_job_responses = { "200": CompletedJobResponse(description="success"), "201": CreatedLaunchJobResponse(description="success"), + "204": NoContentJobResultsResponse(description="success"), + "400": FailedSyncJobResponse(), "403": ForbiddenProviderAccessResponseSchema(), "500": InternalServerErrorResponseSchema(), } post_process_jobs_responses = { "200": CompletedJobResponse(description="success"), "201": CreatedLaunchJobResponse(description="success"), + "204": NoContentJobResultsResponse(description="success"), + "400": FailedSyncJobResponse(), "403": ForbiddenProviderAccessResponseSchema(), "500": InternalServerErrorResponseSchema(), }