Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions cat-cafe-skills/schedule-tasks/SKILL.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
---
name: schedule-tasks
description: >
定时任务注册、管理、能力指南。
Use when: 用户想设定时任务、定期提醒、周期巡检、定时发送内容。
Not for: 一次性即时操作、已有 builtin 任务的手动触发。
定时任务注册、管理、能力指南。支持周期任务和一次性延迟任务。
Use when: 用户想设定时任务、定期提醒、周期巡检、定时发送内容、延迟执行一次性操作
Not for: 已有 builtin 任务的手动触发。
Output: 注册/管理定时任务,任务到点唤醒猫执行。
triggers:
- "定时"
Expand All @@ -17,6 +17,12 @@ triggers:
- "定期"
- "周期"
- "定时任务"
- "分钟后"
- "小时后"
- "之后"
- "later"
- "in 5 minutes"
- "once"
---

# Schedule Tasks — 定时任务注册与管理
Expand Down Expand Up @@ -72,6 +78,8 @@ triggers:

## Trigger 语法速查

### 周期触发(recurring)

| 用户说 | trigger JSON |
|--------|-------------|
| 每天早上 9 点 | `{"type":"cron","expression":"0 9 * * *"}` |
Expand All @@ -80,6 +88,17 @@ triggers:
| 每周一早上 10 点 | `{"type":"cron","expression":"0 10 * * 1"}` |
| 每 5 分钟 | `{"type":"interval","ms":300000}` |

### 一次性触发(once — #415)

| 用户说 | trigger JSON |
|--------|-------------|
| 2 分钟后提醒我 | `{"type":"once","delayMs":120000}` |
| 1 小时后查天气 | `{"type":"once","delayMs":3600000}` |
| 30 秒后通知我 | `{"type":"once","delayMs":30000}` |

一次性任务执行后会**自动退役**(从 runtime 注销 + 从 SQLite 删除),不会重复触发。
路由层会将 `delayMs` 归一化为绝对时间 `fireAt`(epoch ms),确保重启后触发时间不漂移。

## 管理

| 操作 | 工具 |
Expand Down
2 changes: 2 additions & 0 deletions packages/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ async function main(): Promise<void> {
const { DynamicTaskStore } = await import('./infrastructure/scheduler/DynamicTaskStore.js');
const { templateRegistry } = await import('./infrastructure/scheduler/templates/registry.js');
const dynamicTaskStore = new DynamicTaskStore(schedulerDb);
taskRunnerV2.setDynamicTaskStore(dynamicTaskStore); // #415: wire store for once-trigger auto-retirement

// ── F139 Phase 2+3A+3B: Schedule panel API routes ──
const { scheduleRoutes } = await import('./routes/schedule.js');
Expand All @@ -543,6 +544,7 @@ async function main(): Promise<void> {
globalControlStore,
packTemplateStore,
taskStore,
deliver: schedulerDeliver,
});

