Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix limit integration (fixes #237) #256

Merged
merged 26 commits into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import uuid
import warnings
from configparser import ConfigParser
from datetime import date
from inspect import isclass
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -549,3 +550,12 @@ def mocked_aws_s3_bucket_test_file(bucket_name, file_name, file_content="Test fi
tmp_file.flush()
s3.upload_file(Bucket=bucket_name, Filename=tmp_file.name, Key=file_name)
return "s3://{}/{}".format(bucket_name, file_name)


def generate_test_datetimes(count=4):
    # type: (int) -> List[str]
    """
    Generates a list of dummy ISO-8601 datetimes for testing, one per month starting in January.

    Datetimes are placed in *next* year so that they always sort after jobs created with the
    current time (``now()``), keeping datetime-filter tests deterministic.

    :param count: number of datetimes to generate (default: 4, preserving previous behavior).
    :returns: ISO-8601 datetime strings with UTC offset, in increasing month order.
    """
    year = date.today().year + 1
    # zero-pad the month ("{:02d}") so the result stays valid ISO-8601 for months >= 10;
    # the previous hard-coded "0{}" prefix would produce invalid values like "011"
    return ["{}-{:02d}-02T03:32:38.487000+00:00".format(year, month) for month in range(1, count + 1)]
trapsidanadir marked this conversation as resolved.
Show resolved Hide resolved
117 changes: 107 additions & 10 deletions tests/wps_restapi/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import mock
import pyramid.testing
import pytest
from dateutil import parser as dateparser

from tests.utils import (
generate_test_datetimes,
get_module_version,
get_test_weaver_app,
mocked_process_job_runner,
Expand All @@ -31,6 +33,11 @@
STATUS_FAILED,
STATUS_SUCCEEDED
)
from weaver.store import (
DATETIME_INTERVAL_CLOSED_SYMBOL,
DATETIME_INTERVAL_OPEN_END_SYMBOL,
DATETIME_INTERVAL_OPEN_START_SYMBOL
)
from weaver.utils import get_path_kvp
from weaver.visibility import VISIBILITY_PRIVATE, VISIBILITY_PUBLIC
from weaver.warning import TimeZoneInfoAlreadySetWarning
Expand All @@ -39,6 +46,8 @@
if TYPE_CHECKING:
from typing import Iterable, List, Tuple, Union

TEST_DATE_INTERVALL = generate_test_datetimes()
fmigneault marked this conversation as resolved.
Show resolved Hide resolved


class WpsRestApiJobsTest(unittest.TestCase):
@classmethod
Expand Down Expand Up @@ -94,22 +103,22 @@ def setUp(self):
self.make_job(task_id="4444-4444-4444-4444", process=self.process_public.identifier, service=None,
user_id=self.user_admin_id, status=STATUS_FAILED, progress=55, access=VISIBILITY_PRIVATE)
# job public/private service/process combinations
self.make_job(task_id="5555-5555-5555-5555",
process=self.process_public.identifier, service=self.service_public.name,
self.make_job(task_id="5555-5555-5555-5555", process=self.process_public.identifier,
service=self.service_public.name, created_date=TEST_DATE_INTERVALL[0],
user_id=self.user_editor1_id, status=STATUS_FAILED, progress=99, access=VISIBILITY_PUBLIC)
self.make_job(task_id="6666-6666-6666-6666",
process=self.process_private.identifier, service=self.service_public.name,
self.make_job(task_id="6666-6666-6666-6666", process=self.process_private.identifier,
service=self.service_public.name, created_date=TEST_DATE_INTERVALL[1],
user_id=self.user_editor1_id, status=STATUS_FAILED, progress=99, access=VISIBILITY_PUBLIC)
self.make_job(task_id="7777-7777-7777-7777",
process=self.process_public.identifier, service=self.service_private.name,
self.make_job(task_id="7777-7777-7777-7777", process=self.process_public.identifier,
service=self.service_private.name, created_date=TEST_DATE_INTERVALL[2],
user_id=self.user_editor1_id, status=STATUS_FAILED, progress=99, access=VISIBILITY_PUBLIC)
self.make_job(task_id="8888-8888-8888-8888",
process=self.process_private.identifier, service=self.service_private.name,
self.make_job(task_id="8888-8888-8888-8888", process=self.process_private.identifier,
service=self.service_private.name, created_date=TEST_DATE_INTERVALL[3],
user_id=self.user_editor1_id, status=STATUS_FAILED, progress=99, access=VISIBILITY_PUBLIC)

def make_job(self, task_id, process, service, user_id, status, progress, access):
def make_job(self, task_id, process, service, user_id, status, progress, access, created_date=None):
job = self.job_store.save_job(task_id=task_id, process=process, service=service, is_workflow=False,
user_id=user_id, execute_async=True, access=access)
user_id=user_id, execute_async=True, access=access, created_date=created_date)
job.status = status
if status in JOB_STATUS_CATEGORIES[STATUS_CATEGORY_FINISHED]:
job.mark_finished()
Expand Down Expand Up @@ -503,3 +512,91 @@ def filter_service(jobs): # type: (Iterable[Job]) -> List[Job]
job_match = all(job in job_ids for job in resp.json["jobs"])
test_values = dict(path=path, access=access, user_id=user_id)
assert job_match, self.message_with_jobs_diffs(resp.json["jobs"], job_ids, test_values, index=i)

def test_jobs_list_with_limit_client(self):
    """An explicit ``limit`` query parameter is echoed back and caps the amount of returned jobs."""
    requested_limit = 20
    query = get_path_kvp(sd.jobs_service.path, limit=requested_limit)
    resp = self.app.get(query, headers=self.json_headers)
    assert resp.status_code == 200
    assert resp.content_type == CONTENT_TYPE_APP_JSON
    body = resp.json
    assert "limit" in body and isinstance(body["limit"], int)
    assert body["limit"] == requested_limit
    assert len(body["jobs"]) <= requested_limit

def test_jobs_list_with_limit_api(self):
    """Without an explicit ``limit``, the response reports the default limit and honors it."""
    resp = self.app.get(sd.jobs_service.path, headers=self.json_headers)
    assert resp.status_code == 200
    assert resp.content_type == CONTENT_TYPE_APP_JSON
    body = resp.json
    assert "limit" in body and isinstance(body["limit"], int)
    assert len(body["jobs"]) <= body["limit"]

def test_not_required_fields(self):
    """Paging query parameters must be declared optional in the OpenAPI specification."""
    resp = self.app.get(sd.openapi_json_service.path, headers=self.json_headers)
    parameters = resp.json["parameters"]
    assert not parameters["page"]["required"]
    assert not parameters["limit"]["required"]

def test_jobs_datetime_before(self):
    """Open-start interval (``../<datetime>``) only matches jobs created up to that datetime."""
    datetime_before = DATETIME_INTERVAL_OPEN_START_SYMBOL + TEST_DATE_INTERVALL[0]
    query = get_path_kvp(sd.jobs_service.path, datetime_interval=datetime_before)
    resp = self.app.get(query, headers=self.json_headers)
    assert resp.status_code == 200
    assert resp.content_type == CONTENT_TYPE_APP_JSON
    assert len(resp.json["jobs"]) == 4
    # hoisted out of the loop: the bound is the same for every returned job
    upper_bound = dateparser.parse(datetime_before.replace(DATETIME_INTERVAL_OPEN_START_SYMBOL, ""))
    for job_id in resp.json["jobs"]:
        job_path = get_path_kvp(sd.jobs_service.path + "/{}".format(job_id))
        job_resp = self.app.get(job_path, headers=self.json_headers)
        assert job_resp.status_code == 200
        assert job_resp.content_type == CONTENT_TYPE_APP_JSON
        assert dateparser.parse(job_resp.json["created"]) <= upper_bound

def test_jobs_datetime_after(self):
    """Open-end interval (``<datetime>/..``) only matches jobs created at or after that datetime."""
    datetime_after = TEST_DATE_INTERVALL[2] + DATETIME_INTERVAL_OPEN_END_SYMBOL
    query = get_path_kvp(sd.jobs_service.path, datetime_interval=datetime_after)
    resp = self.app.get(query, headers=self.json_headers)
    assert resp.status_code == 200
    assert resp.content_type == CONTENT_TYPE_APP_JSON
    assert len(resp.json["jobs"]) == 2
    # hoisted out of the loop: the bound is the same for every returned job
    lower_bound = dateparser.parse(datetime_after.replace(DATETIME_INTERVAL_OPEN_END_SYMBOL, ""))
    for job_id in resp.json["jobs"]:
        job_path = get_path_kvp(sd.jobs_service.path + "/{}".format(job_id))
        job_resp = self.app.get(job_path, headers=self.json_headers)
        assert job_resp.status_code == 200
        assert job_resp.content_type == CONTENT_TYPE_APP_JSON
        assert dateparser.parse(job_resp.json["created"]) >= lower_bound

def test_jobs_datetime_interval(self):
    """Closed interval (``<start>/<end>``) only matches jobs created within that range."""
    datetime_interval = TEST_DATE_INTERVALL[1] + DATETIME_INTERVAL_CLOSED_SYMBOL + TEST_DATE_INTERVALL[3]
    query = get_path_kvp(sd.jobs_service.path, datetime_interval=datetime_interval)
    resp = self.app.get(query, headers=self.json_headers)
    assert resp.status_code == 200
    assert resp.content_type == CONTENT_TYPE_APP_JSON
    assert len(resp.json["jobs"]) == 3
    # hoisted out of the loop: bounds are the same for every returned job
    interval_start, interval_end = datetime_interval.split(DATETIME_INTERVAL_CLOSED_SYMBOL)
    lower_bound = dateparser.parse(interval_start)
    upper_bound = dateparser.parse(interval_end)
    for job_id in resp.json["jobs"]:
        job_path = get_path_kvp(sd.jobs_service.path + "/{}".format(job_id))
        job_resp = self.app.get(job_path, headers=self.json_headers)
        assert job_resp.status_code == 200
        assert job_resp.content_type == CONTENT_TYPE_APP_JSON
        created = dateparser.parse(job_resp.json["created"])
        assert lower_bound <= created <= upper_bound

def test_jobs_datetime_match(self):
    """An exact datetime (no interval separator) only matches jobs created at that instant."""
    datetime_match = TEST_DATE_INTERVALL[1]
    query = get_path_kvp(sd.jobs_service.path, datetime_interval=datetime_match)
    resp = self.app.get(query, headers=self.json_headers)
    assert resp.status_code == 200
    assert resp.content_type == CONTENT_TYPE_APP_JSON
    assert len(resp.json["jobs"]) == 1
    # hoisted out of the loop: the expected value is the same for every returned job
    expected_created = dateparser.parse(datetime_match)
    for job_id in resp.json["jobs"]:
        job_path = get_path_kvp(sd.jobs_service.path + "/{}".format(job_id))
        job_resp = self.app.get(job_path, headers=self.json_headers)
        assert job_resp.status_code == 200
        assert job_resp.content_type == CONTENT_TYPE_APP_JSON
        assert dateparser.parse(job_resp.json["created"]) == expected_created
fmigneault marked this conversation as resolved.
Show resolved Hide resolved
27 changes: 27 additions & 0 deletions weaver/store/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from dateutil import parser as dateparser

DATETIME_INTERVAL_CLOSED_SYMBOL = "/"
DATETIME_INTERVAL_OPEN_START_SYMBOL = "../"
DATETIME_INTERVAL_OPEN_END_SYMBOL = "/.."
fmigneault marked this conversation as resolved.
Show resolved Hide resolved


def datetime_interval_parser(datetime_interval):
    # type: (str) -> dict
    """
    Parses a datetime or datetime-interval query value into its filtering operand(s).

    Supported forms (symbols per module constants):
      - ``../<datetime>``          open start, yields ``{"before": <parsed>}``
      - ``<datetime>/..``          open end, yields ``{"after": <parsed>}``
      - ``<datetime>/<datetime>``  closed interval, yields both ``after`` and ``before``
      - ``<datetime>``             exact value, yields ``{"match": <parsed>}``

    :param datetime_interval: raw query-string value to parse.
    :returns: mapping of comparison operand name(s) to parsed datetime value(s).
    """
    parsed_datetime = {}

    if datetime_interval.startswith(DATETIME_INTERVAL_OPEN_START_SYMBOL):
        # strip only the *leading* marker by slicing; str.replace would remove every
        # occurrence of the marker sequence and could corrupt the datetime portion
        datetime_interval = datetime_interval[len(DATETIME_INTERVAL_OPEN_START_SYMBOL):]
        parsed_datetime["before"] = dateparser.parse(datetime_interval)

    elif datetime_interval.endswith(DATETIME_INTERVAL_OPEN_END_SYMBOL):
        # strip only the *trailing* marker (same rationale as above)
        datetime_interval = datetime_interval[:-len(DATETIME_INTERVAL_OPEN_END_SYMBOL)]
        parsed_datetime["after"] = dateparser.parse(datetime_interval)

    elif DATETIME_INTERVAL_CLOSED_SYMBOL in datetime_interval:
        datetime_interval = datetime_interval.split(DATETIME_INTERVAL_CLOSED_SYMBOL)
        parsed_datetime["after"] = dateparser.parse(datetime_interval[0])
        parsed_datetime["before"] = dateparser.parse(datetime_interval[-1])
    else:
        parsed_datetime["match"] = dateparser.parse(datetime_interval)

    return parsed_datetime
fmigneault marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions weaver/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def save_job(self,
access=None, # type: Optional[str]
notification_email=None, # type: Optional[str]
accept_language=None, # type: Optional[str]
created_date=None, # type: Optional[str]
): # type: (...) -> Job
raise NotImplementedError

Expand Down Expand Up @@ -145,6 +146,7 @@ def find_jobs(self,
sort=None, # type: Optional[str]
page=0, # type: int
limit=10, # type: int
datetime_interval=None, # type: Optional[str]
group_by=None, # type: Optional[Union[str, List[str]]]
request=None, # type: Optional[Request]
): # type: (...) -> Union[JobListAndCount, JobCategoriesAndCount]
Expand Down
29 changes: 28 additions & 1 deletion weaver/store/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import TYPE_CHECKING

import pymongo
from dateutil import parser as dateparser
from pymongo import ASCENDING, DESCENDING
from pymongo.errors import DuplicateKeyError
from pyramid.request import Request
Expand Down Expand Up @@ -42,6 +43,7 @@
SORT_USER
)
from weaver.status import JOB_STATUS_CATEGORIES, STATUS_ACCEPTED, map_status
from weaver.store import datetime_interval_parser
from weaver.store.base import StoreBills, StoreJobs, StoreProcesses, StoreQuotes, StoreServices
from weaver.utils import get_base_url, get_sane_name, get_weaver_url, islambda, now
from weaver.visibility import VISIBILITY_PRIVATE, VISIBILITY_PUBLIC, VISIBILITY_VALUES
Expand Down Expand Up @@ -405,6 +407,7 @@ def save_job(self,
access=None, # type: Optional[str]
notification_email=None, # type: Optional[str]
accept_language=None, # type: Optional[str]
created_date=None, # type: Optional[str]
fmigneault marked this conversation as resolved.
Show resolved Hide resolved
): # type: (...) -> Job
"""
Creates a new :class:`Job` and stores it in mongodb.
Expand All @@ -422,6 +425,9 @@ def save_job(self,
tags.append(EXECUTE_MODE_SYNC)
if not access:
access = VISIBILITY_PRIVATE

created = dateparser.parse(created_date) if created_date else now()
fmigneault marked this conversation as resolved.
Show resolved Hide resolved

new_job = Job({
"task_id": task_id,
"user_id": user_id,
Expand All @@ -432,7 +438,7 @@ def save_job(self,
"execute_async": execute_async,
"is_workflow": is_workflow,
"is_local": is_local,
"created": now(),
"created": created,
"tags": list(set(tags)), # remove duplicates
"access": access,
"notification_email": notification_email,
Expand Down Expand Up @@ -499,6 +505,7 @@ def find_jobs(self,
sort=None, # type: Optional[str]
page=0, # type: int
limit=10, # type: int
datetime_interval=None, # type: Optional[str]
group_by=None, # type: Optional[Union[str, List[str]]]
request=None, # type: Optional[Request]
): # type: (...) -> Union[JobListAndCount, JobCategoriesAndCount]
Expand All @@ -515,6 +522,7 @@ def find_jobs(self,
:param sort: field which is used for sorting results (default: creation date, descending).
:param page: page number to return when using result paging (only when not using ``group_by``).
:param limit: number of jobs per page when using result paging (only when not using ``group_by``).
:param datetime_interval: field used for filtering data by creation date with a given date or interval of date.
:param group_by: one or many fields specifying categories to form matching groups of jobs (paging disabled).

:returns: (list of jobs matching paging OR list of {categories, list of jobs, count}) AND total of matched job
Expand Down Expand Up @@ -582,6 +590,25 @@ def find_jobs(self,
if service is not None:
search_filters["service"] = service

if datetime_interval is not None:
datetime_interval = datetime_interval_parser(datetime_interval)
query = {}

try:
if datetime_interval.get("after", False):
query["$gte"] = datetime_interval["after"]

if datetime_interval.get("before", False):
query["$lte"] = datetime_interval["before"]

if datetime_interval.get("match", False):
query = datetime_interval["match"]
trapsidanadir marked this conversation as resolved.
Show resolved Hide resolved

except Exception as ex:
raise JobRegistrationError("Error occurred during datetime job filtering: [{}]".format(repr(ex)))
trapsidanadir marked this conversation as resolved.
Show resolved Hide resolved

search_filters["created"] = query
fmigneault marked this conversation as resolved.
Show resolved Hide resolved

if sort is None:
sort = SORT_CREATED
elif sort == SORT_USER:
Expand Down
51 changes: 30 additions & 21 deletions weaver/wps_restapi/jobs/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pyramid_celery import celery_app as app

from notify import encrypt_email
from weaver import sort, status
from weaver import status
from weaver.database import get_db
from weaver.datatype import Job
from weaver.exceptions import (
Expand Down Expand Up @@ -194,28 +194,37 @@ def get_queried_jobs(request):
"""
Retrieve the list of jobs which can be filtered, sorted, paged and categorized using query parameters.
"""

settings = get_settings(request)
service, process = validate_service_process(request)
detail = asbool(request.params.get("detail", False))
page = request.params.get("page", "0")
page = int(page) if str.isnumeric(page) else 0
limit = request.params.get("limit", "10")
limit = int(limit) if str.isnumeric(limit) else 10
email = request.params.get("notification_email", None)
filters = {
"page": page,
"limit": limit,
# split by comma and filter empty stings
"tags": list(filter(lambda s: s, request.params.get("tags", "").split(","))),
"access": request.params.get("access", None),
"status": request.params.get("status", None),
"sort": request.params.get("sort", sort.SORT_CREATED),
"notification_email": encrypt_email(email, settings) if email else None,
# service and process can be specified by query (short route) or by path (full route)
"process": process,
"service": service,
}
groups = request.params.get("groups", "")

filters = {**request.params, "process": process}

filters["detail"] = asbool(request.params.get("detail"))

if request.params.get("notification_email", False):
filters["notification_email"] = request.params["notification_email"]
trapsidanadir marked this conversation as resolved.
Show resolved Hide resolved

if request.params.get("datetime_interval", False):
filters["datetime_interval"] = request.params["datetime_interval"].replace(" ", "+")
trapsidanadir marked this conversation as resolved.
Show resolved Hide resolved

filters = sd.GetJobsQueries().deserialize(filters)

limit = filters["limit"]
page = filters["page"]
detail = asbool(filters.get("detail", False))
groups = filters["groups"]
fmigneault marked this conversation as resolved.
Show resolved Hide resolved

filters["tags"] = list(filter(lambda s: s, filters.get("tags").split(",")))
filters["notification_email"] = encrypt_email(
filters["notification_email"], settings) if filters.get("notification_email", False) else None
filters["service"] = service
filters["access"] = request.params.get("access", None)
filters["status"] = request.params.get("status", None)
fmigneault marked this conversation as resolved.
Show resolved Hide resolved

del filters["detail"]
del filters["groups"]
trapsidanadir marked this conversation as resolved.
Show resolved Hide resolved

groups = groups.split(",") if groups else None
store = get_db(request).get_store(StoreJobs)
items, total = store.find_jobs(request=request, group_by=groups, **filters)
Expand Down
Loading