diff --git a/taskiq/cli/scheduler/run.py b/taskiq/cli/scheduler/run.py index 9155324d..e44e2da0 100644 --- a/taskiq/cli/scheduler/run.py +++ b/taskiq/cli/scheduler/run.py @@ -2,10 +2,10 @@ import sys from datetime import datetime, timedelta from logging import basicConfig, getLevelName, getLogger -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Union +import pycron import pytz -from pycron import is_now from taskiq.abc.schedule_source import ScheduleSource from taskiq.cli.scheduler.args import SchedulerArgs @@ -73,6 +73,35 @@ async def get_all_schedules( return dict(zip(scheduler.sources, schedules)) +class _CronParseError(Exception): + pass + + +def is_cron_now( + value: str, + offset: Union[str, timedelta, None] = None, +) -> bool: + """Determines whether a cron-like string is valid for the current date and time. + + :param value: cron-like string. + :param offset: cron offset. + """ + now = datetime.now(tz=pytz.UTC) + # If user specified cron offset we apply it. + # If it's timedelta, we simply add the delta to current time. + if offset and isinstance(offset, timedelta): + now += offset + # If timezone was specified as string we convert it timezone + # offset and then apply. + elif offset and isinstance(offset, str): + now = now.astimezone(pytz.timezone(offset)) + + try: + return pycron.is_now(value, now) + except ValueError as e: + raise _CronParseError(f"Cannot parse cron: '{value}'") from e + + def get_task_delay(task: ScheduledTask) -> Optional[int]: """ Get delay of the task in seconds. @@ -82,17 +111,7 @@ def get_task_delay(task: ScheduledTask) -> Optional[int]: """ now = datetime.now(tz=pytz.UTC) if task.cron is not None: - # If user specified cron offset we apply it. - # If it's timedelta, we simply add the delta to current time. - if task.cron_offset and isinstance(task.cron_offset, timedelta): - now += task.cron_offset - # If timezone was specified as string we convert it timzone - # offset and then apply. - elif task.cron_offset and isinstance(task.cron_offset, str): - now = now.astimezone(pytz.timezone(task.cron_offset)) - if is_now(task.cron, now): - return 0 - return None + return 0 if is_cron_now(task.cron, task.cron_offset) else None if task.time is not None: task_time = to_tz_aware(task.time).replace(microsecond=0) if task_time <= now: @@ -103,30 +122,18 @@ def get_task_delay(task: ScheduledTask) -> Optional[int]: return None -async def delayed_send( +async def send( scheduler: TaskiqScheduler, source: ScheduleSource, task: ScheduledTask, - delay: int, ) -> None: """ - Send a task with a delay. - - This function waits for some time and then - sends a task. - - The main idea is that scheduler gathers - tasks every minute and some of them have - specfic time. To respect the time, we calculate - the delay and send the task after some delay. + Send a task. :param scheduler: current scheduler. :param source: source of the task. :param task: task to send. - :param delay: how long to wait. """ - if delay > 0: - await asyncio.sleep(delay) logger.info("Sending task %s.", task.task_name) await scheduler.on_ready(source, task) @@ -142,32 +149,53 @@ async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None: """ loop = asyncio.get_event_loop() running_schedules = set() + + interval_tasks_last_run: "Dict[str, datetime]" = {} + now = datetime.now() + next_minute = now.replace(second=0, microsecond=0) + next_second = now.replace(microsecond=0) + timedelta(seconds=1) + + await asyncio.sleep((next_second - now).total_seconds()) + while True: - # We use this method to correctly sleep for one minute. + # We use this method to correctly sleep. + now = datetime.now() scheduled_tasks = await get_all_schedules(scheduler) for source, task_list in scheduled_tasks.items(): for task in task_list: - try: - task_delay = get_task_delay(task) - except ValueError: - logger.warning( - "Cannot parse cron: %s for task: %s, schedule_id: %s", - task.cron, - task.task_name, - task.schedule_id, - ) - continue - if task_delay is not None: - send_task = loop.create_task( - delayed_send(scheduler, source, task, task_delay), - ) + to_send = False + if task.time is not None and now >= task.time: + to_send = True + elif task.interval is not None: + last_call = interval_tasks_last_run.get(task.task_name) + if ( + last_call is not None + and round((now - last_call).total_seconds()) < task.interval + ): + to_send = False + else: + to_send = True + interval_tasks_last_run[task.task_name] = now + elif now > next_minute and task.cron is not None: + try: + to_send = is_cron_now(task.cron, task.cron_offset) + except _CronParseError: + logger.warning( + "Cannot parse cron: %s for task: %s, schedule_id: %s", + task.cron, + task.task_name, + task.schedule_id, + ) + + if to_send: + send_task = loop.create_task(send(scheduler, source, task)) running_schedules.add(send_task) send_task.add_done_callback(running_schedules.discard) - next_minute = datetime.now().replace(second=0, microsecond=0) + timedelta( - minutes=1, - ) - delay = next_minute - datetime.now() - await asyncio.sleep(delay.total_seconds()) + + now = datetime.now() + next_second = now.replace(microsecond=0) + timedelta(seconds=1) + next_minute = now.replace(second=0, microsecond=0) + timedelta(minutes=1) + await asyncio.sleep((next_second - now).total_seconds()) async def run_scheduler(args: SchedulerArgs) -> None: diff --git a/taskiq/receiver/receiver.py b/taskiq/receiver/receiver.py index afa5d4c9..41e687a6 100644 --- a/taskiq/receiver/receiver.py +++ b/taskiq/receiver/receiver.py @@ -353,7 +353,9 @@ async def prefetcher( """ fetched_tasks: int = 0 iterator = self.broker.listen() - current_message: asyncio.Task[bytes | AckableMessage] = asyncio.create_task( + current_message: asyncio.Task[ + Union[bytes, AckableMessage] + ] = asyncio.create_task( iterator.__anext__(), # type: ignore ) diff --git a/taskiq/schedule_sources/label_based.py b/taskiq/schedule_sources/label_based.py index e9116fd9..3580aea9 100644 --- a/taskiq/schedule_sources/label_based.py +++ b/taskiq/schedule_sources/label_based.py @@ -31,7 +31,7 @@ async def get_schedules(self) -> List["ScheduledTask"]: if task.broker != self.broker: continue for schedule in task.labels.get("schedule", []): - if "cron" not in schedule and "time" not in schedule: + if not {"cron", "interval", "time"} & set(schedule.keys()): continue labels = schedule.get("labels", {}) labels.update(task.labels) @@ -44,6 +44,7 @@ async def get_schedules(self) -> List["ScheduledTask"]: cron=schedule.get("cron"), time=schedule.get("time"), cron_offset=schedule.get("cron_offset"), + interval=schedule.get("interval"), ), ) return schedules diff --git a/taskiq/scheduler/scheduled_task/v1.py b/taskiq/scheduler/scheduled_task/v1.py index 5209f61e..724529db 100644 --- a/taskiq/scheduler/scheduled_task/v1.py +++ b/taskiq/scheduler/scheduled_task/v1.py @@ -16,15 +16,19 @@ class ScheduledTask(BaseModel): cron: Optional[str] = None cron_offset: Optional[Union[str, timedelta]] = None time: Optional[datetime] = None + interval: Optional[int] = None @root_validator(pre=False) # type: ignore @classmethod def __check(cls, values: Dict[str, Any]) -> Dict[str, Any]: """ - This method validates, that either `cron` or `time` field is present. + Validate values. - :raises ValueError: if cron and time are none. + This method validates, that either + `cron`, `interval` or `time` field is present. + + :raises ValueError: if cron, interval and time are none. """ - if values.get("cron") is None and values.get("time") is None: - raise ValueError("Either cron or datetime must be present.") + if all(values.get(v) is None for v in ("cron", "interval", "time")): + raise ValueError("Either cron, interval or datetime must be present.") return values diff --git a/taskiq/scheduler/scheduled_task/v2.py b/taskiq/scheduler/scheduled_task/v2.py index 332dce5d..b8743fd0 100644 --- a/taskiq/scheduler/scheduled_task/v2.py +++ b/taskiq/scheduler/scheduled_task/v2.py @@ -17,14 +17,18 @@ class ScheduledTask(BaseModel): cron: Optional[str] = None cron_offset: Optional[Union[str, timedelta]] = None time: Optional[datetime] = None + interval: Optional[int] = None @model_validator(mode="after") def __check(self) -> Self: """ - This method validates, that either `cron` or `time` field is present. + Validate values. - :raises ValueError: if cron and time are none. + This method validates, that either + `cron`, `interval` or `time` field is present. + + :raises ValueError: if cron, interval and time are none. """ - if self.cron is None and self.time is None: - raise ValueError("Either cron or datetime must be present.") + if all(v is None for v in (self.cron, self.interval, self.time)): + raise ValueError("Either cron, interval or datetime must be present.") return self diff --git a/tests/api/test_scheduler.py b/tests/api/test_scheduler.py index ce09bab6..dcfae5af 100644 --- a/tests/api/test_scheduler.py +++ b/tests/api/test_scheduler.py @@ -15,6 +15,7 @@ async def test_successful() -> None: broker = AsyncQueueBroker() scheduler = TaskiqScheduler(broker, sources=[LabelScheduleSource(broker)]) scheduler_task = asyncio.create_task(run_scheduler_task(scheduler)) + await asyncio.sleep(1) # waiting start @broker.task(schedule=[{"time": datetime.utcnow() - timedelta(seconds=1)}]) def _() -> None: @@ -36,6 +37,7 @@ def _() -> None: ... scheduler_task = asyncio.create_task(run_scheduler_task(scheduler)) + await asyncio.sleep(1) # waiting start msg = await asyncio.wait_for(broker.queue.get(), 0.3) assert msg