Skip to content

Commit

Permalink
[wip] implement part 4 job management - job creation and results trig…
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Oct 11, 2024
1 parent d4ffd34 commit f816fa0
Show file tree
Hide file tree
Showing 17 changed files with 524 additions and 158 deletions.
3 changes: 2 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ Changes

Changes:
--------
- No change.
- Add support for *OGC API - Processes - Part 4: Job Management* endpoints for `Job` creation and execution
  (fixes `#716 <https://github.com/crim-ca/weaver/issues/716>`_).

Fixes:
------
Expand Down
46 changes: 20 additions & 26 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@
AWS_S3_REGION_SUBSET_WITH_MOCK = {MOCK_AWS_REGION} | AWS_S3_REGION_SUBSET
AWS_S3_REGION_NON_DEFAULT = list(AWS_S3_REGION_SUBSET_WITH_MOCK - {MOCK_AWS_REGION})[0]

KNOWN_STATUSES = set(Status.values()) - {Status.UNKNOWN}

# pylint: disable=R1732,W1514 # not using with open + encoding


Expand Down Expand Up @@ -391,32 +393,24 @@ def test_pass_http_error_raises_other_error_with_multi_pyramid_error():
pass_http_error(ex, [HTTPConflict, HTTPInternalServerError])


def get_status_variations(status_value):
    """
    Build the spelling variations of a status string exercised by the status mapping tests.

    :param status_value: Status name to derive the variations from.
    :returns:
        List with, in order, the lowercase, uppercase and capitalized forms of the status,
        followed by the capitalized form prefixed with ``Process``.
    """
    capitalized = status_value.capitalize()
    variations = [status_value.lower(), status_value.upper(), capitalized]
    variations.append(f"Process{capitalized}")
    return variations


def test_map_status_ogc_compliant():
    """
    Check that every variation of each known status maps into the ``StatusCompliant.OGC`` category.
    """
    compliance = StatusCompliant.OGC
    # the 'unknown' status is excluded from the values being checked
    for known in set(Status.values()).difference({Status.UNKNOWN}):
        for variation in get_status_variations(known):
            mapped = map_status(variation, compliance)
            assert mapped in JOB_STATUS_CATEGORIES[compliance]


def test_map_status_pywps_compliant():
    """
    Check that every variation of each known status maps into the ``StatusCompliant.PYWPS`` category.
    """
    compliance = StatusCompliant.PYWPS
    # the 'unknown' status is excluded from the values being checked
    for known in set(Status.values()).difference({Status.UNKNOWN}):
        for variation in get_status_variations(known):
            mapped = map_status(variation, compliance)
            assert mapped in JOB_STATUS_CATEGORIES[compliance]


def test_map_status_owslib_compliant():
    """
    Check that every variation of each known status maps into the ``StatusCompliant.OWSLIB`` category.
    """
    compliance = StatusCompliant.OWSLIB
    # the 'unknown' status is excluded from the values being checked
    for known in set(Status.values()).difference({Status.UNKNOWN}):
        for variation in get_status_variations(known):
            mapped = map_status(variation, compliance)
            assert mapped in JOB_STATUS_CATEGORIES[compliance]
@pytest.mark.parametrize(
    ["compliance", "status"],
    itertools.product(
        list(StatusCompliant),
        itertools.chain.from_iterable(
            [
                status.lower(),
                status.upper(),
                status.capitalize(),
                f"Process{status.capitalize()}"
            ]
            # KNOWN_STATUSES is a set, so its iteration order is not guaranteed to be the same
            # from one interpreter process to another. Sorting keeps the generated test cases
            # (and their IDs) in a stable order, which matters for reproducible reports and for
            # runners that expect identical collection across worker processes.
            for status in sorted(KNOWN_STATUSES)
        )
    )
)
def test_map_status_compliant(compliance, status):
    # type: (StatusCompliant, str) -> None
    """
    Check that a status variation maps to a value within the category of the requested compliance.

    Each case combines one :class:`StatusCompliant` member with one spelling variation
    (lowercase, uppercase, capitalized, and ``Process``-prefixed capitalized) of a known status.
    """
    assert map_status(status, compliance) in JOB_STATUS_CATEGORIES[compliance]


def test_map_status_back_compatibility_and_special_cases():
Expand Down
76 changes: 56 additions & 20 deletions weaver/processes/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from owslib.wps import BoundingBoxDataInput, ComplexDataInput
from pyramid.httpexceptions import HTTPBadRequest, HTTPNotAcceptable
from pyramid_celery import celery_app as app
from werkzeug.wrappers.request import Request as WerkzeugRequest

