Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add intervals for scheduled task #399

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

so-saf
Copy link

@so-saf so-saf commented Jan 11, 2025

#290 #348

Hi! First, thank you for the project!

In this PR, I want to add intervals for scheduled tasks. I am attaching one commit as a draft to coordinate further changes with you.
I have implemented MVP, and a lot of related code has yet to be written, including documentation and tests.

The implementation is quite simple:
We go through all scheduled tasks once a second and send them to the broker.

  • For cron tasks, the frequency remained around a minute.

Of the additional advantages, it can be noted that tasks marked with the "time" label are scheduled with less delay.

What do you think?

@so-saf so-saf changed the title Draft: Add intervals for scheduled task #290 #348 Draft: Add intervals for scheduled task Jan 11, 2025
@so-saf so-saf changed the title Draft: Add intervals for scheduled task Add intervals for scheduled task Jan 14, 2025
@Mothilal-hire10x
Copy link

#290 #348

Hi! First, thank you for the project!

In this PR, I want to add intervals for scheduled tasks. I am attaching one commit as a draft to coordinate further changes with you. I have implemented MVP, and a lot of related code has yet to be written, including documentation and tests.

The implementation is quite simple: We go through all scheduled tasks once a second and send them to the broker.

  • For cron tasks, the frequency remained around a minute.

Of the additional advantages, it can be noted that tasks marked with the "time" label are scheduled with less delay.

What do you think?

Need to be merge

now = datetime.now()
next_second = now.replace(microsecond=0) + timedelta(seconds=1)
next_minute = now.replace(second=0, microsecond=0) + timedelta(minutes=1)
await asyncio.sleep((next_second - now).total_seconds())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will give us too many loop invocations, It will be spamming scheduling sources every second. Maybe it would be really nice if we could make a configurable delays between getting tasks from scheduling sources.

By default I'd gather tasks to send every 5 seconds.

last_call is not None
and round((now - last_call).total_seconds()) < task.interval
):
to_send = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to_send is False by default. You can negate this if condition and only leave the second branch.

else:
to_send = True
interval_tasks_last_run[task.task_name] = now
elif now > next_minute and task.cron is not None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this condition will work as expected, and most likely, it won't.

Currently, the now value is updated on each iteration of the while loop, and so is next_minute. This could cause an issue, especially if it takes slightly longer than 1 second to obtain all scheduled tasks due to factors like network lag. In that case, there would be no delay between setting the next_minute and updating the now value.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Easier solution would be to have a dict of last cron runs and remember which tasks have run in this very minute. Similarly as you've done with interval_tasks.

@@ -142,32 +149,53 @@ async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None:
"""
loop = asyncio.get_event_loop()
running_schedules = set()

interval_tasks_last_run: "Dict[str, datetime]" = {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's a dict[str, datetime] it seems like if you are going to have multiple intervals, you solution will most probably crumble apart. You need to have a nested list (because intervals can have the same value). Each list item will be an interval value and last run.

@s3rius
Copy link
Member

s3rius commented Mar 7, 2025

Also, please rebase your branch onto master. There were some changes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants