diff --git a/CHANGES.rst b/CHANGES.rst index 8d95ddc5d..481c34a23 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -13,6 +13,10 @@ Changes: - Add reference link to ReadTheDocs URL of `Weaver` in API landing page. - Add references to `OGC-API Processes` requirements and recommendations for eventual conformance listing (relates to `#231 `_). +- Add ``datetime`` query parameter for job searches queries + (relates to `#236 `_). +- Add ``limit`` query parameter validation and integration for jobs in retrieve queries + (relates to `#237 `_). Fixes: ------ diff --git a/tests/wps_restapi/test_jobs.py b/tests/wps_restapi/test_jobs.py index e7d967da6..86f3e6b66 100644 --- a/tests/wps_restapi/test_jobs.py +++ b/tests/wps_restapi/test_jobs.py @@ -3,12 +3,14 @@ import unittest import warnings from collections import OrderedDict +from datetime import date from distutils.version import LooseVersion from typing import TYPE_CHECKING import mock import pyramid.testing import pytest +from dateutil import parser as dateparser from tests.utils import ( get_module_version, @@ -35,6 +37,11 @@ from weaver.visibility import VISIBILITY_PRIVATE, VISIBILITY_PUBLIC from weaver.warning import TimeZoneInfoAlreadySetWarning from weaver.wps_restapi import swagger_definitions as sd +from weaver.wps_restapi.swagger_definitions import ( + DATETIME_INTERVAL_CLOSED_SYMBOL, + DATETIME_INTERVAL_OPEN_END_SYMBOL, + DATETIME_INTERVAL_OPEN_START_SYMBOL +) if TYPE_CHECKING: from typing import Iterable, List, Tuple, Union @@ -51,6 +58,7 @@ def setUpClass(cls): cls.config = setup_config_with_mongodb(settings=settings) cls.app = get_test_weaver_app(config=cls.config) cls.json_headers = {"Accept": CONTENT_TYPE_APP_JSON, "Content-Type": CONTENT_TYPE_APP_JSON} + cls.datetime_interval = cls.generate_test_datetimes() @classmethod def tearDownClass(cls): @@ -94,22 +102,25 @@ def setUp(self): self.make_job(task_id="4444-4444-4444-4444", process=self.process_public.identifier, service=None, user_id=self.user_admin_id, 
status=STATUS_FAILED, progress=55, access=VISIBILITY_PRIVATE) # job public/private service/process combinations - self.make_job(task_id="5555-5555-5555-5555", - process=self.process_public.identifier, service=self.service_public.name, + self.make_job(task_id="5555-5555-5555-5555", process=self.process_public.identifier, + service=self.service_public.name, created=self.datetime_interval[0], user_id=self.user_editor1_id, status=STATUS_FAILED, progress=99, access=VISIBILITY_PUBLIC) - self.make_job(task_id="6666-6666-6666-6666", - process=self.process_private.identifier, service=self.service_public.name, + self.make_job(task_id="6666-6666-6666-6666", process=self.process_private.identifier, + service=self.service_public.name, created=self.datetime_interval[1], user_id=self.user_editor1_id, status=STATUS_FAILED, progress=99, access=VISIBILITY_PUBLIC) - self.make_job(task_id="7777-7777-7777-7777", - process=self.process_public.identifier, service=self.service_private.name, + self.make_job(task_id="7777-7777-7777-7777", process=self.process_public.identifier, + service=self.service_private.name, created=self.datetime_interval[2], user_id=self.user_editor1_id, status=STATUS_FAILED, progress=99, access=VISIBILITY_PUBLIC) - self.make_job(task_id="8888-8888-8888-8888", - process=self.process_private.identifier, service=self.service_private.name, + self.make_job(task_id="8888-8888-8888-8888", process=self.process_private.identifier, + service=self.service_private.name, created=self.datetime_interval[3], user_id=self.user_editor1_id, status=STATUS_FAILED, progress=99, access=VISIBILITY_PUBLIC) - def make_job(self, task_id, process, service, user_id, status, progress, access): + def make_job(self, task_id, process, service, user_id, status, progress, access, created=None): + + created = dateparser.parse(created) if created else None + job = self.job_store.save_job(task_id=task_id, process=process, service=service, is_workflow=False, - user_id=user_id, execute_async=True, 
access=access) + user_id=user_id, execute_async=True, access=access, created=created) job.status = status if status in JOB_STATUS_CATEGORIES[STATUS_CATEGORY_FINISHED]: job.mark_finished() @@ -144,6 +155,15 @@ def get_job_request_auth_mock(self, user_id): mock.patch("{}.has_permission".format(authz_policy_class), return_value=is_admin), ]) + @staticmethod + def generate_test_datetimes(): + # type: () -> List[str] + """ + Generates a list of dummy datetimes for testing. + """ + year = date.today().year + 1 + return ["{}-0{}-02T03:32:38.487000+00:00".format(year, month) for month in range(1, 5)] + @staticmethod def check_job_format(job): assert isinstance(job, dict) @@ -503,3 +523,162 @@ def filter_service(jobs): # type: (Iterable[Job]) -> List[Job] job_match = all(job in job_ids for job in resp.json["jobs"]) test_values = dict(path=path, access=access, user_id=user_id) assert job_match, self.message_with_jobs_diffs(resp.json["jobs"], job_ids, test_values, index=i) + + def test_jobs_list_with_limit_api(self): + """ + .. seealso:: + - `/req/collections/rc-limit-response + `_ + """ + limit_parameter = 20 + path = get_path_kvp(sd.jobs_service.path, limit=limit_parameter) + resp = self.app.get(path, headers=self.json_headers) + assert resp.status_code == 200 + assert resp.content_type == CONTENT_TYPE_APP_JSON + assert "limit" in resp.json and isinstance(resp.json["limit"], int) + assert resp.json["limit"] == limit_parameter + assert len(resp.json["jobs"]) <= limit_parameter + + def test_jobs_list_with_limit_openapi_schema(self): + """ + .. 
seealso:: + - `/req/collections/rc-limit-response + `_ + """ + resp = self.app.get(sd.jobs_service.path, headers=self.json_headers) + assert resp.status_code == 200 + assert resp.content_type == CONTENT_TYPE_APP_JSON + assert "limit" in resp.json and isinstance(resp.json["limit"], int) + assert len(resp.json["jobs"]) <= resp.json["limit"] + + def test_not_required_fields(self): + uri = sd.openapi_json_service.path + resp = self.app.get(uri, headers=self.json_headers) + assert not resp.json["parameters"]["page"]["required"] + assert not resp.json["parameters"]["limit"]["required"] + + def test_jobs_datetime_before(self): + """ + .. seealso:: + - `/req/collections/rc-time-collections-response + `_ + """ + datetime_before = DATETIME_INTERVAL_OPEN_START_SYMBOL + self.datetime_interval[0] + path = get_path_kvp(sd.jobs_service.path, datetime=datetime_before) + resp = self.app.get(path, headers=self.json_headers) + assert resp.status_code == 200 + assert resp.content_type == CONTENT_TYPE_APP_JSON + assert len(resp.json["jobs"]) == 4 + for job in resp.json["jobs"]: + base_uri = sd.jobs_service.path + "/{}".format(job) + path = get_path_kvp(base_uri) + resp = self.app.get(path, headers=self.json_headers) + assert resp.status_code == 200 + assert resp.content_type == CONTENT_TYPE_APP_JSON + assert dateparser.parse(resp.json["created"]) <= dateparser.parse( + datetime_before.replace(DATETIME_INTERVAL_OPEN_START_SYMBOL, "")) + + def test_jobs_datetime_after(self): + """ + .. 
seealso:: + - `/req/collections/rc-time-collections-response + `_ + """ + datetime_after = str(self.datetime_interval[2] + DATETIME_INTERVAL_OPEN_END_SYMBOL) + path = get_path_kvp(sd.jobs_service.path, datetime=datetime_after) + resp = self.app.get(path, headers=self.json_headers) + assert resp.status_code == 200 + assert resp.content_type == CONTENT_TYPE_APP_JSON + assert len(resp.json["jobs"]) == 2 + for job in resp.json["jobs"]: + base_uri = sd.jobs_service.path + "/{}".format(job) + path = get_path_kvp(base_uri) + resp = self.app.get(path, headers=self.json_headers) + assert resp.status_code == 200 + assert resp.content_type == CONTENT_TYPE_APP_JSON + assert dateparser.parse(resp.json["created"]) >= dateparser.parse( + datetime_after.replace(DATETIME_INTERVAL_OPEN_END_SYMBOL, "")) + + def test_jobs_datetime_interval(self): + """ + .. seealso:: + - `/req/collections/rc-time-collections-response + `_ + """ + datetime_interval = self.datetime_interval[1] + DATETIME_INTERVAL_CLOSED_SYMBOL + self.datetime_interval[3] + path = get_path_kvp(sd.jobs_service.path, datetime=datetime_interval) + resp = self.app.get(path, headers=self.json_headers) + assert resp.status_code == 200 + assert resp.content_type == CONTENT_TYPE_APP_JSON + + datetime_after, datetime_before = datetime_interval.split(DATETIME_INTERVAL_CLOSED_SYMBOL) + assert len(resp.json["jobs"]) == 3 + for job in resp.json["jobs"]: + base_uri = sd.jobs_service.path + "/{}".format(job) + path = get_path_kvp(base_uri) + resp = self.app.get(path, headers=self.json_headers) + assert resp.status_code == 200 + assert resp.content_type == CONTENT_TYPE_APP_JSON + assert dateparser.parse(resp.json["created"]) >= dateparser.parse(datetime_after) + assert dateparser.parse(resp.json["created"]) <= dateparser.parse(datetime_before) + + def test_jobs_datetime_match(self): + """ + .. 
seealso:: + - `/req/collections/rc-time-collections-response + `_ + """ + datetime_match = self.datetime_interval[1] + path = get_path_kvp(sd.jobs_service.path, datetime=datetime_match) + resp = self.app.get(path, headers=self.json_headers) + assert resp.status_code == 200 + assert resp.content_type == CONTENT_TYPE_APP_JSON + assert len(resp.json["jobs"]) == 1 + for job in resp.json["jobs"]: + base_uri = sd.jobs_service.path + "/{}".format(job) + path = get_path_kvp(base_uri) + resp = self.app.get(path, headers=self.json_headers) + assert resp.status_code == 200 + assert resp.content_type == CONTENT_TYPE_APP_JSON + assert dateparser.parse(resp.json["created"]) == dateparser.parse(datetime_match) + + def test_jobs_datetime_invalid(self): + """ + .. seealso:: + - `/req/collections/rc-time-collections-response + `_ + + datetime_invalid is not formatted according to the rfc3339 datetime format, + for more details refer to https://datatracker.ietf.org/doc/html/rfc3339#section-5.6 + """ + datetime_invalid = "2022-31-12 23:59:59" + path = get_path_kvp(sd.jobs_service.path, datetime=datetime_invalid) + resp = self.app.get(path, headers=self.json_headers, expect_errors=True) + assert resp.status_code == 422 + + def test_jobs_datetime_interval_invalid(self): + """ + .. seealso:: + - `/req/collections/rc-time-collections-response + `_ + + datetime_interval represents a datetime interval where the limit dates are inverted, + the minimum is greater than the maximum datetime limit + """ + datetime_interval = self.datetime_interval[3] + DATETIME_INTERVAL_CLOSED_SYMBOL + self.datetime_interval[1] + path = get_path_kvp(sd.jobs_service.path, datetime=datetime_interval) + resp = self.app.get(path, headers=self.json_headers, expect_errors=True) + assert resp.status_code == 422 + + def test_jobs_datetime_before_invalid(self): + """ + ..
seealso:: + - `/req/collections/rc-time-collections-response + `_ + + datetime_before represents a bad open range datetime interval + """ + datetime_before = "./" + self.datetime_interval[3] + path = get_path_kvp(sd.jobs_service.path, datetime=datetime_before) + resp = self.app.get(path, headers=self.json_headers, expect_errors=True) + assert resp.status_code == 422 diff --git a/weaver/store/base.py b/weaver/store/base.py index d590c4a90..9a5b66877 100644 --- a/weaver/store/base.py +++ b/weaver/store/base.py @@ -2,11 +2,12 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: + import datetime from typing import Any, Dict, List, Optional, Tuple, Union from pyramid.request import Request from pywps import Process as ProcessWPS from weaver.datatype import Bill, Job, Process, Quote, Service - from weaver.typedefs import AnyValue + from weaver.typedefs import AnyValue, DatetimeIntervalType JobListAndCount = Tuple[List[Job], int] JobCategory = Dict[str, Union[AnyValue, Job]] @@ -111,6 +112,7 @@ def save_job(self, access=None, # type: Optional[str] notification_email=None, # type: Optional[str] accept_language=None, # type: Optional[str] + created=None, # type: Optional[datetime.datetime] ): # type: (...) -> Job raise NotImplementedError @@ -145,6 +147,7 @@ def find_jobs(self, sort=None, # type: Optional[str] page=0, # type: int limit=10, # type: int + datetime=None, # type: Optional[DatetimeIntervalType] group_by=None, # type: Optional[Union[str, List[str]]] request=None, # type: Optional[Request] ): # type: (...) 
-> Union[JobListAndCount, JobCategoriesAndCount] diff --git a/weaver/store/mongodb.py b/weaver/store/mongodb.py index 52931ec3e..9d55f571d 100644 --- a/weaver/store/mongodb.py +++ b/weaver/store/mongodb.py @@ -48,10 +48,11 @@ from weaver.wps.utils import get_wps_url if TYPE_CHECKING: + import datetime from typing import Any, Callable, Dict, List, Optional, Tuple, Union from pymongo.collection import Collection - from weaver.store.base import JobCategoriesAndCount, JobListAndCount + from weaver.store.base import DatetimeIntervalType, JobCategoriesAndCount, JobListAndCount from weaver.typedefs import AnyProcess, AnyProcessType LOGGER = logging.getLogger(__name__) @@ -405,6 +406,7 @@ def save_job(self, access=None, # type: Optional[str] notification_email=None, # type: Optional[str] accept_language=None, # type: Optional[str] + created=None, # type: Optional[datetime.datetime] ): # type: (...) -> Job """ Creates a new :class:`Job` and stores it in mongodb. @@ -422,6 +424,7 @@ def save_job(self, tags.append(EXECUTE_MODE_SYNC) if not access: access = VISIBILITY_PRIVATE + new_job = Job({ "task_id": task_id, "user_id": user_id, @@ -432,7 +435,7 @@ def save_job(self, "execute_async": execute_async, "is_workflow": is_workflow, "is_local": is_local, - "created": now(), + "created": created if created else now(), "tags": list(set(tags)), # remove duplicates "access": access, "notification_email": notification_email, @@ -499,6 +502,7 @@ def find_jobs(self, sort=None, # type: Optional[str] page=0, # type: int limit=10, # type: int + datetime=None, # type: Optional[DatetimeIntervalType] group_by=None, # type: Optional[Union[str, List[str]]] request=None, # type: Optional[Request] ): # type: (...) -> Union[JobListAndCount, JobCategoriesAndCount] @@ -515,6 +519,7 @@ def find_jobs(self, :param sort: field which is used for sorting results (default: creation date, descending). :param page: page number to return when using result paging (only when not using ``group_by``). 
:param limit: number of jobs per page when using result paging (only when not using ``group_by``). + :param datetime: field used for filtering data by creation date with a given date or interval of date. :param group_by: one or many fields specifying categories to form matching groups of jobs (paging disabled). :returns: (list of jobs matching paging OR list of {categories, list of jobs, count}) AND total of matched job @@ -582,6 +587,20 @@ def find_jobs(self, if service is not None: search_filters["service"] = service + if datetime is not None: + query = {} + + if datetime.get("after", False): + query["$gte"] = datetime["after"] + + if datetime.get("before", False): + query["$lte"] = datetime["before"] + + if datetime.get("match", False): + query = datetime["match"] + + search_filters["created"] = query + if sort is None: sort = SORT_CREATED elif sort == SORT_USER: diff --git a/weaver/typedefs.py b/weaver/typedefs.py index 19df11656..4e5b57acd 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -114,3 +114,8 @@ # update_status(provider, message, progress, status) UpdateStatusPartialFunction = Callable[[str, str, int, AnyStatusType], None] + + # others + DatetimeIntervalType = TypedDict("DatetimeIntervalType", + {"before": str, "after": str, + "match": str, }, total=False) diff --git a/weaver/wps_restapi/api.py b/weaver/wps_restapi/api.py index 40010fc67..71994c517 100644 --- a/weaver/wps_restapi/api.py +++ b/weaver/wps_restapi/api.py @@ -191,14 +191,18 @@ def api_conformance(request): # noqa: F811 # - https://github.com/opengeospatial/ogcapi-processes/tree/master/extensions/workflows/standard/requirements # - https://github.com/opengeospatial/ogcapi-processes/tree/master/extensions/workflows/standard/recommendations + ows_wps1 = "http://schemas.opengis.net/wps/1.0.0" + ows_wps2 = "http://www.opengis.net/spec/WPS/2.0" + ogcapi_common = "https://github.com/opengeospatial/ogcapi-common" + ogcapi_processes = "http://www.opengis.net/spec/ogcapi-processes-1/1.0" 
conformance = {"conformsTo": [ # "http://www.opengis.net/spec/wfs-1/3.0/req/core", # "http://www.opengis.net/spec/wfs-1/3.0/req/oas30", # "http://www.opengis.net/spec/wfs-1/3.0/req/html", # "http://www.opengis.net/spec/wfs-1/3.0/req/geojson", - "http://schemas.opengis.net/wps/1.0.0/", - "http://schemas.opengis.net/wps/2.0/", - "http://www.opengis.net/spec/WPS/2.0/req/service/binding/rest-json/core", + ows_wps1 + "/", + ows_wps2 + "/", + ows_wps2 + "/req/service/binding/rest-json/core", # "http://www.opengis.net/spec/WPS/2.0/req/service/binding/rest-json/oas30", # "http://www.opengis.net/spec/WPS/2.0/req/service/binding/rest-json/html" "https://github.com/opengeospatial/wps-rest-binding", # old reference for bw-compat @@ -207,21 +211,22 @@ def api_conformance(request): # noqa: F811 # see other references: # https://github.com/crim-ca/weaver/issues/53 # https://htmlpreview.github.io/?https://github.com/opengeospatial/ogcapi-processes/blob/master/docs/18-062.html - "http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/core", - "http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/ogc-process-description", - "http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/json", + ogcapi_processes + "/conf/core", + ogcapi_processes + "/conf/ogc-process-description", + ogcapi_processes + "/conf/json", # FIXME: https://github.com/crim-ca/weaver/issues/210 # "http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/html", - "http://www.opengis.net/spec/ogcapi-processes-1/1.0/req/oas30", # OpenAPI 3.0 - "http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/job-list", + ogcapi_processes + "/req/oas30", # OpenAPI 3.0 + ogcapi_processes + "/conf/job-list", # FIXME: https://github.com/crim-ca/weaver/issues/230 # "http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/callback", # FIXME: https://github.com/crim-ca/weaver/issues/228 # "http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/dismiss", # FIXME: https://github.com/crim-ca/weaver/issues/231 # List all supported 
requirements, recommendations and abstract tests - "http://www.opengis.net/spec/ogcapi-processes-1/1.0/req/core/process", - + ogcapi_processes + "/req/core/process", + ogcapi_common + "/req/collections/rc-limit-response", + ogcapi_common + "/req/collections/rc-time-collections-response" ]} return HTTPOk(json=conformance) diff --git a/weaver/wps_restapi/jobs/jobs.py b/weaver/wps_restapi/jobs/jobs.py index cc138e183..0ce17527e 100644 --- a/weaver/wps_restapi/jobs/jobs.py +++ b/weaver/wps_restapi/jobs/jobs.py @@ -1,13 +1,21 @@ from typing import TYPE_CHECKING from celery.utils.log import get_task_logger -from pyramid.httpexceptions import HTTPBadRequest, HTTPNotFound, HTTPOk, HTTPPermanentRedirect, HTTPUnauthorized +from colander import Invalid +from pyramid.httpexceptions import ( + HTTPBadRequest, + HTTPNotFound, + HTTPOk, + HTTPPermanentRedirect, + HTTPUnauthorized, + HTTPUnprocessableEntity +) from pyramid.request import Request from pyramid.settings import asbool from pyramid_celery import celery_app as app from notify import encrypt_email -from weaver import sort, status +from weaver import status from weaver.database import get_db from weaver.datatype import Job from weaver.exceptions import ( @@ -27,6 +35,7 @@ from weaver.visibility import VISIBILITY_PUBLIC from weaver.wps.utils import get_wps_output_url from weaver.wps_restapi import swagger_definitions as sd +from weaver.wps_restapi.swagger_definitions import datetime_interval_parser if TYPE_CHECKING: from typing import List, Optional, Tuple, Union @@ -194,44 +203,65 @@ def get_queried_jobs(request): """ Retrieve the list of jobs which can be filtered, sorted, paged and categorized using query parameters. 
""" + settings = get_settings(request) service, process = validate_service_process(request) - detail = asbool(request.params.get("detail", False)) - page = request.params.get("page", "0") - page = int(page) if str.isnumeric(page) else 0 - limit = request.params.get("limit", "10") - limit = int(limit) if str.isnumeric(limit) else 10 - email = request.params.get("notification_email", None) - filters = { - "page": page, - "limit": limit, - # split by comma and filter empty stings - "tags": list(filter(lambda s: s, request.params.get("tags", "").split(","))), - "access": request.params.get("access", None), - "status": request.params.get("status", None), - "sort": request.params.get("sort", sort.SORT_CREATED), - "notification_email": encrypt_email(email, settings) if email else None, - # service and process can be specified by query (short route) or by path (full route) - "process": process, - "service": service, - } - groups = request.params.get("groups", "") - groups = groups.split(",") if groups else None - store = get_db(request).get_store(StoreJobs) - items, total = store.find_jobs(request=request, group_by=groups, **filters) - body = {"total": total} - def _job_list(jobs): - return [j.json(settings) if detail else j.id for j in jobs] + filters = {**request.params, "process": process, "provider": service} + + filters["detail"] = asbool(request.params.get("detail")) + + if request.params.get("datetime", False): + # replace white space with '+' since request.params replaces '+' with whitespaces when parsing + filters["datetime"] = request.params["datetime"].replace(" ", "+") + + try: + filters = sd.GetJobsQueries().deserialize(filters) + except Invalid as ex: + raise HTTPUnprocessableEntity(json={ + "code": Invalid.__name__, + "description": str(ex) + }) - if groups: - for grouped_jobs in items: - grouped_jobs["jobs"] = _job_list(grouped_jobs["jobs"]) - body.update({"groups": items}) else: - body.update({"jobs": _job_list(items), "page": page, "limit": limit}) - body 
= sd.GetQueriedJobsSchema().deserialize(body) - return HTTPOk(json=body) + + detail = filters.pop("detail", False) + groups = filters.pop("groups", "").split(",") if filters.get("groups", False) else filters.pop("groups", None) + + filters["tags"] = list(filter(lambda s: s, filters["tags"].split(",") if filters.get("tags", False) else "")) + filters["notification_email"] = ( + encrypt_email(filters["notification_email"], settings) + if filters.get("notification_email", False) else None + ) + filters["datetime"] = datetime_interval_parser(filters["datetime"]) if filters.get("datetime", False) else None + filters["service"] = filters.pop("provider", None) + + if ( + filters["datetime"] + and filters["datetime"].get("before", False) + and filters["datetime"].get("after", False) + and filters["datetime"]["after"] > filters["datetime"]["before"] + ): + raise HTTPUnprocessableEntity(json={ + "code": "InvalidDateFormat", + "description": "Datetime at the start of the interval must be less than the datetime at the end." 
+ }) + + store = get_db(request).get_store(StoreJobs) + items, total = store.find_jobs(request=request, group_by=groups, **filters) + body = {"total": total} + + def _job_list(jobs): + return [j.json(settings) if detail else j.id for j in jobs] + + if groups: + for grouped_jobs in items: + grouped_jobs["jobs"] = _job_list(grouped_jobs["jobs"]) + body.update({"groups": items}) + else: + body.update({"jobs": _job_list(items), "page": filters["page"], "limit": filters["limit"]}) + body = sd.GetQueriedJobsSchema().deserialize(body) + return HTTPOk(json=body) @sd.provider_job_service.get(tags=[sd.TAG_JOBS, sd.TAG_STATUS, sd.TAG_PROVIDERS], renderer=OUTPUT_FORMAT_JSON, diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index dcfebe7a4..ddc8b4bf2 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -10,6 +10,7 @@ import yaml from colander import DateTime, Email, OneOf, Range, Regex, drop, required from cornice import Service +from dateutil import parser as dateparser from weaver import __meta__ from weaver.config import WEAVER_CONFIGURATION_EMS @@ -73,7 +74,7 @@ from weaver.wps_restapi.utils import wps_restapi_base_path if TYPE_CHECKING: - from weaver.typedefs import SettingsType, TypedDict + from weaver.typedefs import DatetimeIntervalType, SettingsType, TypedDict ViewInfo = TypedDict("ViewInfo", {"name": str, "pattern": str}) @@ -110,6 +111,10 @@ OGC_API_REPO_URL = "https://github.com/opengeospatial/ogcapi-processes" OGC_API_SCHEMA_URL = "https://raw.githubusercontent.com/opengeospatial/ogcapi-processes" +DATETIME_INTERVAL_CLOSED_SYMBOL = "/" +DATETIME_INTERVAL_OPEN_START_SYMBOL = "../" +DATETIME_INTERVAL_OPEN_END_SYMBOL = "/.." 
+ ######################################################### # Examples ######################################################### @@ -228,6 +233,25 @@ class URL(ExtendedSchemaNode): format = "url" +class DateTimeInterval(ExtendedSchemaNode): + schema_type = String + description = ( + "DateTime format against OGC-API - Processes, " + "to get values before a certain date-time use '../' before the date-time, " + "to get values after a certain date-time use '/..' after the date-time like the example, " + "to get values between two date-times use '/' between the date-times, " + "to get values with a specific date-time just pass the datetime. " + ) + example = "2022-03-02T03:32:38.487000+00:00/.." + regex_datetime = r"(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d(\.\d+)?(([+-]\d\d:\d\d)|Z)?)" + regex_interval_closed = r"{i}\/{i}".format(i=regex_datetime) + regex_interval_open_start = r"\.\.\/{}".format(regex_datetime) + regex_interval_open_end = r"{}\/\.\.".format(regex_datetime) + + pattern = "^{}|{}|{}|{}$".format(regex_datetime, regex_interval_closed, + regex_interval_open_start, regex_interval_open_end) + + class S3Bucket(ExtendedSchemaNode): schema_type = String description = "S3 bucket shorthand URL representation [s3:////.ext]" @@ -885,6 +909,10 @@ class VisibilityValue(ExtendedSchemaNode): example = VISIBILITY_PUBLIC +class JobAccess(VisibilityValue): + pass + + class Visibility(ExtendedMappingSchema): value = VisibilityValue() @@ -1947,7 +1975,7 @@ class CreatedQuotedJobStatusSchema(CreatedJobStatusSchema): class GetPagingJobsSchema(ExtendedMappingSchema): jobs = JobCollection() - limit = ExtendedSchemaNode(Integer(), default=10) + limit = ExtendedSchemaNode(Integer(), missing=10, default=10, validator=Range(min=0, max=10000)) page = ExtendedSchemaNode(Integer(), validator=Range(min=0)) @@ -2792,12 +2820,15 @@ class GetJobsQueries(ExtendedMappingSchema): groups = ExtendedSchemaNode(String(), description="Comma-separated list of grouping fields with which to list jobs.", 
default=False, example="process,service", missing=drop) - page = ExtendedSchemaNode(Integer(), missing=drop, default=0, validator=Range(min=0)) - limit = ExtendedSchemaNode(Integer(), missing=drop, default=10) - status = JobStatusEnum(missing=drop) - process = AnyIdentifier(missing=None) + page = ExtendedSchemaNode(Integer(), missing=0, default=0, validator=Range(min=0)) + limit = ExtendedSchemaNode(Integer(), missing=10, default=10, validator=Range(min=0, max=10000)) + datetime = DateTimeInterval(missing=drop, default=None) + status = JobStatusEnum(missing=drop, default=None) + process = AnyIdentifier(missing=drop) provider = ExtendedSchemaNode(String(), missing=drop, default=None) sort = JobSortEnum(missing=drop) + access = JobAccess(missing=drop, default=None) + notification_email = ExtendedSchemaNode(String(), missing=drop, validator=Email()) tags = ExtendedSchemaNode(String(), missing=drop, default=None, description="Comma-separated values of tags assigned to jobs") @@ -2845,7 +2876,7 @@ class ProcessQuoteEndpoint(ProcessPath, QuotePath): class GetQuotesQueries(ExtendedMappingSchema): page = ExtendedSchemaNode(Integer(), missing=drop, default=0) - limit = ExtendedSchemaNode(Integer(), missing=drop, default=10) + limit = ExtendedSchemaNode(Integer(), missing=10, default=10, validator=Range(min=0, max=10000)) process = AnyIdentifier(missing=None) sort = QuoteSortEnum(missing=drop) @@ -2935,6 +2966,12 @@ class ErrorJsonResponseBodySchema(ExtendedMappingSchema): exception = OWSExceptionResponse(missing=drop) +class UnprocessableEntityResponseSchema(ExtendedMappingSchema): + description = "Wrong format of given parameters." + header = ResponseHeaders() + body = ErrorJsonResponseBodySchema() + + class ForbiddenProcessAccessResponseSchema(ExtendedMappingSchema): description = "Referenced process is not accessible." 
header = ResponseHeaders() @@ -3355,6 +3392,7 @@ class OkGetJobLogsResponse(ExtendedMappingSchema): "value": EXAMPLES["jobs_listing.json"] } }), + "422": UnprocessableEntityResponseSchema(), "500": InternalServerErrorResponseSchema(), } get_single_job_status_responses = { @@ -3486,3 +3524,26 @@ def service_api_route_info(service_api, settings): # type: (Service, SettingsType) -> ViewInfo api_base = wps_restapi_base_path(settings) return {"name": service_api.name, "pattern": "{base}{path}".format(base=api_base, path=service_api.path)} + + +def datetime_interval_parser(datetime_interval): + # type: (str) -> DatetimeIntervalType + """Parse a given datetime or datetime interval string into a dictionary ready to use in database queries.""" + parsed_datetime = {} + + if datetime_interval.startswith(DATETIME_INTERVAL_OPEN_START_SYMBOL): + datetime_interval = datetime_interval.replace(DATETIME_INTERVAL_OPEN_START_SYMBOL, "") + parsed_datetime["before"] = dateparser.parse(datetime_interval) + + elif datetime_interval.endswith(DATETIME_INTERVAL_OPEN_END_SYMBOL): + datetime_interval = datetime_interval.replace(DATETIME_INTERVAL_OPEN_END_SYMBOL, "") + parsed_datetime["after"] = dateparser.parse(datetime_interval) + + elif DATETIME_INTERVAL_CLOSED_SYMBOL in datetime_interval: + datetime_interval = datetime_interval.split(DATETIME_INTERVAL_CLOSED_SYMBOL) + parsed_datetime["after"] = dateparser.parse(datetime_interval[0]) + parsed_datetime["before"] = dateparser.parse(datetime_interval[-1]) + else: + parsed_datetime["match"] = dateparser.parse(datetime_interval) + + return parsed_datetime