Skip to content

Commit

Permalink
Merge branch 'master' into properties-field-modifiers
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault authored Nov 16, 2024
2 parents 3c62377 + a9bda01 commit af81111
Show file tree
Hide file tree
Showing 12 changed files with 984 additions and 417 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ Changes:
relates to `#412 <https://github.com/crim-ca/weaver/issues/412>`_).
- Add support of *OGC API - Processes - Part 4: Job Management* endpoints for `Job` creation and execution
(fixes `#716 <https://github.com/crim-ca/weaver/issues/716>`_).
- Add `CLI` operations ``update_job``, ``trigger_job`` and ``inputs`` corresponding to the required `Job` operations
defined by *OGC API - Processes - Part 4: Job Management*.
- Add ``headers``, ``mode`` and ``response`` parameters along the ``inputs`` and ``outputs`` returned by
the ``GET /jobs/{jobID}/inputs`` endpoint to better describe the expected resolution strategy of the
multiple `Job` execution options according to submitted request parameters.
Expand Down
115 changes: 105 additions & 10 deletions tests/functional/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import smtplib
import tempfile
import uuid
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast

import mock
import pytest
Expand All @@ -38,6 +38,7 @@
from weaver.base import classproperty
from weaver.cli import AuthHandler, BearerAuthHandler, WeaverClient, main as weaver_cli
from weaver.datatype import DockerAuthentication, Service
from weaver.execute import ExecuteReturnPreference
from weaver.formats import ContentType, OutputFormat, get_cwl_file_format, repr_json
from weaver.notify import decrypt_email
from weaver.processes.constants import CWL_REQUIREMENT_APP_DOCKER, ProcessSchema
Expand All @@ -48,9 +49,11 @@
from weaver.wps.utils import get_wps_output_url, map_wps_output_location

if TYPE_CHECKING:
from typing import Dict, Optional
from typing import Any, Callable, Dict, Optional, Union

from weaver.typedefs import AnyRequestType, AnyResponseType, CWL
from weaver.cli import OperationResult
from weaver.status import AnyStatusType
from weaver.typedefs import AnyRequestType, AnyResponseType, CWL, JSON


class FakeAuthHandler(object):
Expand Down Expand Up @@ -134,6 +137,7 @@ def setup_test_file(self, original_file, substitutions):
return test_file_path

def process_listing_op(self, operation, **op_kwargs):
# type: (Callable[[Any, ...], OperationResult], **Any) -> OperationResult
result = mocked_sub_requests(self.app, operation, only_local=True, **op_kwargs)
assert result.success
assert "processes" in result.body
Expand Down Expand Up @@ -452,9 +456,17 @@ def test_describe(self):
"Dummy process that simply echo's back the input message for testing purposes."
), "CLI should not have overridden the process description field."

def run_execute_inputs_schema_variant(self, inputs_param, process="Echo",
preload=False, location=False, expect_success=True,
mock_exec=True, **exec_kwargs):
def run_execute_inputs_schema_variant(
self,
inputs_param, # type: Union[JSON, str]
process="Echo", # type: str
preload=False, # type: bool
location=False, # type: Optional[str]
expect_success=True, # type: bool
expect_status=None, # type: Optional[AnyStatusType]
mock_exec=True, # type: bool
**exec_kwargs, # type: Any
): # type: (...) -> OperationResult
if isinstance(inputs_param, str):
ref = {"location": inputs_param} if location else {"ref_name": inputs_param}
if preload:
Expand All @@ -469,16 +481,19 @@ def run_execute_inputs_schema_variant(self, inputs_param, process="Echo",
mock_exec_func = None
for mock_exec_proc in mocked_execute_celery(func_execute_task=mock_exec_func):
stack_exec.enter_context(mock_exec_proc)
result = mocked_sub_requests(self.app, self.client.execute, self.test_process[process],
inputs=inputs_param, **exec_kwargs)
result = cast(
"OperationResult",
mocked_sub_requests(self.app, self.client.execute, self.test_process[process],
inputs=inputs_param, **exec_kwargs)
)
if expect_success:
assert result.success, result.message + (result.text if result.text else "")
assert "jobID" in result.body
assert "processID" in result.body
assert "status" in result.body
assert "location" in result.body
assert result.body["processID"] == self.test_process[process]
assert result.body["status"] == Status.ACCEPTED
assert result.body["status"] == expect_status or Status.ACCEPTED
assert result.body["location"] == result.headers["Location"]
assert "undefined" not in result.message
else:
Expand Down Expand Up @@ -722,6 +737,86 @@ def test_execute_inputs_representation_literal_schema_auto_resolve_vault(self):
]:
self.run_execute_inputs_with_vault_file(input_data, "CatFile", preload=False, embed=True)

def test_execute_trigger(self):
result = self.run_execute_inputs_schema_variant(
"Execute_Echo_cwl_schema.yml",
preload=True,
pending=True, # this is the parameter of interest for this test
expect_status=Status.CREATED,
)
assert result.success
assert result.message == (
"Job successfully submitted for creation. "
"Waiting on trigger request to being execution."
)
job_id = result.body["jobID"]

# technically, trigger only need to submit the job to the execution queue
# however, because we do not have an actual celery worker queue configured in tests, mock the execution inline
# the response will be as if we only "accepted" the submission, but the job will be completed for next steps
with contextlib.ExitStack() as stack_exec:
for mock_exec_proc in mocked_execute_celery():
stack_exec.enter_context(mock_exec_proc)
result = mocked_sub_requests(self.app, self.client.trigger_job, job_id)