from weaver.database import get_db
from weaver.datatype import Process, Service
Expand All @@ -34,10 +35,12 @@
from weaver.utils import (
apply_number_with_unit,
as_int,
extend_instance,
fully_qualified_name,
get_any_id,
get_any_value,
get_header,
get_path_kvp,
get_registry,
get_settings,
now,
Expand All @@ -48,6 +51,7 @@
wait_secs
)
from weaver.visibility import Visibility
from weaver.wps.service import get_pywps_service
from weaver.wps.utils import (
check_wps_status,
get_wps_client,
Expand All @@ -56,6 +60,7 @@
get_wps_output_dir,
get_wps_output_path,
get_wps_output_url,
get_wps_path,
load_pywps_config
)
from weaver.wps_restapi import swagger_definitions as sd
Expand All @@ -79,6 +84,7 @@
AnyProcessRef,
AnyResponseType,
AnyServiceRef,
AnyViewResponse,
AnyValueType,
CeleryResult,
HeaderCookiesType,
Expand Down Expand Up @@ -116,8 +122,6 @@ def execute_process(task, job_id, wps_url, headers=None):
"""
Celery task that executes the WPS process job monitoring as status updates (local and remote).
"""
from weaver.wps.service import get_pywps_service

LOGGER.debug("Job execute process called.")

task_process = get_celery_process()
Expand Down Expand Up @@ -660,8 +664,30 @@ def map_locations(job, settings):
os.symlink(wps_ref, job_ref)


def submit_job(request, reference, tags=None):
# type: (Request, Union[Service, Process], Optional[List[str]]) -> AnyResponseType
def submit_job_dispatch_wps(request, process):
    # type: (Request, Process) -> AnyViewResponse
    """
    Dispatch a :term:`XML` request to the relevant :term:`Process` handler using the :term:`WPS` endpoint.

    Sends the :term:`XML` request to the :term:`WPS` endpoint which knows how to parse it properly.
    Execution will end up in the same :func:`submit_job_handler` function as for :term:`OGC API` :term:`JSON` execution.

    .. warning::
        The function assumes that :term:`XML` was pre-validated as present in the :paramref:`request`.

    .. note::
        The :paramref:`request` is modified in place: its ``path_info`` and ``query_string`` are overwritten
        to target the :term:`WPS` endpoint before it is handed over to the :term:`WPS` service.

    :param request: Incoming request whose contents must be forwarded to the :term:`WPS` endpoint.
    :param process: Process for which the execution is requested. Its ``id`` is used as :term:`WPS` identifier.
    :returns: Result of the :term:`WPS` service call for the redirected request.
    """
    service = get_pywps_service()
    wps_params = {"version": "1.0.0", "request": "Execute", "service": "WPS", "identifier": process.id}
    # Remember the URL before the request is rewritten below. On WebOb-style requests, 'url' is derived
    # from the current path and query string, so reading it after the rewrite would log the redirected
    # location on both sides of the arrow.
    origin_url = request.url
    request.path_info = get_wps_path(request)
    # NOTE(review): the '[1:]' presumably drops the leading '?' generated for an empty path - confirm
    #   against 'get_path_kvp'.
    request.query_string = get_path_kvp("", **wps_params)[1:]
    # the query string is stored without its '?' separator, it must be restored to report a valid URL
    location = f"{request.application_url}{request.path_info}?{request.query_string}"
    LOGGER.warning("Route redirection [%s] -> [%s] for WPS-XML support.", origin_url, location)
    http_request = extend_instance(request, WerkzeugRequest)
    http_request.shallow = False
    return service.call(http_request)


