Skip to content

Commit badb9ef

Browse files
committed
feat: add schedule kanban
Signed-off-by: Chojan Shang <psiace@apache.org>
1 parent d50e5bd commit badb9ef

File tree

7 files changed

+520
-13
lines changed

7 files changed

+520
-13
lines changed

contrib/bubseek-schedule/README.md

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,24 @@ dependencies = [
3333

3434
## Runtime Behavior
3535

36-
- Scheduler starts when the plugin channel starts.
36+
- The plugin uses **APScheduler `BackgroundScheduler`** (see also upstream [bub-schedule](https://github.com/bubbuild/bub-contrib) JSON store pattern: persistence must not depend on a specific channel being enabled).
37+
- **`load_state` starts the scheduler** on the first inbound message. That way `bub chat` (CLI-only: only the `cli` channel is enabled) still persists jobs to SeekDB. Previously, `AsyncIOScheduler` was only started by the `schedule` channel, so CLI chat left jobs in memory-only `_pending_jobs` and **nothing was written to `apscheduler_jobs`**.
38+
- The channel name is `schedule`. Enabling it in `bub gateway` is optional for persistence; it still starts/stops the scheduler cleanly when you use gateway with that channel.
3739
- Jobs are persisted to:
38-
- **OceanBase/SeekDB**: When `BUB_TAPESTORE_SQLALCHEMY_URL` uses `mysql+oceanbase`, jobs go to the same database (table `apscheduler_jobs`).
40+
- **OceanBase/SeekDB**: Same URL as the tape store (`BUB_TAPESTORE_SQLALCHEMY_URL` / `OCEANBASE_*`), table `apscheduler_jobs`.
3941

4042
## Provided Tools
4143

4244
- `schedule.add`: Add a scheduled job with cron, interval, or one-shot.
4345
- `schedule.remove`: Remove a scheduled job by ID.
4446
- `schedule.list`: List all scheduled jobs.
47+
48+
## Debug: job in chat but not in Marimo kanban / DB
49+
50+
The gateway resolves the job store URL from `BUB_TAPESTORE_SQLALCHEMY_URL` or workspace `.env` (`OCEANBASE_*`). Marimo must use the **same** URL. If `insights/schedule_kanban.py` pointed at the default `127.0.0.1:2881/bub` while your `.env` uses another host/db, the table will look empty.
51+
52+
From the bubseek repo root:
53+
54+
```bash
55+
uv run python scripts/query_apscheduler_jobs.py --job-id <id>
56+
```
Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,33 @@
11
import asyncio
2+
import contextlib
23
from asyncio import Event
4+
from typing import ClassVar
35

6+
from apscheduler.schedulers import SchedulerAlreadyRunningError
47
from apscheduler.schedulers.base import BaseScheduler
58
from bub.channels import Channel
69
from loguru import logger
710

811

912
class ScheduleChannel(Channel):
13+
"""Starts/stops BackgroundScheduler when this channel is enabled (e.g. gateway)."""
14+
15+
name: ClassVar[str] = "schedule"
16+
1017
def __init__(self, scheduler: BaseScheduler) -> None:
1118
self.scheduler = scheduler
1219

1320
async def start(self, stop_event: Event) -> None:
14-
loop = asyncio.get_running_loop()
15-
loop.call_soon_threadsafe(self.scheduler.start)
21+
if not self.scheduler.running:
22+
with contextlib.suppress(SchedulerAlreadyRunningError):
23+
self.scheduler.start()
1624
logger.info("schedule.start complete")
1725

1826
async def stop(self) -> None:
27+
if not self.scheduler.running:
28+
logger.info("schedule.stop complete (idle)")
29+
return
30+
# BackgroundScheduler.shutdown() blocks until the worker thread stops.
1931
loop = asyncio.get_running_loop()
20-
loop.call_soon_threadsafe(self.scheduler.shutdown)
32+
await loop.run_in_executor(None, lambda: self.scheduler.shutdown(wait=True))
2133
logger.info("schedule.stop complete")

contrib/bubseek-schedule/src/bubseek_schedule/jobs.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import asyncio
34
from pathlib import Path
45

56
from bub import BubFramework
@@ -13,7 +14,7 @@ def _noop() -> None:
1314
pass
1415

1516

16-
async def run_scheduled_reminder(message: str, session_id: str, workspace: str | None = None) -> None:
17+
async def _run_scheduled_reminder_async(message: str, session_id: str, workspace: str | None = None) -> None:
1718
framework = BubFramework()
1819
framework.load_hooks()
1920
if workspace:
@@ -30,3 +31,12 @@ async def run_scheduled_reminder(message: str, session_id: str, workspace: str |
3031
chat_id=chat_id,
3132
)
3233
await framework.process_inbound(payload)
34+
35+
36+
def run_scheduled_reminder(message: str, session_id: str, workspace: str | None = None) -> None:
37+
"""Synchronous job target for BackgroundScheduler (runs in a worker thread).
38+
39+
APScheduler's BackgroundScheduler executes jobs in a thread pool; ``async def`` targets are
40+
not supported with the default executor. Each run uses ``asyncio.run()`` with a fresh loop.
41+
"""
42+
asyncio.run(_run_scheduled_reminder_async(message, session_id, workspace))

contrib/bubseek-schedule/src/bubseek_schedule/jobstore.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@
1515
from sqlalchemy.orm.session import sessionmaker as sessionmaker_type
1616

1717
import bubseek.oceanbase # noqa: F401 - register mysql+oceanbase dialect
18-
from bubseek.config import BubSeekSettings
1918

2019

2120
def _get_jobstore_url() -> str:
22-
"""Get SQLAlchemy URL for job store from the configured SeekDB/OceanBase tapestore."""
23-
return BubSeekSettings().db.resolved_tapestore_url
21+
"""Resolve tapestore URL (workspace .env, BUB_WORKSPACE_PATH, cwd) like the rest of bubseek."""
22+
from bubseek.config import resolve_tapestore_url
23+
24+
return resolve_tapestore_url()
2425

2526

2627
def _normalize_url(url: str) -> str:
@@ -41,17 +42,22 @@ class OceanBaseJobStore(BaseJobStore):
4142

4243
def __init__(self, url: str | None = None, tablename: str = "apscheduler_jobs"):
4344
super().__init__()
44-
self._url = _normalize_url(url or _get_jobstore_url())
45+
self._url_explicit = url
4546
self._tablename = tablename
4647
self._engine: Engine | None = None
4748
self._session_factory: sessionmaker_type | None = None
4849
self._table: Table | None = None
4950
self._lock = threading.RLock()
5051

52+
def _connection_url(self) -> str:
53+
if self._url_explicit is not None:
54+
return _normalize_url(self._url_explicit)
55+
return _normalize_url(_get_jobstore_url())
56+
5157
def _ensure_initialized(self) -> None:
5258
if self._engine is not None and self._session_factory is not None and self._table is not None:
5359
return
54-
self._engine = create_engine(self._url, pool_pre_ping=True)
60+
self._engine = create_engine(self._connection_url(), pool_pre_ping=True)
5561
self._session_factory = sessionmaker(bind=self._engine, expire_on_commit=False)
5662
self._init_table()
5763

contrib/bubseek-schedule/src/bubseek_schedule/plugin.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
from apscheduler.schedulers.asyncio import AsyncIOScheduler
1+
import contextlib
2+
3+
from apscheduler.schedulers import SchedulerAlreadyRunningError
4+
from apscheduler.schedulers.background import BackgroundScheduler
25
from apscheduler.schedulers.base import BaseScheduler
36
from bub import hookimpl
47
from bub.types import Envelope, MessageHandler, State
@@ -8,17 +11,32 @@
811

912
def default_scheduler() -> BaseScheduler:
1013
job_store = OceanBaseJobStore()
11-
return AsyncIOScheduler(jobstores={"default": job_store})
14+
return BackgroundScheduler(jobstores={"default": job_store})
1215

1316

1417
class ScheduleImpl:
18+
"""Schedule plugin: persist jobs to SeekDB via OceanBaseJobStore.
19+
20+
Uses BackgroundScheduler so the scheduler can start without the ``schedule`` channel.
21+
``bub chat`` only enables the ``cli`` channel; previously AsyncIOScheduler never started,
22+
so APScheduler kept jobs in memory-only ``_pending_jobs`` and nothing reached the DB.
23+
"""
24+
1525
def __init__(self) -> None:
1626
from bubseek_schedule import tools # noqa: F401
1727

1828
self.scheduler = default_scheduler()
1929

30+
def _ensure_scheduler_started(self) -> None:
31+
if self.scheduler.running:
32+
return
33+
with contextlib.suppress(SchedulerAlreadyRunningError):
34+
self.scheduler.start()
35+
2036
@hookimpl
2137
def load_state(self, message: Envelope, session_id: str) -> State:
38+
# Runs before tools on every inbound message — covers CLI-only ``bub chat``.
39+
self._ensure_scheduler_started()
2240
return {"scheduler": self.scheduler}
2341

2442
@hookimpl

0 commit comments

Comments
 (0)