Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add intervals for scheduled task #399

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 75 additions & 47 deletions taskiq/cli/scheduler/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import sys
from datetime import datetime, timedelta
from logging import basicConfig, getLevelName, getLogger
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Union

import pycron
import pytz
from pycron import is_now

from taskiq.abc.schedule_source import ScheduleSource
from taskiq.cli.scheduler.args import SchedulerArgs
Expand Down Expand Up @@ -73,6 +73,35 @@ async def get_all_schedules(
return dict(zip(scheduler.sources, schedules))


class _CronParseError(Exception):
pass


def is_cron_now(
value: str,
offset: Union[str, timedelta, None] = None,
) -> bool:
"""Determines whether a cron-like string is valid for the current date and time.

:param value: cron-like string.
:param offset: cron offset.
"""
now = datetime.now(tz=pytz.UTC)
# If user specified cron offset we apply it.
# If it's timedelta, we simply add the delta to current time.
if offset and isinstance(offset, timedelta):
now += offset
# If timezone was specified as string we convert it timezone
# offset and then apply.
elif offset and isinstance(offset, str):
now = now.astimezone(pytz.timezone(offset))

try:
return pycron.is_now(value, now)
except ValueError as e:
raise _CronParseError(f"Cannot parse cron: '{value}'") from e


def get_task_delay(task: ScheduledTask) -> Optional[int]:
"""
Get delay of the task in seconds.
Expand All @@ -82,17 +111,7 @@ def get_task_delay(task: ScheduledTask) -> Optional[int]:
"""
now = datetime.now(tz=pytz.UTC)
if task.cron is not None:
# If user specified cron offset we apply it.
# If it's timedelta, we simply add the delta to current time.
if task.cron_offset and isinstance(task.cron_offset, timedelta):
now += task.cron_offset
# If timezone was specified as string we convert it timzone
# offset and then apply.
elif task.cron_offset and isinstance(task.cron_offset, str):
now = now.astimezone(pytz.timezone(task.cron_offset))
if is_now(task.cron, now):
return 0
return None
return 0 if is_cron_now(task.cron, task.cron_offset) else None
if task.time is not None:
task_time = to_tz_aware(task.time).replace(microsecond=0)
if task_time <= now:
Expand All @@ -103,30 +122,18 @@ def get_task_delay(task: ScheduledTask) -> Optional[int]:
return None


async def delayed_send(
async def send(
scheduler: TaskiqScheduler,
source: ScheduleSource,
task: ScheduledTask,
delay: int,
) -> None:
"""
Send a task with a delay.

This function waits for some time and then
sends a task.

The main idea is that scheduler gathers
tasks every minute and some of them have
specfic time. To respect the time, we calculate
the delay and send the task after some delay.
Send a task.

:param scheduler: current scheduler.
:param source: source of the task.
:param task: task to send.
:param delay: how long to wait.
"""
if delay > 0:
await asyncio.sleep(delay)
logger.info("Sending task %s.", task.task_name)
await scheduler.on_ready(source, task)

Expand All @@ -142,32 +149,53 @@ async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None:
"""
loop = asyncio.get_event_loop()
running_schedules = set()

interval_tasks_last_run: "Dict[str, datetime]" = {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's a dict[str, datetime] it seems like if you are going to have multiple intervals, you solution will most probably crumble apart. You need to have a nested list (because intervals can have the same value). Each list item will be an interval value and last run.

now = datetime.now()
next_minute = now.replace(second=0, microsecond=0)
next_second = now.replace(microsecond=0) + timedelta(seconds=1)

await asyncio.sleep((next_second - now).total_seconds())

while True:
# We use this method to correctly sleep for one minute.
# We use this method to correctly sleep.
now = datetime.now()
scheduled_tasks = await get_all_schedules(scheduler)
for source, task_list in scheduled_tasks.items():
for task in task_list:
try:
task_delay = get_task_delay(task)
except ValueError:
logger.warning(
"Cannot parse cron: %s for task: %s, schedule_id: %s",
task.cron,
task.task_name,
task.schedule_id,
)
continue
if task_delay is not None:
send_task = loop.create_task(
delayed_send(scheduler, source, task, task_delay),
)
to_send = False
if task.time is not None and now >= task.time:
to_send = True
elif task.interval is not None:
last_call = interval_tasks_last_run.get(task.task_name)
if (
last_call is not None
and round((now - last_call).total_seconds()) < task.interval
):
to_send = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to_send is False by default. You can negate this if condition and only leave the second branch.

else:
to_send = True
interval_tasks_last_run[task.task_name] = now
elif now > next_minute and task.cron is not None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this condition will work as expected, and most likely, it won't.

Currently, the now value is updated on each iteration of the while loop, and so is next_minute. This could cause an issue, especially if it takes slightly longer than 1 second to obtain all scheduled tasks due to factors like network lag. In that case, there would be no delay between setting the next_minute and updating the now value.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Easier solution would be to have a dict of last cron runs and remember which tasks have run in this very minute. Similarly as you've done with interval_tasks.

try:
to_send = is_cron_now(task.cron, task.cron_offset)
except _CronParseError:
logger.warning(
"Cannot parse cron: %s for task: %s, schedule_id: %s",
task.cron,
task.task_name,
task.schedule_id,
)

if to_send:
send_task = loop.create_task(send(scheduler, source, task))
running_schedules.add(send_task)
send_task.add_done_callback(running_schedules.discard)
next_minute = datetime.now().replace(second=0, microsecond=0) + timedelta(
minutes=1,
)
delay = next_minute - datetime.now()
await asyncio.sleep(delay.total_seconds())

now = datetime.now()
next_second = now.replace(microsecond=0) + timedelta(seconds=1)
next_minute = now.replace(second=0, microsecond=0) + timedelta(minutes=1)
await asyncio.sleep((next_second - now).total_seconds())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will give us too many loop invocations, It will be spamming scheduling sources every second. Maybe it would be really nice if we could make a configurable delays between getting tasks from scheduling sources.

By default I'd gather tasks to send every 5 seconds.



async def run_scheduler(args: SchedulerArgs) -> None:
Expand Down
4 changes: 3 additions & 1 deletion taskiq/receiver/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,9 @@ async def prefetcher(
"""
fetched_tasks: int = 0
iterator = self.broker.listen()
current_message: asyncio.Task[bytes | AckableMessage] = asyncio.create_task(
current_message: asyncio.Task[
Union[bytes, AckableMessage]
] = asyncio.create_task(
iterator.__anext__(), # type: ignore
)

Expand Down
3 changes: 2 additions & 1 deletion taskiq/schedule_sources/label_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async def get_schedules(self) -> List["ScheduledTask"]:
if task.broker != self.broker:
continue
for schedule in task.labels.get("schedule", []):
if "cron" not in schedule and "time" not in schedule:
if not {"cron", "interval", "time"} & set(schedule.keys()):
continue
labels = schedule.get("labels", {})
labels.update(task.labels)
Expand All @@ -44,6 +44,7 @@ async def get_schedules(self) -> List["ScheduledTask"]:
cron=schedule.get("cron"),
time=schedule.get("time"),
cron_offset=schedule.get("cron_offset"),
interval=schedule.get("interval"),
),
)
return schedules
Expand Down
12 changes: 8 additions & 4 deletions taskiq/scheduler/scheduled_task/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@ class ScheduledTask(BaseModel):
cron: Optional[str] = None
cron_offset: Optional[Union[str, timedelta]] = None
time: Optional[datetime] = None
interval: Optional[int] = None

@root_validator(pre=False) # type: ignore
@classmethod
def __check(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""
This method validates, that either `cron` or `time` field is present.
Validate values.

:raises ValueError: if cron and time are none.
This method validates, that either
`cron`, `interval` or `time` field is present.

:raises ValueError: if cron, interval and time are none.
"""
if values.get("cron") is None and values.get("time") is None:
raise ValueError("Either cron or datetime must be present.")
if all(values.get(v) is None for v in ("cron", "interval", "time")):
raise ValueError("Either cron, interval or datetime must be present.")
return values
12 changes: 8 additions & 4 deletions taskiq/scheduler/scheduled_task/v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@ class ScheduledTask(BaseModel):
cron: Optional[str] = None
cron_offset: Optional[Union[str, timedelta]] = None
time: Optional[datetime] = None
interval: Optional[int] = None

@model_validator(mode="after")
def __check(self) -> Self:
"""
This method validates, that either `cron` or `time` field is present.
Validate values.

:raises ValueError: if cron and time are none.
This method validates, that either
`cron`, `interval` or `time` field is present.

:raises ValueError: if cron, interval and time are none.
"""
if self.cron is None and self.time is None:
raise ValueError("Either cron or datetime must be present.")
if all(v is None for v in (self.cron, self.interval, self.time)):
raise ValueError("Either cron, interval or datetime must be present.")
return self
2 changes: 2 additions & 0 deletions tests/api/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ async def test_successful() -> None:
broker = AsyncQueueBroker()
scheduler = TaskiqScheduler(broker, sources=[LabelScheduleSource(broker)])
scheduler_task = asyncio.create_task(run_scheduler_task(scheduler))
await asyncio.sleep(1) # waiting start

@broker.task(schedule=[{"time": datetime.utcnow() - timedelta(seconds=1)}])
def _() -> None:
Expand All @@ -36,6 +37,7 @@ def _() -> None:
...

scheduler_task = asyncio.create_task(run_scheduler_task(scheduler))
await asyncio.sleep(1) # waiting start

msg = await asyncio.wait_for(broker.queue.get(), 0.3)
assert msg
Expand Down