Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix limit integration (fixes #237) #256

Merged
merged 26 commits into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ Changes:
- Add reference link to ReadTheDocs URL of `Weaver` in API landing page.
- Add references to `OGC-API Processes` requirements and recommendations for eventual conformance listing
(relates to `#231 <https://github.com/crim-ca/weaver/issues/231>`_).
- Add datetime parameters for job searches queries
(relates to `#236 <https://github.com/crim-ca/weaver/issues/236>`_).
- Add limit validation and integration for jobs in retrieve queries
(relates to `#237 <https://github.com/crim-ca/weaver/issues/237>`_).
trapsidanadir marked this conversation as resolved.
Show resolved Hide resolved

Fixes:
------
Expand Down
199 changes: 189 additions & 10 deletions tests/wps_restapi/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import unittest
import warnings
from collections import OrderedDict
from datetime import date
from distutils.version import LooseVersion
from typing import TYPE_CHECKING

import mock
import pyramid.testing
import pytest
from dateutil import parser as dateparser

from tests.utils import (
get_module_version,
Expand All @@ -35,6 +37,11 @@
from weaver.visibility import VISIBILITY_PRIVATE, VISIBILITY_PUBLIC
from weaver.warning import TimeZoneInfoAlreadySetWarning
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.swagger_definitions import (
DATETIME_INTERVAL_CLOSED_SYMBOL,
DATETIME_INTERVAL_OPEN_END_SYMBOL,
DATETIME_INTERVAL_OPEN_START_SYMBOL
)

if TYPE_CHECKING:
from typing import Iterable, List, Tuple, Union
Expand All @@ -51,6 +58,7 @@ def setUpClass(cls):
cls.config = setup_config_with_mongodb(settings=settings)
cls.app = get_test_weaver_app(config=cls.config)
cls.json_headers = {"Accept": CONTENT_TYPE_APP_JSON, "Content-Type": CONTENT_TYPE_APP_JSON}
cls.datetime_interval = cls.generate_test_datetimes()

@classmethod
def tearDownClass(cls):
Expand Down Expand Up @@ -94,22 +102,25 @@ def setUp(self):
self.make_job(task_id="4444-4444-4444-4444", process=self.process_public.identifier, service=None,
user_id=self.user_admin_id, status=STATUS_FAILED, progress=55, access=VISIBILITY_PRIVATE)
# job public/private service/process combinations
self.make_job(task_id="5555-5555-5555-5555",
process=self.process_public.identifier, service=self.service_public.name,
self.make_job(task_id="5555-5555-5555-5555", process=self.process_public.identifier,
service=self.service_public.name, created=self.datetime_interval[0],
user_id=self.user_editor1_id, status=STATUS_FAILED, progress=99, access=VISIBILITY_PUBLIC)
self.make_job(task_id="6666-6666-6666-6666",
process=self.process_private.identifier, service=self.service_public.name,
self.make_job(task_id="6666-6666-6666-6666", process=self.process_private.identifier,
service=self.service_public.name, created=self.datetime_interval[1],
user_id=self.user_editor1_id, status=STATUS_FAILED, progress=99, access=VISIBILITY_PUBLIC)
self.make_job(task_id="7777-7777-7777-7777",
process=self.process_public.identifier, service=self.service_private.name,
self.make_job(task_id="7777-7777-7777-7777", process=self.process_public.identifier,
service=self.service_private.name, created=self.datetime_interval[2],
user_id=self.user_editor1_id, status=STATUS_FAILED, progress=99, access=VISIBILITY_PUBLIC)
self.make_job(task_id="8888-8888-8888-8888",
process=self.process_private.identifier, service=self.service_private.name,
self.make_job(task_id="8888-8888-8888-8888", process=self.process_private.identifier,
service=self.service_private.name, created=self.datetime_interval[3],
user_id=self.user_editor1_id, status=STATUS_FAILED, progress=99, access=VISIBILITY_PUBLIC)

def make_job(self, task_id, process, service, user_id, status, progress, access):
def make_job(self, task_id, process, service, user_id, status, progress, access, created=None):

created = dateparser.parse(created) if created else None

job = self.job_store.save_job(task_id=task_id, process=process, service=service, is_workflow=False,
user_id=user_id, execute_async=True, access=access)
user_id=user_id, execute_async=True, access=access, created=created)
job.status = status
if status in JOB_STATUS_CATEGORIES[STATUS_CATEGORY_FINISHED]:
job.mark_finished()
Expand Down Expand Up @@ -144,6 +155,15 @@ def get_job_request_auth_mock(self, user_id):
mock.patch("{}.has_permission".format(authz_policy_class), return_value=is_admin),
])

@staticmethod
def generate_test_datetimes():
# type: () -> List[str]
"""
Generates a list of dummy datetimes for testing.
"""
year = date.today().year + 1
return ["{}-0{}-02T03:32:38.487000+00:00".format(year, month) for month in range(1, 5)]

@staticmethod
def check_job_format(job):
assert isinstance(job, dict)
Expand Down Expand Up @@ -503,3 +523,162 @@ def filter_service(jobs): # type: (Iterable[Job]) -> List[Job]
job_match = all(job in job_ids for job in resp.json["jobs"])
test_values = dict(path=path, access=access, user_id=user_id)
assert job_match, self.message_with_jobs_diffs(resp.json["jobs"], job_ids, test_values, index=i)

def test_jobs_list_with_limit_api(self):
"""
.. seealso::
- `/req/collections/rc-limit-response
<https://github.com/opengeospatial/ogcapi-common/blob/master/collections/requirements/collections/REQ_rc-limit-response.adoc>`_
"""
limit_parameter = 20
path = get_path_kvp(sd.jobs_service.path, limit=limit_parameter)
resp = self.app.get(path, headers=self.json_headers)
assert resp.status_code == 200
assert resp.content_type == CONTENT_TYPE_APP_JSON
assert "limit" in resp.json and isinstance(resp.json["limit"], int)
assert resp.json["limit"] == limit_parameter
assert len(resp.json["jobs"]) <= limit_parameter

def test_jobs_list_with_limit_openapi_schema(self):
"""
.. seealso::
- `/req/collections/rc-limit-response
<https://github.com/opengeospatial/ogcapi-common/blob/master/collections/requirements/collections/REQ_rc-limit-response.adoc>`_
"""
resp = self.app.get(sd.jobs_service.path, headers=self.json_headers)
assert resp.status_code == 200
assert resp.content_type == CONTENT_TYPE_APP_JSON
assert "limit" in resp.json and isinstance(resp.json["limit"], int)
assert len(resp.json["jobs"]) <= resp.json["limit"]

def test_not_required_fields(self):
uri = sd.openapi_json_service.path
resp = self.app.get(uri, headers=self.json_headers)
assert not resp.json["parameters"]["page"]["required"]
assert not resp.json["parameters"]["limit"]["required"]

def test_jobs_datetime_before(self):
"""
.. seealso::
- `/req/collections/rc-time-collections-response
<https://github.com/opengeospatial/ogcapi-common/blob/master/collections/requirements/collections/REQ_rc-time-collections-response.adoc>`_
"""
datetime_before = DATETIME_INTERVAL_OPEN_START_SYMBOL + self.datetime_interval[0]
path = get_path_kvp(sd.jobs_service.path, datetime=datetime_before)
resp = self.app.get(path, headers=self.json_headers)
assert resp.status_code == 200
assert resp.content_type == CONTENT_TYPE_APP_JSON
assert len(resp.json["jobs"]) == 4
for job in resp.json["jobs"]:
base_uri = sd.jobs_service.path + "/{}".format(job)
path = get_path_kvp(base_uri)
resp = self.app.get(path, headers=self.json_headers)
assert resp.status_code == 200
assert resp.content_type == CONTENT_TYPE_APP_JSON
assert dateparser.parse(resp.json["created"]) <= dateparser.parse(
datetime_before.replace(DATETIME_INTERVAL_OPEN_START_SYMBOL, ""))

def test_jobs_datetime_after(self):
"""
.. seealso::
- `/req/collections/rc-time-collections-response
<https://github.com/opengeospatial/ogcapi-common/blob/master/collections/requirements/collections/REQ_rc-time-collections-response.adoc>`_
"""
datetime_after = str(self.datetime_interval[2] + DATETIME_INTERVAL_OPEN_END_SYMBOL)
path = get_path_kvp(sd.jobs_service.path, datetime=datetime_after)
resp = self.app.get(path, headers=self.json_headers)
assert resp.status_code == 200
assert resp.content_type == CONTENT_TYPE_APP_JSON
assert len(resp.json["jobs"]) == 2
for job in resp.json["jobs"]:
base_uri = sd.jobs_service.path + "/{}".format(job)
path = get_path_kvp(base_uri)
resp = self.app.get(path, headers=self.json_headers)
assert resp.status_code == 200
assert resp.content_type == CONTENT_TYPE_APP_JSON
assert dateparser.parse(resp.json["created"]) >= dateparser.parse(
datetime_after.replace(DATETIME_INTERVAL_OPEN_END_SYMBOL, ""))

def test_jobs_datetime_interval(self):
"""
.. seealso::
- `/req/collections/rc-time-collections-response
<https://github.com/opengeospatial/ogcapi-common/blob/master/collections/requirements/collections/REQ_rc-time-collections-response.adoc>`_
"""
datetime_interval = self.datetime_interval[1] + DATETIME_INTERVAL_CLOSED_SYMBOL + self.datetime_interval[3]
path = get_path_kvp(sd.jobs_service.path, datetime=datetime_interval)
resp = self.app.get(path, headers=self.json_headers)
assert resp.status_code == 200
assert resp.content_type == CONTENT_TYPE_APP_JSON

datetime_after, datetime_before = datetime_interval.split(DATETIME_INTERVAL_CLOSED_SYMBOL)
assert len(resp.json["jobs"]) == 3
for job in resp.json["jobs"]:
base_uri = sd.jobs_service.path + "/{}".format(job)
path = get_path_kvp(base_uri)
resp = self.app.get(path, headers=self.json_headers)
assert resp.status_code == 200
assert resp.content_type == CONTENT_TYPE_APP_JSON
assert dateparser.parse(resp.json["created"]) >= dateparser.parse(datetime_after)
assert dateparser.parse(resp.json["created"]) <= dateparser.parse(datetime_before)

def test_jobs_datetime_match(self):
"""
.. seealso::
- `/req/collections/rc-time-collections-response
<https://github.com/opengeospatial/ogcapi-common/blob/master/collections/requirements/collections/REQ_rc-time-collections-response.adoc>`_
"""
datetime_match = self.datetime_interval[1]
path = get_path_kvp(sd.jobs_service.path, datetime=datetime_match)
resp = self.app.get(path, headers=self.json_headers)
assert resp.status_code == 200
assert resp.content_type == CONTENT_TYPE_APP_JSON
assert len(resp.json["jobs"]) == 1
for job in resp.json["jobs"]:
base_uri = sd.jobs_service.path + "/{}".format(job)
path = get_path_kvp(base_uri)
resp = self.app.get(path, headers=self.json_headers)
assert resp.status_code == 200
assert resp.content_type == CONTENT_TYPE_APP_JSON
assert dateparser.parse(resp.json["created"]) == dateparser.parse(datetime_match)
fmigneault marked this conversation as resolved.
Show resolved Hide resolved

def test_jobs_datetime_invalid(self):
"""
.. seealso::
- `/req/collections/rc-time-collections-response
<https://github.com/opengeospatial/ogcapi-common/blob/master/collections/requirements/collections/REQ_rc-time-collections-response.adoc>`_

datetime_invalid is not formated against the rfc3339 datetime format,
for more details refer to https://datatracker.ietf.org/doc/html/rfc3339#section-5.6
"""
datetime_invalid = "2022-31-12 23:59:59"
trapsidanadir marked this conversation as resolved.
Show resolved Hide resolved
path = get_path_kvp(sd.jobs_service.path, datetime=datetime_invalid)
resp = self.app.get(path, headers=self.json_headers, expect_errors=True)
assert resp.status_code == 422

def test_jobs_datetime_interval_invalid(self):
"""
.. seealso::
- `/req/collections/rc-time-collections-response
<https://github.com/opengeospatial/ogcapi-common/blob/master/collections/requirements/collections/REQ_rc-time-collections-response.adoc>`_

datetime_invalid represents a datetime interval where the limit dates are inverted,
the minimun is greather than the maximum datetime limit
"""
datetime_interval = self.datetime_interval[3] + DATETIME_INTERVAL_CLOSED_SYMBOL + self.datetime_interval[1]
trapsidanadir marked this conversation as resolved.
Show resolved Hide resolved
path = get_path_kvp(sd.jobs_service.path, datetime=datetime_interval)
resp = self.app.get(path, headers=self.json_headers, expect_errors=True)
assert resp.status_code == 422

def test_jobs_datetime_before_invalid(self):
"""
.. seealso::
- `/req/collections/rc-time-collections-response
<https://github.com/opengeospatial/ogcapi-common/blob/master/collections/requirements/collections/REQ_rc-time-collections-response.adoc>`_

datetime_before represents a bad open range datetime interval
"""
datetime_before = "./" + self.datetime_interval[3]
path = get_path_kvp(sd.jobs_service.path, datetime=datetime_before)
resp = self.app.get(path, headers=self.json_headers, expect_errors=True)
assert resp.status_code == 422
5 changes: 4 additions & 1 deletion weaver/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
from typing import TYPE_CHECKING

if TYPE_CHECKING:
import datetime
from typing import Any, Dict, List, Optional, Tuple, Union
from pyramid.request import Request
from pywps import Process as ProcessWPS
from weaver.datatype import Bill, Job, Process, Quote, Service
from weaver.typedefs import AnyValue
from weaver.typedefs import AnyValue, DatetimeIntervalType

JobListAndCount = Tuple[List[Job], int]
JobCategory = Dict[str, Union[AnyValue, Job]]
Expand Down Expand Up @@ -111,6 +112,7 @@ def save_job(self,
access=None, # type: Optional[str]
notification_email=None, # type: Optional[str]
accept_language=None, # type: Optional[str]
created=None, # type: Optional[datetime.datetime]
): # type: (...) -> Job
raise NotImplementedError

Expand Down Expand Up @@ -145,6 +147,7 @@ def find_jobs(self,
sort=None, # type: Optional[str]
page=0, # type: int
limit=10, # type: int
datetime=None, # type: Optional[DatetimeIntervalType]
trapsidanadir marked this conversation as resolved.
Show resolved Hide resolved
group_by=None, # type: Optional[Union[str, List[str]]]
request=None, # type: Optional[Request]
): # type: (...) -> Union[JobListAndCount, JobCategoriesAndCount]
Expand Down
23 changes: 21 additions & 2 deletions weaver/store/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@
from weaver.wps.utils import get_wps_url

if TYPE_CHECKING:
import datetime
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from pymongo.collection import Collection

from weaver.store.base import JobCategoriesAndCount, JobListAndCount
from weaver.store.base import DatetimeIntervalType, JobCategoriesAndCount, JobListAndCount
from weaver.typedefs import AnyProcess, AnyProcessType

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -405,6 +406,7 @@ def save_job(self,
access=None, # type: Optional[str]
notification_email=None, # type: Optional[str]
accept_language=None, # type: Optional[str]
created=None, # type: Optional[datetime.datetime]
): # type: (...) -> Job
"""
Creates a new :class:`Job` and stores it in mongodb.
Expand All @@ -422,6 +424,7 @@ def save_job(self,
tags.append(EXECUTE_MODE_SYNC)
if not access:
access = VISIBILITY_PRIVATE

new_job = Job({
"task_id": task_id,
"user_id": user_id,
Expand All @@ -432,7 +435,7 @@ def save_job(self,
"execute_async": execute_async,
"is_workflow": is_workflow,
"is_local": is_local,
"created": now(),
"created": created if created else now(),
"tags": list(set(tags)), # remove duplicates
"access": access,
"notification_email": notification_email,
Expand Down Expand Up @@ -499,6 +502,7 @@ def find_jobs(self,
sort=None, # type: Optional[str]
page=0, # type: int
limit=10, # type: int
datetime=None, # type: Optional[DatetimeIntervalType]
group_by=None, # type: Optional[Union[str, List[str]]]
request=None, # type: Optional[Request]
): # type: (...) -> Union[JobListAndCount, JobCategoriesAndCount]
Expand All @@ -515,6 +519,7 @@ def find_jobs(self,
:param sort: field which is used for sorting results (default: creation date, descending).
:param page: page number to return when using result paging (only when not using ``group_by``).
:param limit: number of jobs per page when using result paging (only when not using ``group_by``).
:param datetime: field used for filtering data by creation date with a given date or interval of date.
:param group_by: one or many fields specifying categories to form matching groups of jobs (paging disabled).

:returns: (list of jobs matching paging OR list of {categories, list of jobs, count}) AND total of matched job
Expand Down Expand Up @@ -582,6 +587,20 @@ def find_jobs(self,
if service is not None:
search_filters["service"] = service

if datetime is not None:
query = {}

if datetime.get("after", False):
query["$gte"] = datetime["after"]

if datetime.get("before", False):
query["$lte"] = datetime["before"]

if datetime.get("match", False):
query = datetime["match"]

search_filters["created"] = query
fmigneault marked this conversation as resolved.
Show resolved Hide resolved

if sort is None:
sort = SORT_CREATED
elif sort == SORT_USER:
Expand Down
5 changes: 5 additions & 0 deletions weaver/typedefs.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,8 @@

# update_status(provider, message, progress, status)
UpdateStatusPartialFunction = Callable[[str, str, int, AnyStatusType], None]

# others
DatetimeIntervalType = TypedDict("DatetimeIntervalType",
{"before": str, "after": str,
"match": str, }, total=False)
Loading