Skip to content

Commit

Permalink
adjust sync result returned directly + support response=raw for singl…
Browse files Browse the repository at this point in the history
…e outputs or multi-by-ref (relates to #376)
  • Loading branch information
fmigneault committed Mar 23, 2022
1 parent c81a99c commit fd0e83e
Show file tree
Hide file tree
Showing 17 changed files with 1,070 additions and 731 deletions.
4 changes: 4 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ Changes:
requested this way (resolves `#377 <https://github.com/crim-ca/weaver/issues/377>`_).
- Updated every ``Process`` to report that they support ``outputTransmission`` both as ``reference`` and ``value``,
since handling of results is accomplished by `Weaver` itself, regardless of the application being executed.
- Add partial support of ``response=raw`` parameter for execution request submission in order to handle results to
be returned accordingly to specified ``outputTransmission`` by ``reference`` or ``value``.
Multipart contents for multi-output results are not yet supported
(relates to `#376 <https://github.com/crim-ca/weaver/issues/376>`_).
- Add `CLI` option ``-R/--ref/--reference`` for ``execute`` operation allowing to request corresponding ``outputs``
by ID to be returned using the ``transmissionMode: reference`` method, producing HTTP ``Link`` headers for those
entries rather than inserting values in the response content body.
Expand Down
296 changes: 224 additions & 72 deletions tests/functional/test_builtin.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion tests/functional/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ def check_job_status(_resp, running=False):
if return_status or expect_failed:
return resp.json
resp = self.app.get("{}/results".format(status_url), headers=self.json_headers)
assert resp.status_code == 200, "Error job info:\n{}".format(resp.json)
assert resp.status_code == 200, "Error job info:\n{}".format(resp.text)
return resp.json

def get_outputs(self, status_url):
Expand Down
46 changes: 42 additions & 4 deletions weaver/datatype.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

from weaver import xml_util
from weaver.exceptions import ProcessInstanceError, ServiceParsingError
from weaver.execute import ExecuteControlOption, ExecuteMode, ExecuteTransmissionMode
from weaver.execute import ExecuteControlOption, ExecuteMode, ExecuteResponse, ExecuteTransmissionMode
from weaver.formats import AcceptLanguage, ContentType, repr_json
from weaver.processes.constants import ProcessSchema
from weaver.processes.convert import get_field, null, ows2json, wps2json_io
Expand Down Expand Up @@ -58,7 +58,7 @@

from owslib.wps import WebProcessingService

from weaver.execute import AnyExecuteControlOption, AnyExecuteTransmissionMode
from weaver.execute import AnyExecuteControlOption, AnyExecuteMode, AnyExecuteResponse, AnyExecuteTransmissionMode
from weaver.processes.constants import ProcessSchemaType
from weaver.processes.types import AnyProcessType
from weaver.quotation.status import AnyQuoteStatus
Expand Down Expand Up @@ -837,6 +837,21 @@ def status_location(self, location_url):
raise TypeError(f"Type 'str' is required for '{self.__name__}.status_location'")
self["status_location"] = location_url

def status_url(self, container=None):
# type: (Optional[AnySettingsContainer]) -> str
"""
Obtain the resolved endpoint where the :term:`Job` status information can be obtained.
"""
settings = get_settings(container)
location_base = "/providers/{provider_id}".format(provider_id=self.service) if self.service else ""
location_url = "{base_url}{location_base}/processes/{process_id}/jobs/{job_id}".format(
base_url=get_wps_restapi_base_url(settings),
location_base=location_base,
process_id=self.process,
job_id=self.id
)
return location_url

@property
def notification_email(self):
# type: () -> Optional[str]
Expand Down Expand Up @@ -873,18 +888,39 @@ def execute_sync(self):

@property
def execution_mode(self):
# type: () -> ExecuteMode
# type: () -> AnyExecuteMode
return ExecuteMode.get(self.get("execution_mode"), ExecuteMode.ASYNC)

@execution_mode.setter
def execution_mode(self, mode):
# type: (Union[ExecuteMode, str]) -> None
# type: (Union[AnyExecuteMode, str]) -> None
exec_mode = ExecuteMode.get(mode)
if exec_mode not in ExecuteMode:
modes = list(ExecuteMode.values())
raise ValueError(f"Invalid value for '{self.__name__}.execution_mode'. Must be one of {modes}")
self["execution_mode"] = mode

@property
def execution_response(self):
# type: () -> AnyExecuteResponse
out = self.setdefault("execution_response", ExecuteResponse.DOCUMENT)
if out not in ExecuteResponse.values():
out = ExecuteResponse.DOCUMENT
self["execution_response"] = out
return out

@execution_response.setter
def execution_response(self, response):
# type: (Optional[Union[AnyExecuteResponse, str]]) -> None
if response is None:
exec_resp = ExecuteResponse.DOCUMENT
else:
exec_resp = ExecuteResponse.get(response)
if exec_resp not in ExecuteResponse:
resp = list(ExecuteResponse.values())
raise ValueError(f"Invalid value for '{self.__name__}.execution_response'. Must be one of {resp}")
self["execution_response"] = exec_resp

@property
def is_local(self):
# type: () -> bool
Expand Down Expand Up @@ -1216,10 +1252,12 @@ def params(self):
"service": self.service,
"process": self.process,
"inputs": self.inputs,
"outputs": self.outputs,
"user_id": self.user_id,
"status": self.status,
"status_message": self.status_message,
"status_location": self.status_location,
"execution_response": self.execution_response,
"execution_mode": self.execution_mode,
"is_workflow": self.is_workflow,
"created": self.created,
Expand Down
11 changes: 11 additions & 0 deletions weaver/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

from weaver.base import Constants

if TYPE_CHECKING:
from typing import List


class ExecuteMode(Constants):
AUTO = "auto"
Expand All @@ -13,6 +16,14 @@ class ExecuteControlOption(Constants):
ASYNC = "async-execute"
SYNC = "sync-execute"

@classmethod
def values(cls):
# type: () -> List[AnyExecuteControlOption]
"""
Return default control options in specific order according to preferred modes for execution by `Weaver`.
"""
return [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC]


class ExecuteResponse(Constants):
RAW = "raw"
Expand Down
64 changes: 20 additions & 44 deletions weaver/processes/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
from celery.exceptions import TimeoutError as CeleryTaskTimeoutError
from owslib.util import clean_ows_url
from owslib.wps import ComplexDataInput
from pyramid.httpexceptions import HTTPBadRequest, HTTPNotAcceptable, HTTPNotImplemented
from pyramid.httpexceptions import HTTPBadRequest, HTTPNotAcceptable
from pyramid_celery import celery_app as app

from weaver.database import get_db
from weaver.datatype import Process, Service
from weaver.execute import ExecuteControlOption, ExecuteMode, ExecuteResponse, ExecuteTransmissionMode
from weaver.execute import ExecuteControlOption, ExecuteMode
from weaver.formats import AcceptLanguage, ContentType
from weaver.notify import encrypt_email, notify_job_complete
from weaver.owsexceptions import OWSNoApplicableCode
Expand Down Expand Up @@ -46,7 +46,7 @@
load_pywps_config
)
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.utils import get_wps_restapi_base_url
from weaver.wps_restapi.jobs.utils import get_job_results_response, get_job_submission_response