def submit_job(request, reference, tags=None, process_id=None):
# type: (Request, Union[Service, Process], Optional[List[str]], Optional[str]) -> AnyResponseType
"""
Generates the job submission from details retrieved in the request.
Expand All @@ -683,13 +709,13 @@ def submit_job(request, reference, tags=None):
# validate context if needed later on by the job for early failure
context = get_wps_output_context(request)

provider_id = None # None OK if local
process_id = None # None OK if remote, but can be found as well if available from WPS-REST path # noqa
prov_id = None # None OK if local
proc_id = None # None OK if remote, but can be found as well if available from WPS-REST path # noqa
tags = tags or []
lang = request.accept_language.header_value # can only preemptively check if local process
if isinstance(reference, Process):
service_url = reference.processEndpointWPS1
process_id = reference.identifier # explicit 'id:version' process revision if available, otherwise simply 'id'
proc_id = reference.identifier # explicit 'id:version' process revision if available, otherwise simply 'id'
visibility = reference.visibility
is_workflow = reference.type == ProcessType.WORKFLOW
is_local = True
Expand All @@ -702,8 +728,8 @@ def submit_job(request, reference, tags=None):
lang = matched_lang
elif isinstance(reference, Service):
service_url = reference.url
provider_id = reference.id
process_id = resolve_process_tag(request)
prov_id = reference.id
proc_id = process_id or resolve_process_tag(request)
visibility = Visibility.PUBLIC
is_workflow = False
is_local = False
Expand All @@ -716,7 +742,7 @@ def submit_job(request, reference, tags=None):
user = request.authenticated_userid # FIXME: consider other methods to provide the user
headers = dict(request.headers)
settings = get_settings(request)
return submit_job_handler(json_body, settings, service_url, provider_id, process_id, is_workflow, is_local,
return submit_job_handler(json_body, settings, service_url, prov_id, proc_id, is_workflow, is_local,
visibility, language=lang, headers=headers, tags=tags, user=user, context=context)


Expand Down Expand Up @@ -787,28 +813,38 @@ def submit_job_handler(payload, # type: ProcessExecution
exec_resp = json_body.get("response")
subscribers = map_job_subscribers(json_body, settings)

job_pending_created = payload.get("status") == "create"
if job_pending_created:
job_status = Status.CREATED
job_message = "Job created with pending trigger."
else:
job_status = Status.ACCEPTED
job_message = "Job task submitted for execution."

store = db.get_store(StoreJobs) # type: StoreJobs
job = store.save_job(task_id=Status.ACCEPTED, process=process, service=provider_id,
job = store.save_job(task_id=job_status, process=process, service=provider_id,
inputs=json_body.get("inputs"), outputs=json_body.get("outputs"),
is_local=is_local, is_workflow=is_workflow, access=visibility, user_id=user, context=context,
execute_async=is_execute_async, execute_response=exec_resp,
custom_tags=tags, accept_language=language, subscribers=subscribers)
job.save_log(logger=LOGGER, message="Job task submitted for execution.", status=Status.ACCEPTED, progress=0)
job.save_log(logger=LOGGER, message=job_message, status=job_status, progress=0)
job = store.update_job(job)
location_url = job.status_url(settings)
resp_headers = {"Location": location_url}
resp_headers.update(applied)

wps_url = clean_ows_url(service_url)
result = execute_process.delay(job_id=job.id, wps_url=wps_url, headers=headers) # type: CeleryResult
LOGGER.debug("Celery pending task [%s] for job [%s].", result.id, job.id)
if not is_execute_async:
task_result = None # type: Optional[CeleryResult]
if not job_pending_created:
wps_url = clean_ows_url(service_url)
task_result = execute_process.delay(job_id=job.id, wps_url=wps_url, headers=headers)
LOGGER.debug("Celery pending task [%s] for job [%s].", task_result.id, job.id)
if not job_pending_created and not is_execute_async:
LOGGER.debug("Celery task requested as sync if it completes before (wait=%ss)", wait)
try:
result.wait(timeout=wait)
task_result.wait(timeout=wait)
except CeleryTaskTimeoutError:
pass
if result.ready():
if task_result.ready():
job = store.fetch_by_id(job.id)
# when sync is successful, it must return the results direct instead of status info
# see: https://docs.ogc.org/is/18-062r2/18-062r2.html#sc_execute_response
Expand All @@ -833,8 +869,8 @@ def submit_job_handler(payload, # type: ProcessExecution
"jobID": job.id,
"processID": job.process,
"providerID": provider_id, # dropped by validator if not applicable
"status": map_status(Status.ACCEPTED),
"location": location_url
"status": map_status(job_status),
"location": location_url, # for convenience/backward compatibility, but official is Location *header*
}
resp = get_job_submission_response(body, resp_headers)
return resp
Expand Down
3 changes: 0 additions & 3 deletions weaver/processes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,6 @@
UpdateFields = List[Union[str, UpdateFieldListMethod]]


# FIXME:
# https://github.com/crim-ca/weaver/issues/215
# define common Exception classes that won't require this type of conversion
def get_process(process_id=None, request=None, settings=None, store=None, revision=True):
# type: (Optional[str], Optional[PyramidRequest], Optional[SettingsType], Optional[StoreProcesses], bool) -> Process
"""
Expand Down
Loading

0 comments on commit f816fa0

Please sign in to comment.