Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions background.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@
from tasks.hello import hello
from tasks.welcome import welcome


app = Celery("sidekick", broker=config.redis_url, backend="rpc://")
app.config_from_object("celery_config")


# app.conf.beat_schedule = {
# "hi-message": {"task": "tasks.hi.hi", "schedule": 5, "args": (2,)},
# }

# tasks
hello = app.task(hello, queue="hello")
hi = app.task(hi, queue="hi")
Expand Down
31 changes: 31 additions & 0 deletions beat/PersistentScheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import celery.beat as parent_beat


class PersistentScheduler(parent_beat.PersistentScheduler):
_store = {"tz": "UTC", "utc_enabled": True}

# def _next_instance(self, last_run_at=None):
# print("this is it")
# return None
# __next__ = next = _next_instance

# def scheduler(self):
# return self

def tick(self, *args, **kwargs):
# print("checking")
super().tick(self, *args, **kwargs)

def setup_schedule(self):
self._store.setdefault("entries", {})
self.schedule["db-task"] = self.Entry(
app=self.app, name="db-task", task="tasks.hi.hi", schedule=5, args=(2,)
)

# self.merge_inplace(self.app.conf.beat_schedule)

# known_suffixes = ".db"
# persistence = StoreManager

# def setup_schedule(self):
# print("setup scheduler called")
59 changes: 59 additions & 0 deletions beat/ScheduleEntry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from dataclasses import dataclass, field
import celery
import datetime
from typing import Union, Type
from celery.schedules import solar
from celery.schedules import crontab, maybe_schedule
from celery.beat import ScheduleEntry


schedule_type = Union[datetime.datetime, datetime.timedelta, crontab, solar, int, float]


@dataclass
class Entry:
app: celery.Celery = field(default_factory=lambda: None)
name: str = field(default_factory=lambda: None)
schedule: schedule_type = field(default_factory=lambda: None)
relative: bool = field(default_factory=lambda: False)
args: list[any] = field(default_factory=lambda: [])
kwargs: dict[str, any] = field(default_factory=lambda: {})
options: dict[str, any] = field(default_factory=lambda: {})
last_run_at: datetime.datetime = field(default_factory=lambda: None)
total_run_count: int = field(default_factory=lambda: 0)


class ScheduleEntry(ScheduleEntry, Entry):
def __init__(self, **kwargs) -> None:
self.set_constants(**{**Entry().__dict__, **kwargs})
self.set_variables(**{**Entry().__dict__, **kwargs})

def set_constants(
self,
app: celery.Celery,
task: celery.Task,
name: str,
args: list[any],
options: dict,
kwargs: dict,
last_run_at: datetime.datetime,
total_run_count: int,
**_
):
self.app = app
self.task = task
self.name = name
self.args = args or []
self.kwargs = kwargs or {}
self.options = options or {}
self.last_run_at = last_run_at or self.default_last_run_time()
self.total_run_count = total_run_count or 0

def set_variables(self, schedule: schedule_type, relative: bool, **_):
self.schedule = maybe_schedule(schedule, relative, app=self.app)

def default_last_run_time(self):
return self.schedule.now() if self.schedule else self.app.now()

def default_now(self):
return self.schedule.now() if self.schedule else self.app.now()
5 changes: 5 additions & 0 deletions beat/Scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import celery.beat as parent_beat


class Scheduler(parent_beat.Scheduler):
...
7 changes: 7 additions & 0 deletions beat/StoreScheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from beat.Scheduler import Scheduler
from beat.PersistentScheduler import PersistentScheduler
from beat.ScheduleEntry import ScheduleEntry


class StoreScheduler(PersistentScheduler, Scheduler):
Entry = ScheduleEntry
File renamed without changes.
4 changes: 3 additions & 1 deletion celery_config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from beat.StoreScheduler import StoreScheduler
from kombu import Exchange, Queue


CELERY_QUEUES = (
beat_scheduler = f"{StoreScheduler.__module__}:{StoreScheduler.__name__}"
task_queues = (
Queue("hi", Exchange("hi"), routing_key="hi"),
Queue("bye", Exchange("bye"), routing_key="bye"),
Queue("hello", Exchange("hello"), routing_key="hello"),
Expand Down
15 changes: 9 additions & 6 deletions start-workers.sh
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#!/bin/bash

celery -A background worker --loglevel=info -Q hi &
celery -A background worker --loglevel=info -Q bye &
celery -A background worker --loglevel=info -Q hello &
celery -A background worker --loglevel=info -Q welcome &
# to lock the terminal so that docker can continuously look into logs asynchronously
tail -f ${tty}
# celery -A background worker --loglevel=info -Q hi &
# celery -A background worker --loglevel=info -Q bye &
# celery -A background worker --loglevel=info -Q hello &
# celery -A background worker --loglevel=info -Q welcome &
# # to lock the terminal so that docker can continuously look into logs asynchronously
# tail -f ${tty}


celery -A background worker --loglevel=info -B
2 changes: 2 additions & 0 deletions store/manager/StoreManager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class StoreManager:
conn: str
Empty file added store/manager/__init__.py
Empty file.
26 changes: 26 additions & 0 deletions store/models/DBScheduleEntry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from Config import config
import sqlalchemy
from sqlalchemy import Column, String, Boolean, Integer, Timestamp
from sqlalchemy.dialects.postgresql import JSONB, UUID


class DBScheduleEntry(object):
__table_args__ = {"schema": config.app_name}

id = Column(
"id",
UUID(as_uuid=False),
server_default=sqlalchemy.text("uuid_generate_v4()"),
primary_key=True,
index=True,
)

name = Column(String(255))
args = Column(JSONB)
kwargs = Column(JSONB)
options = Column(JSONB)
schedule = Column(JSONB)

last_run_at = Column(Timestamp, server_default="now()")
total_run_count = Column(Integer, server_default=1)
relative = Column(Boolean, server_default="f")
2 changes: 2 additions & 0 deletions store/models/ModelBase.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class ModelBase:
...
Empty file added store/models/__init__.py
Empty file.