diff --git a/CHANGES.rst b/CHANGES.rst index 281639ae9..dda197b28 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -12,7 +12,8 @@ Changes Changes: -------- -- No change. +- Add support of *OGC API - Processes - Part 4: Job Management* endpoints for `Job` creation and execution + (fixes `#716 `_). Fixes: ------ diff --git a/tests/test_utils.py b/tests/test_utils.py index 9d086a845..9a92c40d8 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -111,6 +111,8 @@ AWS_S3_REGION_SUBSET_WITH_MOCK = {MOCK_AWS_REGION} | AWS_S3_REGION_SUBSET AWS_S3_REGION_NON_DEFAULT = list(AWS_S3_REGION_SUBSET_WITH_MOCK - {MOCK_AWS_REGION})[0] +KNOWN_STATUSES = set(Status.values()) - {Status.UNKNOWN} + # pylint: disable=R1732,W1514 # not using with open + encoding @@ -391,32 +393,24 @@ def test_pass_http_error_raises_other_error_with_multi_pyramid_error(): pass_http_error(ex, [HTTPConflict, HTTPInternalServerError]) -def get_status_variations(status_value): - return [status_value.lower(), - status_value.upper(), - status_value.capitalize(), - f"Process{status_value.capitalize()}"] - - -def test_map_status_ogc_compliant(): - known_statuses = set(Status.values()) - {Status.UNKNOWN} - for sv in known_statuses: - for s in get_status_variations(sv): - assert map_status(s, StatusCompliant.OGC) in JOB_STATUS_CATEGORIES[StatusCompliant.OGC] - - -def test_map_status_pywps_compliant(): - known_statuses = set(Status.values()) - {Status.UNKNOWN} - for sv in known_statuses: - for s in get_status_variations(sv): - assert map_status(s, StatusCompliant.PYWPS) in JOB_STATUS_CATEGORIES[StatusCompliant.PYWPS] - - -def test_map_status_owslib_compliant(): - known_statuses = set(Status.values()) - {Status.UNKNOWN} - for sv in known_statuses: - for s in get_status_variations(sv): - assert map_status(s, StatusCompliant.OWSLIB) in JOB_STATUS_CATEGORIES[StatusCompliant.OWSLIB] +@pytest.mark.parametrize( + ["compliance", "status"], + itertools.product( + list(StatusCompliant), + itertools.chain.from_iterable( + [ + status.lower(), + status.upper(), + status.capitalize(), + f"Process{status.capitalize()}" + ] + for status in KNOWN_STATUSES + ) + ) +) +def test_map_status_compliant(compliance, status): + # type: (StatusCompliant, str) -> None + assert map_status(status, compliance) in JOB_STATUS_CATEGORIES[compliance] def test_map_status_back_compatibility_and_special_cases(): diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index e63f9bd20..c3cd767e8 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -12,6 +12,7 @@ from owslib.wps import BoundingBoxDataInput, ComplexDataInput from pyramid.httpexceptions import HTTPBadRequest, HTTPNotAcceptable from pyramid_celery import celery_app as app +from werkzeug.wrappers.request import Request as WerkzeugRequest from weaver.database import get_db from weaver.datatype import Process, Service @@ -34,10 +35,12 @@ from weaver.utils import ( apply_number_with_unit, as_int, + extend_instance, fully_qualified_name, get_any_id, get_any_value, get_header, + get_path_kvp, get_registry, get_settings, now, @@ -48,6 +51,7 @@ wait_secs ) from weaver.visibility import Visibility +from weaver.wps.service import get_pywps_service from weaver.wps.utils import ( check_wps_status, get_wps_client, @@ -56,6 +60,7 @@ get_wps_output_dir, get_wps_output_path, get_wps_output_url, + get_wps_path, load_pywps_config ) from weaver.wps_restapi import swagger_definitions as sd @@ -79,6 +84,7 @@ AnyProcessRef, AnyResponseType, AnyServiceRef, + AnyViewResponse, AnyValueType, CeleryResult, HeaderCookiesType, @@ -116,8 +122,6 @@ def execute_process(task, job_id, wps_url, headers=None): """ Celery task that executes the WPS process job monitoring as status updates (local and remote). """ - from weaver.wps.service import get_pywps_service - LOGGER.debug("Job execute process called.") task_process = get_celery_process() @@ -660,8 +664,30 @@ def map_locations(job, settings): os.symlink(wps_ref, job_ref) -def submit_job(request, reference, tags=None): - # type: (Request, Union[Service, Process], Optional[List[str]]) -> AnyResponseType +def submit_job_dispatch_wps(request, process): + # type: (Request, Process) -> AnyViewResponse + """ + Dispatch a :term:`XML` request to the relevant :term:`Process` handler using the :term:`WPS` endpoint. + + Sends the :term:`XML` request to the :term:`WPS` endpoint which knows how to parse it properly. + Execution will end up in the same :func:`submit_job_handler` function as for :term:`OGC API` :term:`JSON` execution. + + .. warning:: + The function assumes that :term:`XML` was pre-validated as present in the :paramref:`request`. + """ + service = get_pywps_service() + wps_params = {"version": "1.0.0", "request": "Execute", "service": "WPS", "identifier": process.id} + request.path_info = get_wps_path(request) + request.query_string = get_path_kvp("", **wps_params)[1:] + location = request.application_url + request.path_info + request.query_string + LOGGER.warning("Route redirection [%s] -> [%s] for WPS-XML support.", request.url, location) + http_request = extend_instance(request, WerkzeugRequest) + http_request.shallow = False + return service.call(http_request) + + +def submit_job(request, reference, tags=None, process_id=None): + # type: (Request, Union[Service, Process], Optional[List[str]], Optional[str]) -> AnyResponseType """ Generates the job submission from details retrieved in the request. @@ -683,13 +709,13 @@ def submit_job(request, reference, tags=None): # validate context if needed later on by the job for early failure context = get_wps_output_context(request) - provider_id = None # None OK if local - process_id = None # None OK if remote, but can be found as well if available from WPS-REST path # noqa + prov_id = None # None OK if local + proc_id = None # None OK if remote, but can be found as well if available from WPS-REST path # noqa tags = tags or [] lang = request.accept_language.header_value # can only preemptively check if local process if isinstance(reference, Process): service_url = reference.processEndpointWPS1 - process_id = reference.identifier # explicit 'id:version' process revision if available, otherwise simply 'id' + proc_id = reference.identifier # explicit 'id:version' process revision if available, otherwise simply 'id' visibility = reference.visibility is_workflow = reference.type == ProcessType.WORKFLOW is_local = True @@ -702,8 +728,8 @@ def submit_job(request, reference, tags=None): lang = matched_lang elif isinstance(reference, Service): service_url = reference.url - provider_id = reference.id - process_id = resolve_process_tag(request) + prov_id = reference.id + proc_id = process_id or resolve_process_tag(request) visibility = Visibility.PUBLIC is_workflow = False is_local = False @@ -716,7 +742,7 @@ def submit_job(request, reference, tags=None): user = request.authenticated_userid # FIXME: consider other methods to provide the user headers = dict(request.headers) settings = get_settings(request) - return submit_job_handler(json_body, settings, service_url, provider_id, process_id, is_workflow, is_local, + return submit_job_handler(json_body, settings, service_url, prov_id, proc_id, is_workflow, is_local, visibility, language=lang, headers=headers, tags=tags, user=user, context=context) @@ -787,28 +813,38 @@ def submit_job_handler(payload, # type: ProcessExecution exec_resp = json_body.get("response") subscribers = map_job_subscribers(json_body, settings) + job_pending_created = payload.get("status") == "create" + if job_pending_created: + job_status = Status.CREATED + job_message = "Job created with pending trigger." + else: + job_status = Status.ACCEPTED + job_message = "Job task submitted for execution." + store = db.get_store(StoreJobs) # type: StoreJobs - job = store.save_job(task_id=Status.ACCEPTED, process=process, service=provider_id, + job = store.save_job(task_id=job_status, process=process, service=provider_id, inputs=json_body.get("inputs"), outputs=json_body.get("outputs"), is_local=is_local, is_workflow=is_workflow, access=visibility, user_id=user, context=context, execute_async=is_execute_async, execute_response=exec_resp, custom_tags=tags, accept_language=language, subscribers=subscribers) - job.save_log(logger=LOGGER, message="Job task submitted for execution.", status=Status.ACCEPTED, progress=0) + job.save_log(logger=LOGGER, message=job_message, status=job_status, progress=0) job = store.update_job(job) location_url = job.status_url(settings) resp_headers = {"Location": location_url} resp_headers.update(applied) - wps_url = clean_ows_url(service_url) - result = execute_process.delay(job_id=job.id, wps_url=wps_url, headers=headers) # type: CeleryResult - LOGGER.debug("Celery pending task [%s] for job [%s].", result.id, job.id) - if not is_execute_async: + task_result = None # type: Optional[CeleryResult] + if not job_pending_created: + wps_url = clean_ows_url(service_url) + task_result = execute_process.delay(job_id=job.id, wps_url=wps_url, headers=headers) + LOGGER.debug("Celery pending task [%s] for job [%s].", task_result.id, job.id) + if not job_pending_created and not is_execute_async: LOGGER.debug("Celery task requested as sync if it completes before (wait=%ss)", wait) try: - result.wait(timeout=wait) + task_result.wait(timeout=wait) except CeleryTaskTimeoutError: pass - if result.ready(): + if task_result.ready(): job = store.fetch_by_id(job.id) # when sync is successful, it must return the results direct instead of status info # see: https://docs.ogc.org/is/18-062r2/18-062r2.html#sc_execute_response @@ -833,8 +869,8 @@ def submit_job_handler(payload, # type: ProcessExecution "jobID": job.id, "processID": job.process, "providerID": provider_id, # dropped by validator if not applicable - "status": map_status(Status.ACCEPTED), - "location": location_url + "status": map_status(job_status), + "location": location_url, # for convenience/backward compatibility, but official is Location *header* } resp = get_job_submission_response(body, resp_headers) return resp diff --git a/weaver/processes/utils.py b/weaver/processes/utils.py index 331df523f..610881d95 100644 --- a/weaver/processes/utils.py +++ b/weaver/processes/utils.py @@ -107,9 +107,6 @@ UpdateFields = List[Union[str, UpdateFieldListMethod]] -# FIXME: -# https://github.com/crim-ca/weaver/issues/215 -# define common Exception classes that won't require this type of conversion def get_process(process_id=None, request=None, settings=None, store=None, revision=True): # type: (Optional[str], Optional[PyramidRequest], Optional[SettingsType], Optional[StoreProcesses], bool) -> Process """ diff --git a/weaver/status.py b/weaver/status.py index 45042655f..5016cfc99 100644 --- a/weaver/status.py +++ b/weaver/status.py @@ -9,22 +9,29 @@ class StatusCompliant(ExtendedEnum): OGC = "OGC" PYWPS = "PYWPS" OWSLIB = "OWSLIB" + OPENEO = "OPENEO" class StatusCategory(ExtendedEnum): FINISHED = "FINISHED" RUNNING = "RUNNING" + PENDING = "PENDING" FAILED = "FAILED" class Status(Constants): + CREATED = "created" + QUEUED = "queued" ACCEPTED = "accepted" STARTED = "started" PAUSED = "paused" SUCCEEDED = "succeeded" SUCCESSFUL = "successful" FAILED = "failed" + ERROR = "error" + FINISHED = "finished" RUNNING = "running" + CANCELED = "canceled" DISMISSED = "dismissed" EXCEPTION = "exception" UNKNOWN = "unknown" # don't include in any below collections @@ -33,14 +40,16 @@ class Status(Constants): JOB_STATUS_CATEGORIES = { # note: # OGC compliant (old): [Accepted, Running, Succeeded, Failed] - # OGC compliant (new): [accepted, running, successful, failed, dismissed] + # OGC compliant (new): [accepted, running, successful, failed, dismissed, created] ('created' in Part 4 only) # PyWPS uses: [Accepted, Started, Succeeded, Failed, Paused, Exception] - # OWSLib users: [Accepted, Running, Succeeded, Failed, Paused] (with 'Process' in front) + # OWSLib uses: [Accepted, Running, Succeeded, Failed, Paused] (with 'Process' in front) + # OpenEO uses: [queued, running, finished, error, canceled, created] # https://github.com/opengeospatial/ogcapi-processes/blob/master/openapi/schemas/processes-core/statusCode.yaml # http://docs.opengeospatial.org/is/14-065/14-065.html#17 # corresponding statuses are aligned vertically for 'COMPLIANT' groups StatusCompliant.OGC: frozenset([ + Status.CREATED, # Part 4: Job Management Status.ACCEPTED, Status.RUNNING, Status.SUCCEEDED, # old (keep it because it matches existing ADES/EMS and other providers) @@ -63,31 +72,50 @@ class Status(Constants): Status.FAILED, Status.PAUSED ]), + StatusCompliant.OPENEO: frozenset([ + Status.CREATED, + Status.QUEUED, + Status.RUNNING, + Status.FINISHED, + Status.ERROR, + Status.CANCELED + ]), # utility categories StatusCategory.RUNNING: frozenset([ Status.ACCEPTED, Status.RUNNING, Status.STARTED, + Status.QUEUED, + Status.PAUSED + ]), + StatusCategory.PENDING: frozenset([ + Status.CREATED, + Status.ACCEPTED, + Status.QUEUED, Status.PAUSED ]), StatusCategory.FINISHED: frozenset([ Status.FAILED, Status.DISMISSED, + Status.CANCELED, Status.EXCEPTION, + Status.ERROR, Status.SUCCEEDED, - Status.SUCCESSFUL + Status.SUCCESSFUL, + Status.FINISHED ]), StatusCategory.FAILED: frozenset([ Status.FAILED, Status.DISMISSED, - Status.EXCEPTION + Status.EXCEPTION, + Status.ERROR ]), } # FIXME: see below detail in map_status about 'successful', partially compliant to OGC statuses # https://github.com/opengeospatial/ogcapi-processes/blob/ca8e90/core/openapi/schemas/statusCode.yaml JOB_STATUS_CODE_API = JOB_STATUS_CATEGORIES[StatusCompliant.OGC] - {Status.SUCCESSFUL} -JOB_STATUS_SEARCH_API = set(list(JOB_STATUS_CODE_API) + [StatusCategory.FINISHED.value.lower()]) +JOB_STATUS_SEARCH_API = set(list(JOB_STATUS_CODE_API) + [Status.FINISHED]) # id -> str STATUS_PYWPS_MAP = {s: _WPS_STATUS._fields[s].lower() for s in range(len(WPS_STATUS))} @@ -100,14 +128,19 @@ class Status(Constants): from weaver.typedefs import Literal, TypeAlias StatusType: Status = Literal[ + Status.CREATED, Status.ACCEPTED, Status.STARTED, + Status.QUEUED, Status.PAUSED, Status.SUCCEEDED, + Status.FINISHED, Status.FAILED, Status.RUNNING, Status.DISMISSED, + Status.CANCELED, Status.EXCEPTION, + Status.ERROR, Status.UNKNOWN ] AnyStatusType = Union[Status, StatusType, int] @@ -116,6 +149,7 @@ class Status(Constants): StatusCategory, Literal[ StatusCategory.RUNNING, + StatusCategory.PENDING, StatusCategory.FINISHED, StatusCategory.FAILED, ], @@ -160,21 +194,48 @@ def map_status(wps_status, compliant=StatusCompliant.OGC): if job_status in JOB_STATUS_CATEGORIES[StatusCategory.RUNNING]: if job_status in [Status.STARTED, Status.PAUSED]: job_status = Status.RUNNING + elif job_status == Status.QUEUED: + job_status = Status.ACCEPTED + elif job_status in [Status.CANCELED, Status.DISMISSED]: + job_status = Status.DISMISSED elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.FAILED]: - if job_status not in [Status.FAILED, Status.DISMISSED]: - job_status = Status.FAILED + job_status = Status.FAILED + elif job_status == Status.FINISHED: + job_status = Status.SUCCEEDED elif compliant == StatusCompliant.PYWPS: - if job_status == Status.RUNNING: + if job_status in Status.RUNNING: job_status = Status.STARTED - elif job_status == Status.DISMISSED: + elif job_status in [Status.DISMISSED, Status.CANCELED]: job_status = Status.FAILED + elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.FAILED]: + job_status = Status.EXCEPTION + elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.PENDING]: + job_status = Status.PAUSED + elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]: + job_status = Status.SUCCEEDED elif compliant == StatusCompliant.OWSLIB: - if job_status == Status.STARTED: + if job_status in JOB_STATUS_CATEGORIES[StatusCategory.PENDING]: + job_status = Status.PAUSED + elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.RUNNING]: job_status = Status.RUNNING - elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.FAILED] and job_status != Status.FAILED: + elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.FAILED]: job_status = Status.FAILED + elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]: + job_status = Status.SUCCEEDED + + elif compliant == StatusCompliant.OPENEO: + if job_status in JOB_STATUS_CATEGORIES[StatusCategory.PENDING]: + job_status = Status.QUEUED + elif job_status == Status.DISMISSED: + job_status = Status.CANCELED + elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.RUNNING]: + job_status = Status.RUNNING + elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.FAILED]: + job_status = Status.ERROR + elif job_status in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]: + job_status = Status.FINISHED # FIXME: new official status is 'successful', but this breaks everywhere (tests, local/remote execute, etc.) # https://github.com/opengeospatial/ogcapi-processes/blob/master/openapi/schemas/processes-core/statusCode.yaml diff --git a/weaver/typedefs.py b/weaver/typedefs.py index ae77096d5..a89521090 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -959,6 +959,7 @@ class CWL_SchemaName(Protocol): }, total=True) ProcessExecution = TypedDict("ProcessExecution", { + "status": NotRequired[Literal["create"]], "mode": NotRequired[AnyExecuteMode], "response": NotRequired[AnyExecuteResponse], "inputs": NotRequired[ExecutionInputs], diff --git a/weaver/utils.py b/weaver/utils.py index 7f98b996c..effa9c3ae 100644 --- a/weaver/utils.py +++ b/weaver/utils.py @@ -120,6 +120,7 @@ JSON, KVP, KVP_Item, + Link, Literal, Number, OpenAPISchema, @@ -1385,6 +1386,44 @@ def get_href_headers(path, # type: str return headers +def make_link_header( + href, # type: Union[str, Link] + hreflang=None, # type: Optional[str] + rel=None, # type: Optional[str] + type=None, # type: Optional[str] # noqa + title=None, # type: Optional[str] + charset=None, # type: Optional[str] +): # type: (...) -> str + """ + Creates the HTTP Link (:rfc:`8288`) header value from input parameters or a dictionary representation. + + Parameter names are specifically selected to allow direct unpacking from the dictionary representation. + Otherwise, a dictionary can be passed as the first parameter, allowing other parameters to act as override values. + Alternatively, all parameters can be supplied individually. + + .. note:: + Parameter :paramref:`rel` is optional to allow unpacking with a single parameter, + but its value is required to form a valid ``Link`` header. + """ + if isinstance(href, dict): + rel = rel or href.get("rel") + type = type or href.get("type") # noqa + title = title or href.get("title") + charset = charset or href.get("charset") # noqa + hreflang = hreflang or href.get("hreflang") + href = href["href"] + link = f"<{href}>; rel=\"{rel}\"" + if type: + link += f"; type=\"{type}\"" + if charset: + link += f"; charset=\"{charset}\"" + if title: + link += f"; title=\"{title}\"" + if hreflang: + link += f"; hreflang={hreflang}" + return link + + def get_base_url(url): # type: (str) -> str """ diff --git a/weaver/wps/service.py b/weaver/wps/service.py index 84359d510..192d53ddd 100644 --- a/weaver/wps/service.py +++ b/weaver/wps/service.py @@ -258,15 +258,17 @@ def prepare_process_for_execution(self, identifier): def execute(self, identifier, wps_request, uuid): # type: (str, Union[WPSRequest, WorkerRequest], str) -> Union[WPSResponse, HTTPValid] """ - Handles the ``Execute`` KVP/XML request submitted on the WPS endpoint. + Handles the ``Execute`` :term:`KVP`/:term:`XML` request submitted on the :term:`WPS` endpoint. - Submit WPS request to corresponding WPS-REST endpoint and convert back for requested ``Accept`` content-type. + Submit :term:`WPS` request to corresponding :term:`WPS-REST` endpoint and convert back for + requested ``Accept`` content-type. - Overrides the original execute operation, that will instead be handled by :meth:`execute_job` following - callback from Celery Worker, which handles process job creation and monitoring. + Overrides the original execute operation, that will instead be handled by :meth:`execute_job` + following callback from :mod:`celery` worker, which handles :term:`Job` creation and monitoring. - If ``Accept`` is JSON, the result is directly returned from :meth:`_submit_job`. - If ``Accept`` is XML or undefined, :class:`WorkerExecuteResponse` converts the received JSON with XML template. + If ``Accept`` is :term:`JSON`, the result is directly returned from :meth:`_submit_job`. + If ``Accept`` is :term:`XML` or undefined, :class:`WorkerExecuteResponse` converts the + received :term:`JSON` with :term:`XML` template. """ result = self._submit_job(wps_request) if not isinstance(result, dict): diff --git a/weaver/wps_restapi/api.py b/weaver/wps_restapi/api.py index a736df12b..edb386710 100644 --- a/weaver/wps_restapi/api.py +++ b/weaver/wps_restapi/api.py @@ -97,6 +97,7 @@ def get_conformance(category, settings): ogcapi_proc_core = "http://www.opengis.net/spec/ogcapi-processes-1/1.0" ogcapi_proc_part2 = "http://www.opengis.net/spec/ogcapi-processes-2/1.0" ogcapi_proc_part3 = "http://www.opengis.net/spec/ogcapi-processes-3/0.0" + ogcapi_proc_part4 = "http://www.opengis.net/spec/ogcapi-processes-4/1.0" ogcapi_proc_apppkg = "http://www.opengis.net/spec/eoap-bp/1.0" # FIXME: https://github.com/crim-ca/weaver/issues/412 # ogcapi_proc_part3 = "http://www.opengis.net/spec/ogcapi-processes-3/1.0" @@ -475,6 +476,18 @@ def get_conformance(category, settings): # FIXME: support openEO processes (https://github.com/crim-ca/weaver/issues/564) # f"{ogcapi_proc_part3}/conf/openeo-workflows", # f"{ogcapi_proc_part3}/req/openeo-workflows", + f"{ogcapi_proc_part4}/conf/jm/create/post-op", + f"{ogcapi_proc_part4}/per/job-management/additional-status-codes", # see 'weaver.status.map_status' + f"{ogcapi_proc_part4}/per/job-management/create-body", # Weaver has XML for WPS + f"{ogcapi_proc_part4}/per/job-management/create-content-schema", + f"{ogcapi_proc_part4}/per/job-management/update-body", + f"{ogcapi_proc_part4}/per/job-management/update-content-schema", + # FIXME: support part 3: Nested Workflow Execution request (https://github.com/crim-ca/weaver/issues/412) + # f"{ogcapi_proc_part4}/rec/job-management/create-body-ogcapi-processes", + # FIXME: support openEO processes (https://github.com/crim-ca/weaver/issues/564) + # f"{ogcapi_proc_part4}/rec/job-management/create-body-openeo", + f"{ogcapi_proc_part4}/req/job-management/create/post-op", + f"{ogcapi_proc_part4}/req/job-management/update/response-locked", # FIXME: employ 'weaver.wps_restapi.quotation.utils.check_quotation_supported' to add below conditionally # FIXME: https://github.com/crim-ca/weaver/issues/156 (billing/quotation) # https://github.com/opengeospatial/ogcapi-processes/tree/master/extensions/billing diff --git a/weaver/wps_restapi/examples/job_status_created.json b/weaver/wps_restapi/examples/job_status_created.json new file mode 100644 index 000000000..bec621212 --- /dev/null +++ b/weaver/wps_restapi/examples/job_status_created.json @@ -0,0 +1,7 @@ +{ + "description": "Job successfully submitted for creation. Waiting on trigger request to being execution.", + "jobID": "797c0c5e-9bc2-4bf3-ab73-5f3df32044a8", + "processID": "Echo", + "status": "created", + "location": "http://schema-example.com/processes/Echo/jobs/797c0c5e-9bc2-4bf3-ab73-5f3df32044a8" +} diff --git a/weaver/wps_restapi/jobs/jobs.py b/weaver/wps_restapi/jobs/jobs.py index 749a338b1..9193903f4 100644 --- a/weaver/wps_restapi/jobs/jobs.py +++ b/weaver/wps_restapi/jobs/jobs.py @@ -3,18 +3,33 @@ from box import Box from celery.utils.log import get_task_logger from colander import Invalid -from pyramid.httpexceptions import HTTPBadRequest, HTTPOk, HTTPPermanentRedirect, HTTPUnprocessableEntity +from pyramid.httpexceptions import ( + HTTPBadRequest, + HTTPOk, + HTTPPermanentRedirect, + HTTPUnprocessableEntity, + HTTPUnsupportedMediaType +) +from weaver import xml_util from weaver.database import get_db from weaver.datatype import Job -from weaver.exceptions import JobNotFound, JobStatisticsNotFound, log_unhandled_exceptions -from weaver.formats import ContentType, OutputFormat, add_content_type_charset, guess_target_format, repr_json +from weaver.exceptions import JobNotFound, JobStatisticsNotFound, ProcessNotFound, log_unhandled_exceptions +from weaver.formats import ( + ContentType, + OutputFormat, + add_content_type_charset, + clean_media_type_format, + guess_target_format, + repr_json +) from weaver.processes.convert import convert_input_values_schema, convert_output_params_schema +from weaver.processes.execution import submit_job, submit_job_dispatch_wps, submit_job_handler from weaver.processes.utils import get_process from weaver.processes.wps_package import mask_process_inputs from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory from weaver.store.base import StoreJobs -from weaver.utils import get_settings +from weaver.utils import get_header, get_settings from weaver.wps_restapi import swagger_definitions as sd from weaver.wps_restapi.jobs.utils import ( dismiss_job_task, @@ -23,10 +38,12 @@ get_job_results_response, get_results, get_schema_query, - raise_job_bad_status, + raise_job_bad_status_locked, + raise_job_bad_status_success, raise_job_dismissed, validate_service_process ) +from weaver.wps_restapi.providers.utils import get_service from weaver.wps_restapi.swagger_definitions import datetime_interval_parser if TYPE_CHECKING: @@ -179,6 +196,96 @@ def _job_list(_jobs): # type: (Iterable[Job]) -> List[JSON] return Box(body) +@sd.jobs_service.post( + tags=[sd.TAG_EXECUTE, sd.TAG_JOBS], + content_type=list(ContentType.ANY_XML), + schema=sd.PostJobsEndpointXML(), + accept=ContentType.APP_JSON, + renderer=OutputFormat.JSON, + response_schemas=sd.post_jobs_responses, +) +@sd.jobs_service.post( + tags=[sd.TAG_EXECUTE, sd.TAG_JOBS, sd.TAG_PROCESSES], + content_type=ContentType.APP_JSON, + schema=sd.PostJobsEndpointJSON(), + accept=ContentType.APP_JSON, + renderer=OutputFormat.JSON, + response_schemas=sd.post_jobs_responses, +) +def create_job(request): + # type: (PyramidRequest) -> AnyViewResponse + """ + Create a new processing job with advanced management and execution capabilities. + """ + proc_id = None + prov_id = None + try: + ctype = get_header("Content-Type", request.headers, default=ContentType.APP_JSON) + ctype = clean_media_type_format(ctype, strip_parameters=True) + if ctype == ContentType.APP_JSON and "process" in request.json_body: + proc_url = request.json_body["process"] + proc_url = sd.ProcessURL().deserialize(proc_url) + prov_url, proc_id = proc_url.rsplit("/processes/", 1) + prov_parts = prov_url.rsplit("/providers/", 1) + prov_id = prov_parts[-1] if len(prov_parts) > 1 else None + elif ctype in ContentType.ANY_XML: + body_xml = xml_util.fromstring(request.text) + proc_id = body_xml.xpath("ows:Identifier", namespaces=body_xml.getroot().nsmap)[0].text + except Exception as exc: + raise ProcessNotFound(json={ + "title": "NoSuchProcess", + "type": "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/no-such-process", + "detail": "Process URL or identifier reference missing or invalid.", + "status": ProcessNotFound.code, + }) from exc + if not proc_id: + raise HTTPUnsupportedMediaType(json={ + "title": "Unsupported Media Type", + "type": "http://www.opengis.net/def/exceptions/ogcapi-processes-2/1.0/unsupported-media-type", + "detail": "Process URL or identifier reference missing or invalid.", + "status": HTTPUnsupportedMediaType.code, + "cause": {"headers": {"Content-Type": ctype}}, + }) + + if ctype in ContentType.ANY_XML: + process = get_process(process_id=proc_id) + return submit_job_dispatch_wps(request, process) + + if prov_id: + ref = get_service(request, provider_id=prov_id) + else: + ref = get_process(process_id=proc_id) + proc_id = None # ensure ref is used, process ID needed only for provider + return submit_job(request, ref, process_id=proc_id, tags=["wps-rest", "ogc-api"]) + + +@sd.process_results_service.post( + tags=[sd.TAG_JOBS, sd.TAG_EXECUTE, sd.TAG_RESULTS, sd.TAG_PROCESSES], + schema=sd.ProcessJobResultsTriggerExecutionEndpoint(), + accept=ContentType.APP_JSON, + renderer=OutputFormat.JSON, + response_schemas=sd.post_job_results_responses, +) +@sd.job_results_service.post( + tags=[sd.TAG_JOBS, sd.TAG_EXECUTE, sd.TAG_RESULTS], + schema=sd.JobResultsTriggerExecutionEndpoint(), + accept=ContentType.APP_JSON, + renderer=OutputFormat.JSON, + response_schemas=sd.post_job_results_responses, +) +def trigger_job_execution(request): + # type: (PyramidRequest) -> AnyResponseType + """ + Trigger the execution of a previously created job. + """ + job = get_job(request) + raise_job_dismissed(job, request) + raise_job_bad_status_locked(job, request) + # FIXME: reuse job, adjust function or map parameters from attributes + # FIXME: alt 202 code for accepted on async when triggered this way + return submit_job_handler(request, job) + + @sd.provider_job_service.get( tags=[sd.TAG_JOBS, sd.TAG_STATUS, sd.TAG_PROVIDERS], schema=sd.ProviderJobEndpoint(), @@ -211,6 +318,39 @@ def get_job_status(request): return HTTPOk(json=job_status) +@sd.provider_job_service.patch( + tags=[sd.TAG_JOBS, sd.TAG_PROVIDERS], + schema=sd.PatchProviderJobEndpoint(), + accept=ContentType.APP_JSON, + renderer=OutputFormat.JSON, + response_schemas=sd.patch_provider_job_responses, +) +@sd.process_job_service.patch( + tags=[sd.TAG_JOBS, sd.TAG_PROCESSES], + schema=sd.PatchProcessJobEndpoint(), + accept=ContentType.APP_JSON, + renderer=OutputFormat.JSON, + response_schemas=sd.patch_process_job_responses, +) +@sd.job_service.patch( + tags=[sd.TAG_JOBS], + schema=sd.PatchJobEndpoint(), + accept=ContentType.APP_JSON, + renderer=OutputFormat.JSON, + response_schemas=sd.patch_job_responses, +) +def update_job(request): + # type: (PyramidRequest) -> AnyResponseType + """ + Update a previously created job still pending execution. + """ + job = get_job(request) + raise_job_dismissed(job, request) + raise_job_bad_status_locked(job, request) + + raise NotImplementedError # FIXME + + @sd.provider_job_service.delete( tags=[sd.TAG_JOBS, sd.TAG_DISMISS, sd.TAG_PROVIDERS], schema=sd.ProviderJobEndpoint(), @@ -381,7 +521,7 @@ def get_job_outputs(request): """ job = get_job(request) raise_job_dismissed(job, request) - raise_job_bad_status(job, request) + raise_job_bad_status_success(job, request) schema = get_schema_query(request.params.get("schema")) results, _ = get_results(job, request, schema=schema, link_references=False) outputs = {"outputs": results} diff --git a/weaver/wps_restapi/jobs/utils.py b/weaver/wps_restapi/jobs/utils.py index 2f9c4e321..16410746e 100644 --- a/weaver/wps_restapi/jobs/utils.py +++ b/weaver/wps_restapi/jobs/utils.py @@ -11,6 +11,7 @@ HTTPCreated, HTTPForbidden, HTTPInternalServerError, + HTTPLocked, HTTPNoContent, HTTPNotFound, HTTPNotImplemented, @@ -48,7 +49,8 @@ get_secure_path, get_settings, get_weaver_url, - is_uuid + is_uuid, + make_link_header ) from weaver.visibility import Visibility from weaver.wps.utils import get_wps_output_dir, get_wps_output_url, map_wps_output_location @@ -461,7 +463,7 @@ def get_job_results_response(job, container, headers=None): :param headers: Additional headers to provide in the response. """ raise_job_dismissed(job, container) - raise_job_bad_status(job, container) + raise_job_bad_status_success(job, container) # when 'response=document', ignore 'transmissionMode=value|reference', respect it when 'response=raw' # See: @@ -580,8 +582,18 @@ def get_job_submission_response(body, headers, error=False): body["description"] = http_desc return http_class(json=body, headerlist=headers) - body["description"] = sd.CreatedLaunchJobResponse.description + if status == Status.CREATED: + body["description"] = ( + "Job successfully submitted for creation. " + "Waiting on trigger request to being execution." + ) + else: + body["description"] = ( + "Job successfully submitted to processing queue. " + "Execution should begin when resources are available." + ) body = sd.CreatedJobStatusSchema().deserialize(body) + headers.setdefault("Location", body["location"]) return HTTPCreated(json=body, headerlist=headers) @@ -663,13 +675,42 @@ def validate_service_process(request): return service_name, process_name -def raise_job_bad_status(job, container=None): +def raise_job_bad_status_locked(job, container=None): + # type: (Job, Optional[AnySettingsContainer]) -> None + """ + Raise the appropriate message for :term:`Job` unable to be modified. + """ + if job.status != Status.CREATED: + links = job.links(container=container) + headers = [("Link", make_link_header(link)) for link in links] + job_reason = "" + if job.status in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]: + job_reason = " It has already finished execution." + elif job.status in JOB_STATUS_CATEGORIES[StatusCategory.PENDING]: + job_reason = " It is already queued for execution." + elif job.status in JOB_STATUS_CATEGORIES[StatusCategory.RUNNING]: + job_reason = " It is already executing." + raise HTTPLocked( + headers=headers, + json={ + "title": "Job Locked for Execution", + "type": "http://www.opengis.net/def/exceptions/ogcapi-processes-4/1.0/locked", + "detail": f"Job cannot be modified.{job_reason}", + "status": HTTPLocked.code, + "cause": {"status": job.status}, + "links": links + } + ) + + +def raise_job_bad_status_success(job, container=None): # type: (Job, Optional[AnySettingsContainer]) -> None """ Raise the appropriate message for :term:`Job` not ready or unable to retrieve output results due to status. """ if job.status != Status.SUCCEEDED: links = job.links(container=container) + headers = [("Link", make_link_header(link)) for link in links] if job.status == Status.FAILED: err_code = None err_info = None @@ -697,26 +738,32 @@ def raise_job_bad_status(job, container=None): err_code = OWSNoApplicableCode.code err_info = "unknown" # /req/core/job-results-failed - raise HTTPBadRequest(json={ - "title": "JobResultsFailed", - "type": err_code, - "detail": "Job results not available because execution failed.", - "status": HTTPBadRequest.code, - "cause": err_info, - "links": links - }) + raise HTTPBadRequest( + headers=headers, + json={ + "title": "JobResultsFailed", + "type": err_code, + "detail": "Job results not available because execution failed.", + "status": HTTPBadRequest.code, + "cause": err_info, + "links": links + } + ) # /req/core/job-results-exception/results-not-ready # must use OWS instead of HTTP class to preserve provided JSON body # otherwise, pyramid considers it as not found view/path and rewrites contents in append slash handler - raise OWSNotFound(json={ - "title": "JobResultsNotReady", - "type": "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/result-not-ready", - "detail": "Job is not ready to obtain results.", - "status": HTTPNotFound.code, - "cause": {"status": job.status}, - "links": links - }) + raise OWSNotFound( + headers=headers, + json={ + "title": "JobResultsNotReady", + "type": "http://www.opengis.net/def/exceptions/ogcapi-processes-1/1.0/result-not-ready", + "detail": "Job is not ready to obtain results.", + "status": HTTPNotFound.code, + "cause": {"status": job.status}, + "links": links + } + ) def raise_job_dismissed(job, container=None): @@ -729,7 +776,9 @@ def raise_job_dismissed(job, container=None): settings = get_settings(container) job_links = job.links(settings) job_links = [link for link in job_links if link["rel"] in ["status", "alternate", "collection", "up"]] + headers = [("Link", make_link_header(link)) for link in job_links] raise JobGone( + headers=headers, json={ "title": "JobDismissed", "type": "JobDismissed", diff --git a/weaver/wps_restapi/processes/processes.py b/weaver/wps_restapi/processes/processes.py index 05248e626..7e1750205 100644 --- a/weaver/wps_restapi/processes/processes.py +++ b/weaver/wps_restapi/processes/processes.py @@ -14,7 +14,6 @@ ) from pyramid.response import Response from pyramid.settings import asbool -from werkzeug.wrappers.request import Request as WerkzeugRequest from weaver.database import get_db from weaver.exceptions import ProcessNotFound, ServiceException, log_unhandled_exceptions @@ -28,21 +27,12 @@ ) from weaver.processes import opensearch from weaver.processes.constants import ProcessSchema -from weaver.processes.execution import submit_job +from weaver.processes.execution import submit_job, submit_job_dispatch_wps from weaver.processes.utils import deploy_process_from_payload, get_process, update_process_metadata from weaver.status import Status from weaver.store.base import StoreJobs, StoreProcesses -from weaver.utils import ( - clean_json_text_body, - extend_instance, - fully_qualified_name, - get_any_id, - get_header, - get_path_kvp -) +from weaver.utils import clean_json_text_body, fully_qualified_name, get_any_id, get_header from weaver.visibility import Visibility -from weaver.wps.service import get_pywps_service -from weaver.wps.utils import get_wps_path from weaver.wps_restapi import swagger_definitions as sd from weaver.wps_restapi.processes.utils import get_process_list_links, get_processes_filtered_by_valid_schemas from weaver.wps_restapi.providers.utils import get_provider_services @@ -489,20 +479,11 @@ def submit_local_job(request): Execution location and method is according to deployed Application Package. """ process = get_process(request=request) - ctype = clean_media_type_format(get_header("content-type", request.headers, default=None), strip_parameters=True) + ctype = get_header("Content-Type", request.headers, default=None) + ctype = clean_media_type_format(ctype, strip_parameters=True) if ctype in ContentType.ANY_XML: - # Send the XML request to the WPS endpoint which knows how to parse it properly. - # Execution will end up in the same 'submit_job_handler' function as other branch for JSON. - service = get_pywps_service() - wps_params = {"version": "1.0.0", "request": "Execute", "service": "WPS", "identifier": process.id} - request.path_info = get_wps_path(request) - request.query_string = get_path_kvp("", **wps_params)[1:] - location = request.application_url + request.path_info + request.query_string - LOGGER.warning("Route redirection [%s] -> [%s] for WPS-XML support.", request.url, location) - http_request = extend_instance(request, WerkzeugRequest) - http_request.shallow = False - return service.call(http_request) - return submit_job(request, process, tags=["wps-rest"]) + return submit_job_dispatch_wps(request, process) + return submit_job(request, process, tags=["wps-rest", "ogc-api"]) def includeme(config): diff --git a/weaver/wps_restapi/providers/providers.py b/weaver/wps_restapi/providers/providers.py index 9aa752ead..57cbacdb0 100644 --- a/weaver/wps_restapi/providers/providers.py +++ b/weaver/wps_restapi/providers/providers.py @@ -17,8 +17,9 @@ from weaver.exceptions import ServiceNotFound, ServiceParsingError, log_unhandled_exceptions from weaver.formats import ContentType, OutputFormat from weaver.owsexceptions import OWSMissingParameterValue, OWSNotImplemented +from weaver.processes.execution import submit_job from weaver.store.base import StoreServices -from weaver.utils import get_any_id, get_settings +from weaver.utils import get_any_id from weaver.wps.utils import get_wps_client from weaver.wps_restapi import swagger_definitions as sd from weaver.wps_restapi.processes.utils import get_process_list_links @@ -141,7 +142,8 @@ def remove_provider(request): """ Remove an existing service provider. """ - service, store = get_service(request) + store = get_db(request).get_store(StoreServices) + service = get_service(request) try: store.delete_service(service.name) @@ -165,7 +167,7 @@ def get_provider(request): """ Get a provider definition (GetCapabilities). """ - service, _ = get_service(request) + service = get_service(request) data = get_schema_ref(sd.ProviderSummarySchema, request, ref_name=False) info = service.summary(request) data.update(info) @@ -208,14 +210,12 @@ def describe_provider_process(request): Note: this processes won't be stored to the local process storage. """ - provider_id = request.matchdict.get("provider_id") - process_id = request.matchdict.get("process_id") - store = get_db(request).get_store(StoreServices) - service = store.fetch_by_name(provider_id) + service = get_service(request) # FIXME: support other providers (https://github.com/crim-ca/weaver/issues/130) wps = get_wps_client(service.url, request) - process = wps.describeprocess(process_id) - return Process.convert(process, service, get_settings(request)) + proc_id = request.matchdict.get("process_id") + process = wps.describeprocess(proc_id) + return Process.convert(process, service, container=request) @sd.provider_process_service.get( @@ -278,11 +278,7 @@ def submit_provider_job(request): """ Execute a remote provider process. """ - from weaver.processes.execution import submit_job # isort:skip # noqa: E402 # pylint: disable=C0413 - - store = get_db(request).get_store(StoreServices) - provider_id = request.matchdict.get("provider_id") - service = store.fetch_by_name(provider_id) + service = get_service(request) return submit_job(request, service, tags=["wps-rest"]) diff --git a/weaver/wps_restapi/providers/utils.py b/weaver/wps_restapi/providers/utils.py index 41e7f6d82..feee53a02 100644 --- a/weaver/wps_restapi/providers/utils.py +++ b/weaver/wps_restapi/providers/utils.py @@ -11,7 +11,7 @@ from weaver.utils import get_settings if TYPE_CHECKING: - from typing import Any, Callable, List, Tuple + from typing import Any, Callable, List, Optional from weaver.datatype import Service from weaver.typedefs import AnyRequestType, AnySettingsContainer @@ -68,15 +68,15 @@ def forbid_local(container): return forbid_local -def get_service(request): - # type: (AnyRequestType) -> Tuple[Service, StoreServices] +def get_service(request, provider_id=None): + # type: (AnyRequestType, Optional[str]) -> Service """ Get the request service using provider_id from the service store. """ store = get_db(request).get_store(StoreServices) - provider_id = request.matchdict.get("provider_id") + prov_id = provider_id or request.matchdict.get("provider_id") try: - service = store.fetch_by_name(provider_id) + service = store.fetch_by_name(prov_id) except ServiceNotFound: - raise HTTPNotFound(f"Provider {provider_id} cannot be found.") - return service, store + raise HTTPNotFound(f"Provider {prov_id} cannot be found.") + return service diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index 1d7fa8fb3..a9f2a2fe9 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -2143,6 +2143,12 @@ class JobStatusEnum(ExtendedSchemaNode): validator = OneOf(JOB_STATUS_CODE_API) +class JobStatusCreate(ExtendedSchemaNode): + schema_type = String + title = "JobStatus" + validator = OneOf(["create"]) + + class JobStatusSearchEnum(ExtendedSchemaNode): schema_type = String title = "JobStatusSearch" @@ -2550,9 +2556,13 @@ class OWSIdentifier(ExtendedSchemaNode, OWSNamespace): name = "Identifier" -class OWSIdentifierList(ExtendedSequenceSchema, OWSNamespace): +class OWSProcessIdentifier(ProcessIdentifier, OWSNamespace): + pass + + +class OWSProcessIdentifierList(ExtendedSequenceSchema, OWSNamespace): name = "Identifiers" - item = OWSIdentifier() + item = OWSProcessIdentifier() class OWSTitle(ExtendedSchemaNode, OWSNamespace): @@ -2585,7 +2595,7 @@ class WPSDescribeProcessPost(WPSOperationPost, WPSNamespace): _schema = f"{OGC_WPS_1_SCHEMAS}/wpsDescribeProcess_request.xsd" name = "DescribeProcess" title = "DescribeProcess" - identifier = OWSIdentifierList( + identifier = OWSProcessIdentifierList( description="Single or comma-separated list of process identifier to describe.", example="example" ) @@ -2602,7 +2612,7 @@ class WPSExecutePost(WPSOperationPost, WPSNamespace): _schema = f"{OGC_WPS_1_SCHEMAS}/wpsExecute_request.xsd" name = "Execute" title = "Execute" - identifier = OWSIdentifier(description="Identifier of the process to execute with data inputs.") + identifier = OWSProcessIdentifier(description="Identifier of the process to execute with data inputs.") dataInputs = WPSExecuteDataInputs(description="Data inputs to be provided for process execution.") @@ -2776,7 +2786,7 @@ class ProcessVersion(ExtendedSchemaNode, WPSNamespace): class OWSProcessSummary(ExtendedMappingSchema, WPSNamespace): version = ProcessVersion(name="processVersion", default="None", example="1.2", description="Version of the corresponding process summary.") - identifier = OWSIdentifier(example="example", description="Identifier to refer to the process.") + identifier = OWSProcessIdentifier(example="example", description="Identifier to refer to the process.") _title = OWSTitle(example="Example Process", description="Title of the process.") abstract = OWSAbstract(example="Process for example schema.", description="Detail about the process.") @@ -3014,7 +3024,7 @@ class WPSStatus(ExtendedMappingSchema, WPSNamespace): class WPSProcessSummary(ExtendedMappingSchema, WPSNamespace): name = "Process" title = "Process" - identifier = OWSIdentifier() + identifier = OWSProcessIdentifier() _title = OWSTitle() abstract = OWSAbstract(missing=drop) @@ -3328,10 +3338,19 @@ class ProviderResultsEndpoint(ProviderProcessPath, JobPath): header = RequestHeaders() -class JobResultsEndpoint(ProviderProcessPath, JobPath): +class JobResultsEndpoint(JobPath): header = RequestHeaders() +class JobResultsTriggerExecutionEndpoint(JobResultsEndpoint): + header = RequestHeaders() + body = NoContent() + + +class ProcessJobResultsTriggerExecutionEndpoint(JobResultsTriggerExecutionEndpoint, LocalProcessPath): + pass + + class ProviderExceptionsEndpoint(ProviderProcessPath, JobPath): header = RequestHeaders() @@ -4162,6 +4181,14 @@ class Execute(ExecuteInputOutputs): "value": EXAMPLES["job_execute.json"], }, } + status = JobStatusCreate( + description=( + "Status to request creation of the job without submitting it to processing queue " + "and leave it pending until triggered by another results request to start it " + "(see *OGC API - Processes* - Part 4: Job Management)." + ), + missing=drop, + ) mode = JobExecuteModeEnum( missing=drop, default=ExecuteMode.AUTO, @@ -6444,13 +6471,17 @@ class ExecuteHeadersXML(ExecuteHeadersBase): ) -class PostProcessJobsEndpointJSON(LocalProcessPath): +class PostJobsEndpointJSON(ExtendedMappingSchema): header = ExecuteHeadersJSON() querystring = LocalProcessQuery() body = Execute() -class PostProcessJobsEndpointXML(LocalProcessPath): +class PostProcessJobsEndpointJSON(PostJobsEndpointJSON, LocalProcessPath): + pass + + +class PostJobsEndpointXML(ExtendedMappingSchema): header = ExecuteHeadersXML() querystring = LocalProcessQuery() body = WPSExecutePost( @@ -6467,6 +6498,10 @@ class PostProcessJobsEndpointXML(LocalProcessPath): ) +class PostProcessJobsEndpointXML(PostJobsEndpointXML, LocalProcessPath): + pass + + class PagingQueries(ExtendedMappingSchema): page = ExtendedSchemaNode(Integer(allow_string=True), missing=0, default=0, validator=Range(min=0)) limit = ExtendedSchemaNode(Integer(allow_string=True), missing=10, default=10, validator=Range(min=1, max=1000), @@ -6734,7 +6769,7 @@ def __new__(cls, *, name, description, **kwargs): # pylint: disable=W0221 "New schema name must be provided to avoid invalid mixed use of $ref pointers. " f"Name '{name}' is invalid." ) - obj = super().__new__(cls) + obj = super().__new__(cls) # type: ExtendedSchemaNode obj.__init__(name=name, description=description) obj.__class__.__name__ = name obj.children = [ @@ -7129,11 +7164,18 @@ class CreatedJobLocationHeader(ResponseHeaders): class CreatedLaunchJobResponse(ExtendedMappingSchema): - description = "Job successfully submitted to processing queue. Execution should begin when resources are available." + description = ( + "Job successfully submitted. " + "Execution should begin when resources are available or when triggered, according to requested execution mode." + ) examples = { "JobAccepted": { - "summary": "Job accepted for execution.", + "summary": "Job accepted for execution asynchronously.", "value": EXAMPLES["job_status_accepted.json"] + }, + "JobCreated": { + "summary": "Job created for later execution by trigger.", + "value": EXAMPLES["job_status_created.json"] } } header = CreatedJobLocationHeader() @@ -7257,6 +7299,7 @@ class NoContentJobResultsHeaders(NoContent): class NoContentJobResultsResponse(ExtendedMappingSchema): + description = "Job completed execution synchronously with results returned in Link headers." header = NoContentJobResultsHeaders() body = NoContent(default="") @@ -7695,9 +7738,9 @@ class GoneVaultFileDownloadResponse(ExtendedMappingSchema): "501": NotImplementedPostProviderResponse(), } post_provider_process_job_responses = { - "200": CompletedJobResponse(description="success"), - "201": CreatedLaunchJobResponse(description="success"), - "204": NoContentJobResultsResponse(description="success"), + "200": CompletedJobResponse(), + "201": CreatedLaunchJobResponse(), + "204": NoContentJobResultsResponse(), "400": InvalidJobParametersResponse(), "403": ForbiddenProviderAccessResponseSchema(), "405": MethodNotAllowedErrorResponseSchema(), @@ -7705,15 +7748,21 @@ class GoneVaultFileDownloadResponse(ExtendedMappingSchema): "500": InternalServerErrorResponseSchema(), } post_process_jobs_responses = { - "200": CompletedJobResponse(description="success"), - "201": CreatedLaunchJobResponse(description="success"), - "204": NoContentJobResultsResponse(description="success"), + "200": CompletedJobResponse(), + "201": CreatedLaunchJobResponse(), + "204": NoContentJobResultsResponse(), "400": InvalidJobParametersResponse(), "403": ForbiddenProviderAccessResponseSchema(), "405": MethodNotAllowedErrorResponseSchema(), "406": NotAcceptableErrorResponseSchema(), "500": InternalServerErrorResponseSchema(), } +post_jobs_responses = copy(post_process_jobs_responses) +post_job_results_responses = copy(post_process_jobs_responses) +post_job_results_responses.pop("201") # job already created, therefore invalid +post_job_results_responses.update({ + "202": CreatedLaunchJobResponse(), # alternate to '201' for async case since job already exists +}) get_all_jobs_responses = { "200": OkGetQueriedJobsResponse(description="success", examples={ "JobListing": { diff --git a/weaver/xml_util.py b/weaver/xml_util.py index b0605a281..8e19991f9 100644 --- a/weaver/xml_util.py +++ b/weaver/xml_util.py @@ -53,7 +53,7 @@ def fromstring(text, parser=XML_PARSER): - # type: (AnyStr, lxml_etree.XMLParser) -> XML + # type: (AnyStr, lxml_etree.XMLParser) -> XMLTree from weaver.utils import str2bytes return _lxml_fromstring(str2bytes(text), parser=parser) # nosec: B410