LOGGER = logging.getLogger(__name__)
if TYPE_CHECKING:
Expand All @@ -60,7 +60,7 @@
from weaver.datatype import Job
from weaver.processes.convert import OWS_Input_Type, ProcessOWS
from weaver.status import StatusType
from weaver.typedefs import CeleryResult, HeadersType, HeaderCookiesType, JSON, SettingsType
from weaver.typedefs import AnyResponseType, CeleryResult, HeadersType, HeaderCookiesType, JSON, SettingsType
from weaver.visibility import AnyVisibility


Expand Down Expand Up @@ -442,7 +442,7 @@ def map_locations(job, settings):


def submit_job(request, reference, tags=None):
# type: (Request, Union[Service, Process], Optional[List[str]]) -> Tuple[JSON, HeadersType]
# type: (Request, Union[Service, Process], Optional[List[str]]) -> AnyResponseType
"""
Generates the job submission from details retrieved in the request.
Expand Down Expand Up @@ -498,28 +498,6 @@ def submit_job(request, reference, tags=None):
visibility, language=lang, headers=headers, tags=tags, user=user, context=context)


def _validate_job_parameters(json_body):
# type: (JSON) -> None
"""
Tests supported parameters not automatically validated by colander deserialize since they are optional.
"""
exec_mode = json_body.get("mode")
if exec_mode not in [None, ExecuteMode.ASYNC, ExecuteMode.AUTO]:
raise HTTPNotImplemented(detail=f"Execution mode '{exec_mode}' not supported.")

resp_mode = json_body.get("response")
if resp_mode not in [None, ExecuteResponse.DOCUMENT]:
raise HTTPNotImplemented(detail=f"Execution response type '{resp_mode}' not supported.")

outputs = json_body.get("outputs", [])
if isinstance(outputs, dict):
outputs = [dict(id=out, **keys) for out, keys in outputs.items()]
for job_output in outputs:
mode = job_output["transmissionMode"]
if mode not in ExecuteTransmissionMode.values():
raise HTTPNotImplemented(detail=f"Execute transmissionMode '{mode}' not supported.")


def submit_job_handler(payload, # type: JSON
settings, # type: SettingsType
service_url, # type: str
Expand All @@ -533,7 +511,7 @@ def submit_job_handler(payload, # type: JSON
tags=None, # type: Optional[List[str]]
user=None, # type: Optional[int]
context=None, # type: Optional[str]
): # type: (...) -> Tuple[JSON, HeadersType]
): # type: (...) -> AnyResponseType
"""
Submits the job to the Celery worker with provided parameters.
Expand All @@ -544,11 +522,6 @@ def submit_job_handler(payload, # type: JSON
except colander.Invalid as ex:
raise HTTPBadRequest("Invalid schema: [{}]".format(str(ex)))

# TODO: remove when all parameter variations are supported
# FIXME:
# - support 'response: raw' (https://github.com/crim-ca/weaver/issues/376)
# - allow omitting 'outputs' (https://github.com/crim-ca/weaver/issues/375)
_validate_job_parameters(json_body)
db = get_db(settings)
headers = headers or {}
if is_local:
Expand All @@ -566,24 +539,20 @@ def submit_job_handler(payload, # type: JSON
# as per https://datatracker.ietf.org/doc/html/rfc7240#section-2
# Prefer header not resolve as valid still proces
is_execute_async = mode != ExecuteMode.SYNC
exec_resp = json_body.get("response")

notification_email = json_body.get("notification_email")
encrypted_email = encrypt_email(notification_email, settings) if notification_email else None

store = db.get_store(StoreJobs) # type: StoreJobs
job = store.save_job(task_id=Status.ACCEPTED, process=process_id, service=provider_id,
inputs=json_body.get("inputs"), is_local=is_local, is_workflow=is_workflow,
access=visibility, user_id=user, execute_async=is_execute_async, custom_tags=tags,
notification_email=encrypted_email, accept_language=language, context=context)
access=visibility, user_id=user, context=context,
execute_async=is_execute_async, execute_response=exec_resp,
custom_tags=tags, notification_email=encrypted_email, accept_language=language)
job.save_log(logger=LOGGER, message="Job task submitted for execution.", status=Status.ACCEPTED, progress=0)
job = store.update_job(job)
location_base = "/providers/{provider_id}".format(provider_id=provider_id) if provider_id else ""
location_url = "{base_url}{location_base}/processes/{process_id}/jobs/{job_id}".format(
base_url=get_wps_restapi_base_url(settings),
location_base=location_base,
process_id=process_id,
job_id=job.id
)
location_url = job.status_url(settings)
resp_headers = {"Location": location_url}
resp_headers.update(applied)

Expand All @@ -598,9 +567,15 @@ def submit_job_handler(payload, # type: JSON
pass
if result.ready():
job = store.fetch_by_id(job.id)
# when sync is successful, it must return the results direct instead of status info
# see: https://docs.ogc.org/is/18-062r2/18-062r2.html#sc_execute_response
if job.status == Status.SUCCEEDED:
return get_job_results_response(job, settings, headers=resp_headers)
# otherwise return the error status
body = job.json(container=settings, self_link="status")
body["location"] = location_url
return body, resp_headers
resp = get_job_submission_response(body, resp_headers, error=True)
return resp
else:
LOGGER.debug("Celery task requested as sync took too long to complete (wait=%ss). Continue in async.", wait)
# sync not respected, therefore must drop it
Expand All @@ -618,4 +593,5 @@ def submit_job_handler(payload, # type: JSON
"status": map_status(Status.ACCEPTED),
"location": location_url
}
return body, resp_headers
resp = get_job_submission_response(body, resp_headers)
return resp
26 changes: 0 additions & 26 deletions weaver/processes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,32 +103,6 @@ def get_process(process_id=None, request=None, settings=None, store=None):
raise HTTPBadRequest("Invalid schema:\n[{0!r}].".format(ex))


def get_job_submission_response(body, headers):
# type: (JSON, AnyHeadersContainer) -> Union[HTTPOk, HTTPCreated]
"""
Generates the successful response from contents returned by :term:`Job` submission process.
If :term:`Job` already finished processing within requested ``Prefer: wait=X`` seconds delay (and if allowed by
the :term:`Process` ``jobControlOptions``), return the successful status immediately instead of created status.
Otherwise, return the status monitoring location of the created :term:`Job` to be monitored asynchronously.
.. seealso::
:func:`weaver.processes.execution.submit_job`
:func:`weaver.processes.execution.submit_job_handler`
"""
status = map_status(body.get("status"))
location = get_header("location", headers)
if status in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]:
body["description"] = sd.CompletedJobResponse.description
body = sd.CompletedJobStatusSchema().deserialize(body)
return HTTPOk(location=location, json=body, headers=headers)

body["description"] = sd.CreatedLaunchJobResponse.description
body = sd.CreatedJobStatusSchema().deserialize(body)
return HTTPCreated(location=location, json=body, headers=headers)


def map_progress(progress, range_min, range_max):
# type: (Number, Number, Number) -> Number
"""
Expand Down
2 changes: 2 additions & 0 deletions weaver/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pywps import Process as ProcessWPS

from weaver.datatype import Bill, Job, Process, Quote, Service, VaultFile
from weaver.execute import AnyExecuteResponse
from weaver.typedefs import (
AnyUUID,
ExecutionInputs,
Expand Down Expand Up @@ -125,6 +126,7 @@ def save_job(self,
is_workflow=False, # type: bool
is_local=False, # type: bool
execute_async=True, # type: bool
execute_response=None, # type: Optional[AnyExecuteResponse]
custom_tags=None, # type: Optional[List[str]]
user_id=None, # type: Optional[int]
access=None, # type: Optional[str]
Expand Down
3 changes: 3 additions & 0 deletions weaver/store/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from pymongo.collection import Collection

from weaver.execute import AnyExecuteResponse
from weaver.processes.types import AnyProcessType
from weaver.store.base import DatetimeIntervalType, JobGroupCategory, JobSearchResult
from weaver.typedefs import AnyProcess, AnyProcessClass, AnyUUID, AnyValueType, ExecutionInputs, ExecutionOutputs
Expand Down Expand Up @@ -577,6 +578,7 @@ def save_job(self,
is_workflow=False, # type: bool
is_local=False, # type: bool
execute_async=True, # type: bool
execute_response=None, # type: Optional[AnyExecuteResponse]
custom_tags=None, # type: Optional[List[str]]
user_id=None, # type: Optional[int]
access=None, # type: Optional[str]
Expand Down Expand Up @@ -610,6 +612,7 @@ def save_job(self,
"inputs": inputs,
"status": map_status(Status.ACCEPTED),
"execute_async": execute_async,
"execution_response": execute_response,
"is_workflow": is_workflow,
"is_local": is_local,
"created": created if created else now(),
Expand Down
8 changes: 7 additions & 1 deletion weaver/typedefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
from webtest.response import TestResponse
from werkzeug.wrappers import Request as WerkzeugRequest

from weaver.execute import AnyExecuteTransmissionMode
from weaver.processes.wps_process_base import WpsProcessInterface
from weaver.datatype import Process
from weaver.status import AnyStatusType
Expand Down Expand Up @@ -302,6 +301,13 @@ def __call__(self, message: str, progress: Number, status: AnyStatusType, *args:
ExecutionOutputsList = List[ExecutionOutputItem]
ExecutionOutputsMap = Dict[str, ExecutionOutputObject]
ExecutionOutputs = Union[ExecutionOutputsList, ExecutionOutputsMap]
ExecutionResultObject = TypedDict("ExecutionResultObject", {
"value": Optional[AnyValueType],
"href": Optional[str],
"type": Optional[str],
}, total=False)
ExecutionResultArray = List[ExecutionResultObject]
ExecutionResults = Dict[str, Union[ExecutionResultObject, ExecutionResultArray]]

# reference employed as 'JobMonitorReference' by 'WPS1Process'
JobExecution = TypedDict("JobExecution", {"execution": WPSExecution})
Expand Down
Loading

0 comments on commit fd0e83e

Please sign in to comment.