// ── Phase G: Summary Compaction (registers into unified scheduler) ──
Expand Down
121 changes: 120 additions & 1 deletion packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { getNextCronMs } from './cron-utils.js';
import type { DynamicTaskStore } from './DynamicTaskStore.js';
import type { DynamicTaskDef, DynamicTaskStore } from './DynamicTaskStore.js';
import { executeTaskPipeline } from './execute-pipeline.js';
import type { RunLedger } from './RunLedger.js';
import { notifyTaskFailed, notifyTaskSucceeded } from './schedule-notify.js';
import type { TaskTemplate } from './templates/types.js';
import type {
ActorRole,
Expand Down Expand Up @@ -31,6 +32,8 @@ export interface TaskRunnerV2Options {
fetchContent?: (url: string) => Promise<FetchResult>;
/** Phase 4b: invoke a cat to handle a scheduled task (fire-and-forget) */
invokeTrigger?: ScheduleInvokeTrigger;
/** #415: dynamic task store — needed for once-trigger auto-retirement */
dynamicTaskStore?: DynamicTaskStore;
}

/** Phase 2.5: Compute human-readable subject preview from subjectKind + lastRun (AC-E2) */
Expand Down Expand Up @@ -92,6 +95,7 @@ export class TaskRunnerV2 {
private deliver: TaskRunnerV2Options['deliver'];
private fetchContent: TaskRunnerV2Options['fetchContent'];
private invokeTrigger: TaskRunnerV2Options['invokeTrigger'];
private dynamicTaskStore: TaskRunnerV2Options['dynamicTaskStore'];

constructor(opts: TaskRunnerV2Options) {
this.logger = opts.logger;
Expand All @@ -102,13 +106,19 @@ export class TaskRunnerV2 {
this.deliver = opts.deliver;
this.fetchContent = opts.fetchContent;
this.invokeTrigger = opts.invokeTrigger;
this.dynamicTaskStore = opts.dynamicTaskStore;
}

/** Late-bind invokeTrigger (constructed after TaskRunnerV2 in boot sequence) */
setInvokeTrigger(trigger: ScheduleInvokeTrigger): void {
this.invokeTrigger = trigger;
}

/** #415: Late-bind dynamicTaskStore (constructed after TaskRunnerV2 in boot sequence) */
setDynamicTaskStore(store: DynamicTaskStore): void {
this.dynamicTaskStore = store;
}

register(task: AnyTaskSpec): void {
if (this.tasks.some((t) => t.id === task.id)) {
throw new Error(`TaskRunnerV2: duplicate task id "${task.id}"`);
Expand Down Expand Up @@ -149,6 +159,12 @@ export class TaskRunnerV2 {
const defs = store.getAll().filter((d) => d.enabled);
let loaded = 0;
for (const def of defs) {
// #415: once tasks with past fireAt → missed window, cancel + notify + retire
if (def.trigger.type === 'once' && def.trigger.fireAt < Date.now()) {
this.handleMissedOnceTask(def, store);
continue;
}

const template = templateGetter.get(def.templateId);
if (!template) {
this.logger.error(`[scheduler] hydrate: unknown template "${def.templateId}" for def ${def.id}`);
Expand Down Expand Up @@ -190,6 +206,8 @@ export class TaskRunnerV2 {

if (task.trigger.type === 'cron') {
this.scheduleCronTick(task);
} else if (task.trigger.type === 'once') {
this.scheduleOnceTick(task);
} else {
const runTick = () => {
// Guard: skip if task was unregistered before tick fires (防幽灵执行)
Expand Down Expand Up @@ -232,6 +250,99 @@ export class TaskRunnerV2 {
);
}

/** Max safe setTimeout delay — Node clamps values above 2^31-1 ms to ~1ms */
private static MAX_TIMER_DELAY = 2_147_483_647;

/** #415: Schedule a one-shot task — fires once at fireAt, then auto-retires */
private scheduleOnceTick(task: AnyTaskSpec): void {
if (task.trigger.type !== 'once') return;
const remaining = Math.max(0, task.trigger.fireAt - Date.now());
// Node setTimeout overflows at 2^31-1 ms — chunk long delays into safe steps
if (remaining > TaskRunnerV2.MAX_TIMER_DELAY) {
const timer = setTimeout(() => this.scheduleOnceTick(task), TaskRunnerV2.MAX_TIMER_DELAY);
if (typeof timer === 'object' && 'unref' in timer) timer.unref();
this.timers.set(task.id, timer);
return;
}
const timer = setTimeout(() => {
// Guard: skip if task was unregistered before timeout fires
if (!this.timers.has(task.id)) return;
this.executePipeline(task)
.catch((err) => {
this.logger.error(`[scheduler] ${task.id}: pipeline error`, err);
})
.finally(() => {
// Check ledger to distinguish governance skip from actual execution/other skips
const entries = this.ledger.query(task.id, 1);
const lastOutcome = entries[0]?.outcome;
const isGovernanceSkip = lastOutcome === 'SKIP_GLOBAL_PAUSE' || lastOutcome === 'SKIP_TASK_OVERRIDE';
if (isGovernanceSkip) {
this.logger.info(`[scheduler] ${task.id}: once task governance-skipped, retrying in 30s`);
const retryTimer = setTimeout(() => {
if (!this.started || !this.tasks.some((t) => t.id === task.id)) return;
this.scheduleOnceTick(task);
}, 30_000);
if (typeof retryTimer === 'object' && 'unref' in retryTimer) retryTimer.unref();
this.timers.set(task.id, retryTimer);
} else {
this.retireOnceTask(task.id);
}
});
}, remaining);
if (typeof timer === 'object' && 'unref' in timer) timer.unref();
this.timers.set(task.id, timer);
this.logger.info(
`[scheduler] ${task.id}: registered (profile=${task.profile}, once, fireAt=${new Date(task.trigger.fireAt).toISOString()}, delay=${remaining}ms)`,
);
}

/** #415: Remove a once-task from runtime + persistent store after execution */
private retireOnceTask(taskId: string): void {
// Use taskId directly — for dynamic tasks, taskId === dynDefId
if (this.dynamicTaskStore) {
this.dynamicTaskStore.remove(taskId);
}
this.unregister(taskId);
this.logger.info(`[scheduler] ${taskId}: retired (once task completed)`);
}

/** #415: Handle once-task that missed its execution window (hydrated after restart) */
private handleMissedOnceTask(def: DynamicTaskDef, store: DynamicTaskStore): void {
const fireAt = def.trigger.type === 'once' ? def.trigger.fireAt : 0;
const fireAtIso = new Date(fireAt).toISOString();
this.logger.info(`[scheduler] ${def.id}: once task missed window (fireAt=${fireAtIso}), retiring`);

// Record in ledger for audit trail
this.ledger.record({
task_id: def.id,
subject_key: def.id,
outcome: 'SKIP_MISSED_WINDOW',
signal_summary: `Execution window missed: fireAt=${fireAtIso}`,
duration_ms: 0,
started_at: new Date().toISOString(),
assigned_cat_id: null,
error_summary: null,
});

// Notify user via delivery thread (fire-and-forget)
if (def.deliveryThreadId && this.deliver) {
const label = def.display?.label ?? def.templateId;
const content =
`⏰ 定时任务「${label}」的执行时间窗已错过(原定 ${fireAtIso}),` + '服务在该时间段未运行。任务已自动取消。';
this.deliver({
threadId: def.deliveryThreadId,
content,
catId: def.createdBy,
userId: ((def.params as Record<string, unknown>).triggerUserId as string) ?? 'system',
}).catch((err) => {
this.logger.error(`[scheduler] ${def.id}: failed to send missed-window notification`, err);
});
}

// Remove from persistent store
store.remove(def.id);
}

stop(): void {
for (const [id, timer] of this.timers) {
clearTimeout(timer);
Expand Down Expand Up @@ -305,6 +416,14 @@ export class TaskRunnerV2 {
deliver: this.deliver,
fetchContent: this.fetchContent,
invokeTrigger: this.invokeTrigger,
onItemOutcome: (taskId, _subjectKey, outcome, errorSummary) => {
const dynDefId = this.dynamicTaskIds.get(taskId);
if (!dynDefId || !this.dynamicTaskStore) return;
const def = this.dynamicTaskStore.getById(dynDefId);
if (!def) return;
if (outcome === 'RUN_FAILED') notifyTaskFailed(this.deliver, def, errorSummary);
if (outcome === 'RUN_DELIVERED') notifyTaskSucceeded(this.deliver, def);
},
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ export interface PipelineContext {
fetchContent?: (url: string) => Promise<FetchResult>;
/** Phase 4b: invoke a cat to handle a scheduled task (fire-and-forget) */
invokeTrigger?: ScheduleInvokeTrigger;
/** #415: per-workItem outcome callback (used for failure notifications) */
onItemOutcome?: (taskId: string, subjectKey: string, outcome: RunOutcome, errorSummary: string | null) => void;
}

function withTimeout(promise: Promise<void>, ms: number, taskId: string): Promise<void> {
Expand Down Expand Up @@ -70,6 +72,7 @@ export async function executeTaskPipeline(ctx: PipelineContext): Promise<void> {
deliver,
fetchContent,
invokeTrigger,
onItemOutcome,
} = ctx;
const startMs = Date.now();
const tickCount = (tickCounts.get(task.id) ?? 0) + 1;
Expand Down Expand Up @@ -209,6 +212,9 @@ export async function executeTaskPipeline(ctx: PipelineContext): Promise<void> {
error_summary: errorSummary,
});

// #415: notify on outcome (used for failure notifications)
if (onItemOutcome) onItemOutcome(task.id, item.subjectKey, outcome, errorSummary);

// AC-D2: Record emission after successful thread-scoped delivery for self-echo suppression
if (outcome === 'RUN_DELIVERED' && emissionStore && item.subjectKey.startsWith('thread-')) {
const threadId = item.subjectKey.slice(7);
Expand Down
77 changes: 77 additions & 0 deletions packages/api/src/infrastructure/scheduler/schedule-notify.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* #415 Phase 2: Task lifecycle notifications
*
* Fire-and-forget notifications to delivery threads for lifecycle events:
* registered, paused, resumed, deleted, failed, missed-window.
*/

import { getNextCronMs } from './cron-utils.js';
import type { DynamicTaskDef } from './DynamicTaskStore.js';
import type { DeliverOpts, TriggerSpec } from './types.js';

type DeliverFn = (opts: DeliverOpts) => Promise<string>;

/** Compute epoch ms of next fire time for a trigger */
export function computeNextFireTime(trigger: TriggerSpec): number | null {
if (trigger.type === 'once') return trigger.fireAt;
if (trigger.type === 'cron') return Date.now() + getNextCronMs(trigger.expression, trigger.timezone);
if (trigger.type === 'interval') return Date.now() + trigger.ms;
return null;
}

function formatTime(epoch: number): string {
return new Date(epoch).toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai', hour12: false });
}

function resolveUserId(def: DynamicTaskDef): string {
return ((def.params as Record<string, unknown>).triggerUserId as string) ?? 'system';
}

function label(def: DynamicTaskDef): string {
return def.display?.label ?? def.templateId;
}

function fire(deliver: DeliverFn | undefined, def: DynamicTaskDef, content: string): void {
if (!deliver || !def.deliveryThreadId) return;
deliver({ threadId: def.deliveryThreadId, content, catId: 'system', userId: resolveUserId(def) }).catch(() => {});
}

export function notifyTaskRegistered(deliver: DeliverFn | undefined, def: DynamicTaskDef): void {
const nextFire = computeNextFireTime(def.trigger);
const timeStr = nextFire ? formatTime(nextFire) : '未知';
const once = def.trigger.type === 'once' ? '(一次性,执行后自动退役)' : '';
fire(deliver, def, `✅ 定时任务「${label(def)}」已创建,下次执行时间:${timeStr}${once}`);
}

export function notifyTaskPaused(deliver: DeliverFn | undefined, def: DynamicTaskDef): void {
fire(deliver, def, `⏸️ 定时任务「${label(def)}」已暂停`);
}

export function notifyTaskResumed(deliver: DeliverFn | undefined, def: DynamicTaskDef): void {
const nextFire = computeNextFireTime(def.trigger);
const timeStr = nextFire ? formatTime(nextFire) : '未知';
fire(deliver, def, `▶️ 定时任务「${label(def)}」已恢复,下次执行时间:${timeStr}`);
}

export function notifyTaskDeleted(deliver: DeliverFn | undefined, def: DynamicTaskDef): void {
fire(deliver, def, `🗑️ 定时任务「${label(def)}」已删除`);
}

export function notifyTaskSucceeded(deliver: DeliverFn | undefined, def: DynamicTaskDef): void {
if (def.trigger.type === 'once') {
fire(deliver, def, `✅ 定时任务「${label(def)}」已执行完成,任务已自动结束`);
} else {
const nextFire = computeNextFireTime(def.trigger);
const timeStr = nextFire ? formatTime(nextFire) : '未知';
fire(deliver, def, `✅ 定时任务「${label(def)}」本次执行完成,下次执行时间:${timeStr}`);
}
}

export function notifyTaskFailed(
deliver: DeliverFn | undefined,
def: DynamicTaskDef,
errorSummary: string | null,
): void {
const reason = errorSummary ? `:${errorSummary.slice(0, 200)}` : '';
fire(deliver, def, `❌ 定时任务「${label(def)}」执行失败${reason}`);
}
8 changes: 6 additions & 2 deletions packages/api/src/infrastructure/scheduler/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ export interface GateCtx {
/** Task profile presets (ADR-022 KD-1) */
export type TaskProfile = 'awareness' | 'poller';

/** Phase 2: Trigger spec — interval or cron */
export type TriggerSpec = { type: 'interval'; ms: number } | { type: 'cron'; expression: string; timezone?: string };
/** Phase 2: Trigger spec — interval, cron, or once (#415) */
export type TriggerSpec =
| { type: 'interval'; ms: number }
| { type: 'cron'; expression: string; timezone?: string }
| { type: 'once'; fireAt: number };

/** Phase 2: Context dimension — session × materialization */
export interface ContextSpec {
Expand All @@ -39,6 +42,7 @@ export type RunOutcome =
| 'SKIP_GLOBAL_PAUSE'
| 'SKIP_TASK_OVERRIDE'
| 'SKIP_SELF_ECHO'
| 'SKIP_MISSED_WINDOW'
| 'RUN_DELIVERED'
| 'RUN_FAILED';

Expand Down
Loading
Loading