assert result.success
assert result.code == 202
result = mocked_sub_requests(self.app, self.client.monitor, job_id, timeout=5, interval=1)
assert result.success

result = mocked_sub_requests(self.app, self.client.results, job_id)
assert result.success

output = result.body["output"]["href"]
output = map_wps_output_location(output, self.settings, exists=True)
assert os.path.isfile(output)
with open(output, mode="r", encoding="utf-8") as out_file:
out_data = out_file.read().strip()
assert out_data == "Test message"

def test_update_job(self):
result = self.run_execute_inputs_schema_variant(
"Execute_Echo_cwl_schema.yml",
preload=True,
pending=True, # pre-requirement for updating job is that it must not already be queued/running
expect_status=Status.CREATED,
)
assert result.success
assert result.message == (
"Job successfully submitted for creation. "
"Waiting on trigger request to being execution."
)
job_id = result.body["jobID"]

result = mocked_sub_requests(self.app, self.client.status, job_id)
assert result.success
assert "title" not in result.body

result = mocked_sub_requests(
self.app,
self.client.update_job,
job_id,
title="Random Title",
headers={"Prefer": f"return={ExecuteReturnPreference.REPRESENTATION}"},
inputs={"message": "new message"},
output_filter={"output": {}},
output_context="test",
subscribers={"successUri": "https://example.com"},
)
assert result.success
assert result.code == 204
assert result.body is None

result = mocked_sub_requests(self.app, self.client.status, job_id)
assert result.success
assert result.body["title"] == "Random Title"

result = mocked_sub_requests(self.app, self.client.inputs, job_id)
assert result.success
assert result.body["inputs"] == {"message": "new message"}
assert result.body["outputs"] == {"output": {}}
assert result.body["headers"]["Prefer"] == f"return={ExecuteReturnPreference.REPRESENTATION}; respond-async"

@mocked_dismiss_process()
def test_dismiss(self):
for status in [Status.ACCEPTED, Status.FAILED, Status.RUNNING, Status.SUCCEEDED]:
Expand Down Expand Up @@ -2375,7 +2470,7 @@ def auth_view(request):

def proxy_view(request):
# type: (AnyRequestType) -> AnyResponseType
auth = request.headers.get("Authorization") # should be added by a auth-handler called inline of operation
auth = request.headers.get("Authorization") # should be added by an auth-handler called inline of operation
if not auth:
return HTTPUnauthorized()
token = auth.split(" ")[-1]
Expand Down
2 changes: 1 addition & 1 deletion tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ def read(self, chuck_size=None): # noqa # E811, parameter not used, but must b


def mocked_sub_requests(app, # type: TestApp
method_function, # type: Union[AnyRequestMethod, Callable[[Any], MockReturnType]]
method_function, # type: Union[AnyRequestMethod, Callable[[Any, ...], MockReturnType]]
*args, # type: Any
only_local=False, # type: bool
**kwargs, # type: Any
Expand Down
71 changes: 71 additions & 0 deletions tests/wps_restapi/test_colander_extras.py
Original file line number Diff line number Diff line change
Expand Up @@ -1528,3 +1528,74 @@ def test_none_type_schema():
node = ce.ExtendedSchemaNode(ce.NoneType(), title="test-null")
schema = ce.NoneTypeConverter(None).convert_type(node)
assert schema == {"type": "null", "title": "test-null"}


def test_bind_keyword_schema():
"""
Test binding feature extended to custom keyword schemas.
.. seealso::
- https://docs.pylonsproject.org/projects/colander/en/latest/binding.html
"""
@colander.deferred
def get_title(node, kw): # noqa
return Field(name="title", default="Field!")

@colander.deferred
def get_missing_dynamic(node, kw): # noqa
if kw.get("missing"):
return colander.drop
return colander.required

class Field(ce.ExtendedSchemaNode):
schema_type = ce.ExtendedString

class Map(ce.ExtendedMappingSchema):
name_dont_care = get_title
other = Field()

class OneOf(ce.OneOfKeywordSchema):
_one_of = [Map()]

def after_bind(self, node, kw):
return

class MappingFieldDeferred(ce.ExtendedMappingSchema):
field = OneOf()

class OneOfDeferred(ce.OneOfKeywordSchema):
_one_of = [get_title]

class MappingKeywordDeferred(ce.ExtendedMappingSchema):
field = OneOfDeferred()

class PropertyKeywordDeferred(ce.ExtendedMappingSchema):
name = OneOf(missing=get_missing_dynamic)

schema = MappingFieldDeferred()
result = schema.deserialize({"field": {"other": "normal"}})
assert result == {"field": {"other": "normal"}}

schema.bind(**{})
result = schema.deserialize({"field": {"other": "normal"}})
assert result == {"field": {"title": "Field!", "other": "normal"}}

schema = MappingKeywordDeferred()
with pytest.raises(ce.ConversionTypeError):
# no bind applied not supported if directly under keyword
schema.deserialize({"field": ""})

schema.bind(**{})
result = schema.deserialize({"field": ""})
assert result == {"field": "Field!"}

schema = PropertyKeywordDeferred()
result = schema.deserialize({"name": {"other": "normal"}})
assert result == {"name": {"other": "normal", "title": "Field!"}} # normal behavior

with pytest.raises(colander.Invalid):
schema.deserialize({}) # 'missing' property by default set as required

schema = schema.bind(missing=True)
result = schema.deserialize({}) # allowed because dynamic bind applied missing drop
assert result == {}
Loading

0 comments on commit af81111

Please sign in to comment.