From 44e6ab910cd96439c38e59abfdeefdfafc4b528d Mon Sep 17 00:00:00 2001 From: mindfn Date: Fri, 10 Apr 2026 10:45:03 +0800 Subject: [PATCH 01/12] feat(scheduler): support one-shot scheduled tasks (`once` trigger) (#415) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `{ type: 'once', fireAt: number }` trigger type for one-time delayed execution. Tasks auto-retire after firing (unregister from runtime + delete from SQLite). Route layer normalizes `delayMs` → absolute `fireAt` to prevent restart drift. [宪宪/Opus-46🐾] Co-Authored-By: Claude Opus 4.6 --- cat-cafe-skills/schedule-tasks/SKILL.md | 25 ++- packages/api/src/index.ts | 1 + .../infrastructure/scheduler/TaskRunnerV2.ts | 41 +++++ .../api/src/infrastructure/scheduler/types.ts | 7 +- packages/api/src/routes/schedule.ts | 41 ++++- .../test/scheduler/dynamic-task-store.test.js | 13 ++ .../api/test/scheduler/task-runner-v2.test.js | 158 ++++++++++++++++++ .../mcp-server/src/tools/schedule-tools.ts | 3 +- .../components/workspace/schedule-helpers.ts | 14 +- 9 files changed, 294 insertions(+), 9 deletions(-) diff --git a/cat-cafe-skills/schedule-tasks/SKILL.md b/cat-cafe-skills/schedule-tasks/SKILL.md index 9895d0261..c815d2626 100644 --- a/cat-cafe-skills/schedule-tasks/SKILL.md +++ b/cat-cafe-skills/schedule-tasks/SKILL.md @@ -1,9 +1,9 @@ --- name: schedule-tasks description: > - 定时任务注册、管理、能力指南。 - Use when: 用户想设定时任务、定期提醒、周期巡检、定时发送内容。 - Not for: 一次性即时操作、已有 builtin 任务的手动触发。 + 定时任务注册、管理、能力指南。支持周期任务和一次性延迟任务。 + Use when: 用户想设定时任务、定期提醒、周期巡检、定时发送内容、延迟执行一次性操作。 + Not for: 已有 builtin 任务的手动触发。 Output: 注册/管理定时任务,任务到点唤醒猫执行。 triggers: - "定时" @@ -17,6 +17,12 @@ triggers: - "定期" - "周期" - "定时任务" + - "分钟后" + - "小时后" + - "之后" + - "later" + - "in 5 minutes" + - "once" --- # Schedule Tasks — 定时任务注册与管理 @@ -72,6 +78,8 @@ triggers: ## Trigger 语法速查 +### 周期触发(recurring) + | 用户说 | trigger JSON | |--------|-------------| | 每天早上 9 点 | `{"type":"cron","expression":"0 9 * * *"}` | @@ -80,6 +88,17 @@ triggers: | 每周一早上 10 点 | `{"type":"cron","expression":"0 10 * * 1"}` | | 每 5 分钟 | `{"type":"interval","ms":300000}` | +### 一次性触发(once — #415) + +| 用户说 | trigger JSON | +|--------|-------------| +| 2 分钟后提醒我 | `{"type":"once","delayMs":120000}` | +| 1 小时后查天气 | `{"type":"once","delayMs":3600000}` | +| 30 秒后通知我 | `{"type":"once","delayMs":30000}` | + +一次性任务执行后会**自动退役**(从 runtime 注销 + 从 SQLite 删除),不会重复触发。 +路由层会将 `delayMs` 归一化为绝对时间 `fireAt`(epoch ms),确保重启后触发时间不漂移。 + ## 管理 | 操作 | 工具 | diff --git a/packages/api/src/index.ts b/packages/api/src/index.ts index 1ce43ffcc..6ae427623 100644 --- a/packages/api/src/index.ts +++ b/packages/api/src/index.ts @@ -533,6 +533,7 @@ async function main(): Promise { const { DynamicTaskStore } = await import('./infrastructure/scheduler/DynamicTaskStore.js'); const { templateRegistry } = await import('./infrastructure/scheduler/templates/registry.js'); const dynamicTaskStore = new DynamicTaskStore(schedulerDb); + taskRunnerV2.setDynamicTaskStore(dynamicTaskStore); // #415: wire store for once-trigger auto-retirement // ── F139 Phase 2+3A+3B: Schedule panel API routes ── const { scheduleRoutes } = await import('./routes/schedule.js'); diff --git a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts index e727d8732..170a405ba 100644 --- a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts +++ b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts @@ -31,6 +31,8 @@ export interface TaskRunnerV2Options { fetchContent?: (url: string) => Promise; /** Phase 4b: invoke a cat to handle a scheduled task (fire-and-forget) */ invokeTrigger?: ScheduleInvokeTrigger; + /** #415: dynamic task store — needed for once-trigger auto-retirement */ + dynamicTaskStore?: DynamicTaskStore; } /** Phase 2.5: Compute human-readable subject preview from subjectKind + lastRun (AC-E2) */ @@ -92,6 +94,7 @@ export class TaskRunnerV2 { private deliver: TaskRunnerV2Options['deliver']; private fetchContent: TaskRunnerV2Options['fetchContent']; private invokeTrigger: TaskRunnerV2Options['invokeTrigger']; + private dynamicTaskStore: TaskRunnerV2Options['dynamicTaskStore']; constructor(opts: TaskRunnerV2Options) { this.logger = opts.logger; @@ -102,6 +105,7 @@ export class TaskRunnerV2 { this.deliver = opts.deliver; this.fetchContent = opts.fetchContent; this.invokeTrigger = opts.invokeTrigger; + this.dynamicTaskStore = opts.dynamicTaskStore; } /** Late-bind invokeTrigger (constructed after TaskRunnerV2 in boot sequence) */ @@ -109,6 +113,11 @@ export class TaskRunnerV2 { this.invokeTrigger = trigger; } + /** #415: Late-bind dynamicTaskStore (constructed after TaskRunnerV2 in boot sequence) */ + setDynamicTaskStore(store: DynamicTaskStore): void { + this.dynamicTaskStore = store; + } + register(task: AnyTaskSpec): void { if (this.tasks.some((t) => t.id === task.id)) { throw new Error(`TaskRunnerV2: duplicate task id "${task.id}"`); @@ -190,6 +199,8 @@ export class TaskRunnerV2 { if (task.trigger.type === 'cron') { this.scheduleCronTick(task); + } else if (task.trigger.type === 'once') { + this.scheduleOnceTick(task); } else { const runTick = () => { // Guard: skip if task was unregistered before tick fires (防幽灵执行) @@ -232,6 +243,36 @@ export class TaskRunnerV2 { ); } + /** #415: Schedule a one-shot task — fires once at fireAt, then auto-retires */ + private scheduleOnceTick(task: AnyTaskSpec): void { + if (task.trigger.type !== 'once') return; + const delay = Math.max(0, task.trigger.fireAt - Date.now()); + const timer = setTimeout(() => { + this.executePipeline(task) + .catch((err) => { + this.logger.error(`[scheduler] ${task.id}: pipeline error`, err); + }) + .finally(() => { + this.retireOnceTask(task.id); + }); + }, delay); + if (typeof timer === 'object' && 'unref' in timer) timer.unref(); + this.timers.set(task.id, timer); + this.logger.info( + `[scheduler] ${task.id}: registered (profile=${task.profile}, once, fireAt=${new Date(task.trigger.fireAt).toISOString()}, delay=${delay}ms)`, + ); + } + + /** #415: Remove a once-task from runtime + persistent store after execution */ + private retireOnceTask(taskId: string): void { + const dynDefId = this.dynamicTaskIds.get(taskId); + if (dynDefId && this.dynamicTaskStore) { + this.dynamicTaskStore.remove(dynDefId); + } + this.unregister(taskId); + this.logger.info(`[scheduler] ${taskId}: retired (once task completed)`); + } + stop(): void { for (const [id, timer] of this.timers) { clearTimeout(timer); diff --git a/packages/api/src/infrastructure/scheduler/types.ts b/packages/api/src/infrastructure/scheduler/types.ts index c897f28bc..a8002974a 100644 --- a/packages/api/src/infrastructure/scheduler/types.ts +++ b/packages/api/src/infrastructure/scheduler/types.ts @@ -22,8 +22,11 @@ export interface GateCtx { /** Task profile presets (ADR-022 KD-1) */ export type TaskProfile = 'awareness' | 'poller'; -/** Phase 2: Trigger spec — interval or cron */ -export type TriggerSpec = { type: 'interval'; ms: number } | { type: 'cron'; expression: string; timezone?: string }; +/** Phase 2: Trigger spec — interval, cron, or once (#415) */ +export type TriggerSpec = + | { type: 'interval'; ms: number } + | { type: 'cron'; expression: string; timezone?: string } + | { type: 'once'; fireAt: number }; /** Phase 2: Context dimension — session × materialization */ export interface ContextSpec { diff --git a/packages/api/src/routes/schedule.ts b/packages/api/src/routes/schedule.ts index 76655450a..aebf2d768 100644 --- a/packages/api/src/routes/schedule.ts +++ b/packages/api/src/routes/schedule.ts @@ -24,6 +24,21 @@ import type { TriggerSpec } from '../infrastructure/scheduler/types.js'; import { resolveHeaderUserId } from '../utils/request-identity.js'; import { governanceRoutes } from './schedule-governance.js'; +/** #415: Normalize once-trigger input — accepts delayMs (relative) or fireAt (absolute) */ +function normalizeOnceTrigger(trigger: Record): TriggerSpec | { error: string } { + if (trigger.type !== 'once') return trigger as TriggerSpec; + const delayMs = typeof trigger.delayMs === 'number' ? trigger.delayMs : undefined; + const fireAt = typeof trigger.fireAt === 'number' ? trigger.fireAt : undefined; + if (delayMs != null) { + if (delayMs < 0) return { error: 'once trigger delayMs must be >= 0' }; + return { type: 'once', fireAt: Date.now() + delayMs }; + } + if (fireAt != null) { + return { type: 'once', fireAt }; + } + return { error: 'once trigger requires either delayMs or fireAt' }; +} + export interface ScheduleRoutesOptions { taskRunner: TaskRunnerV2; dynamicTaskStore?: DynamicTaskStore; @@ -200,7 +215,18 @@ export const scheduleRoutes: FastifyPluginAsync = async ( return { error: `Unknown template: ${body.templateId}` }; } - const trigger = body.trigger ?? template.defaultTrigger; + // #415: normalize once trigger (delayMs → fireAt) + let trigger: TriggerSpec; + if (body.trigger && (body.trigger as Record).type === 'once') { + const result = normalizeOnceTrigger(body.trigger as Record); + if ('error' in result) { + reply.status(400); + return { error: result.error }; + } + trigger = result; + } else { + trigger = body.trigger ?? template.defaultTrigger; + } const params = body.params ?? {}; const display = body.display ? { @@ -250,7 +276,18 @@ export const scheduleRoutes: FastifyPluginAsync = async ( return { error: `Unknown template: ${body.templateId}` }; } - const trigger = body.trigger ?? template.defaultTrigger; + // #415: normalize once trigger (delayMs → fireAt) + let trigger: TriggerSpec; + if (body.trigger && (body.trigger as Record).type === 'once') { + const result = normalizeOnceTrigger(body.trigger as Record); + if ('error' in result) { + reply.status(400); + return { error: result.error }; + } + trigger = result; + } else { + trigger = body.trigger ?? template.defaultTrigger; + } const params = body.params ?? {}; if (typeof params !== 'object' || params === null || Array.isArray(params)) { diff --git a/packages/api/test/scheduler/dynamic-task-store.test.js b/packages/api/test/scheduler/dynamic-task-store.test.js index d545729b1..2230bbc7f 100644 --- a/packages/api/test/scheduler/dynamic-task-store.test.js +++ b/packages/api/test/scheduler/dynamic-task-store.test.js @@ -110,4 +110,17 @@ describe('DynamicTaskStore', () => { store.insert(SAMPLE_DEF); assert.throws(() => store.insert(SAMPLE_DEF), /UNIQUE|constraint/i); }); + + test('#415: once trigger round-trips correctly', () => { + const fireAt = Date.now() + 120_000; + const onceDef = { + ...SAMPLE_DEF, + id: 'dyn-once-rt', + trigger: { type: 'once', fireAt }, + }; + store.insert(onceDef); + const loaded = store.getById('dyn-once-rt'); + assert.equal(loaded.trigger.type, 'once'); + assert.equal(loaded.trigger.fireAt, fireAt); + }); }); diff --git a/packages/api/test/scheduler/task-runner-v2.test.js b/packages/api/test/scheduler/task-runner-v2.test.js index 74034c2d4..a04e0f5dd 100644 --- a/packages/api/test/scheduler/task-runner-v2.test.js +++ b/packages/api/test/scheduler/task-runner-v2.test.js @@ -859,3 +859,161 @@ describe('TaskRunnerV2 — governance controls (AC-D1)', () => { runner.stop(); }); }); + +// ─── #415: once trigger ───────────────────────────────────── + +describe('TaskRunnerV2 — once trigger (#415)', () => { + let db, ledger, dynamicTaskStore; + const noop = () => {}; + const silentLogger = { info: noop, error: noop }; + + beforeEach(async () => { + db = new Database(':memory:'); + const { applyMigrations } = await import('../../dist/domains/memory/schema.js'); + const { RunLedger } = await import('../../dist/infrastructure/scheduler/RunLedger.js'); + const { DynamicTaskStore } = await import('../../dist/infrastructure/scheduler/DynamicTaskStore.js'); + applyMigrations(db); + ledger = new RunLedger(db); + dynamicTaskStore = new DynamicTaskStore(db); + }); + + const makeOnceTask = (id, fireAt, overrides = {}) => ({ + id, + profile: 'awareness', + trigger: { type: 'once', fireAt }, + admission: { + gate: async () => ({ run: true, workItems: [{ signal: 'go', subjectKey: 'once-k' }] }), + }, + run: { overlap: 'skip', timeoutMs: 5000, execute: async () => {} }, + state: { runLedger: 'sqlite' }, + outcome: { whenNoSignal: 'drop' }, + enabled: () => true, + ...overrides, + }); + + it('once trigger fires after delay and records RUN_DELIVERED', async () => { + const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); + const runner = new TaskRunnerV2({ logger: silentLogger, ledger, dynamicTaskStore }); + let executed = false; + + runner.registerDynamic( + makeOnceTask('once-fire', Date.now() + 80, { + run: { + overlap: 'skip', + timeoutMs: 5000, + execute: async () => { + executed = true; + }, + }, + }), + 'dyn-once-1', + ); + runner.start(); + await new Promise((r) => setTimeout(r, 200)); + + assert.ok(executed, 'once task should have fired'); + const rows = ledger.query('once-fire', 10); + assert.equal(rows.length, 1); + assert.equal(rows[0].outcome, 'RUN_DELIVERED'); + runner.stop(); + }); + + it('once trigger auto-retires: unregisters from runner + removes from store', async () => { + const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); + const runner = new TaskRunnerV2({ logger: silentLogger, ledger, dynamicTaskStore }); + + // Seed the dynamic store so retire can clean it up + dynamicTaskStore.insert({ + id: 'dyn-retire-1', + templateId: 'reminder', + trigger: { type: 'once', fireAt: Date.now() + 50 }, + params: { message: 'test' }, + display: { label: 'test', category: 'system' }, + deliveryThreadId: null, + enabled: true, + createdBy: 'test', + createdAt: new Date().toISOString(), + }); + + runner.registerDynamic(makeOnceTask('dyn-retire-1', Date.now() + 50), 'dyn-retire-1'); + runner.start(); + await new Promise((r) => setTimeout(r, 250)); + + // Should be unregistered from runner + assert.ok( + !runner.getRegisteredTasks().includes('dyn-retire-1'), + 'task should be unregistered after once execution', + ); + // Should be removed from store + assert.equal( + dynamicTaskStore.getById('dyn-retire-1'), + null, + 'task should be removed from DynamicTaskStore after once execution', + ); + runner.stop(); + }); + + it('once trigger with fireAt in the past fires immediately', async () => { + const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); + const runner = new TaskRunnerV2({ logger: silentLogger, ledger, dynamicTaskStore }); + let executed = false; + + runner.registerDynamic( + makeOnceTask('once-past', Date.now() - 5000, { + run: { + overlap: 'skip', + timeoutMs: 5000, + execute: async () => { + executed = true; + }, + }, + }), + 'dyn-past-1', + ); + runner.start(); + await new Promise((r) => setTimeout(r, 100)); + + assert.ok(executed, 'once task with past fireAt should fire immediately'); + runner.stop(); + }); + + it('once trigger does NOT fire before fireAt', async () => { + const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); + const runner = new TaskRunnerV2({ logger: silentLogger, ledger, dynamicTaskStore }); + let executed = false; + + runner.registerDynamic( + makeOnceTask('once-future', Date.now() + 10_000, { + run: { + overlap: 'skip', + timeoutMs: 5000, + execute: async () => { + executed = true; + }, + }, + }), + 'dyn-future-1', + ); + runner.start(); + await new Promise((r) => setTimeout(r, 100)); + + assert.ok(!executed, 'once task should NOT fire before fireAt'); + runner.stop(); + }); + + it('getTaskSummaries includes once trigger info', async () => { + const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); + const runner = new TaskRunnerV2({ logger: silentLogger, ledger, dynamicTaskStore }); + const fireAt = Date.now() + 60_000; + + runner.registerDynamic(makeOnceTask('once-summary', fireAt), 'dyn-sum-1'); + + const summaries = runner.getTaskSummaries(); + const s = summaries.find((t) => t.id === 'once-summary'); + assert.ok(s, 'should find once task in summaries'); + assert.equal(s.trigger.type, 'once'); + assert.equal(s.trigger.fireAt, fireAt); + assert.equal(s.source, 'dynamic'); + runner.stop(); + }); +}); diff --git a/packages/mcp-server/src/tools/schedule-tools.ts b/packages/mcp-server/src/tools/schedule-tools.ts index 9992b7d04..84f17a630 100644 --- a/packages/mcp-server/src/tools/schedule-tools.ts +++ b/packages/mcp-server/src/tools/schedule-tools.ts @@ -57,7 +57,7 @@ export const registerScheduledTaskInputSchema = { trigger: z .string() .describe( - 'Trigger config as JSON string. Examples: {"type":"cron","expression":"0 9 * * *"} or {"type":"interval","ms":3600000}', + 'Trigger config as JSON string. Examples: {"type":"cron","expression":"0 9 * * *"} or {"type":"interval","ms":3600000} or {"type":"once","delayMs":120000} (fire once after 2min) or {"type":"once","fireAt":1712345678000} (fire once at epoch ms)', ), params: z .string() @@ -203,6 +203,7 @@ export const scheduleTools = [ name: 'cat_cafe_register_scheduled_task', description: 'Create a new scheduled task from a template (confirm step). The task will be persisted and run automatically on schedule. ' + + 'Supports recurring (cron/interval) and one-shot (once) triggers. Once tasks auto-retire after execution. ' + 'When the task fires, a cat is woken with full capabilities — it can send rich blocks (images, audio, cards), search the web, generate content, etc. ' + 'IMPORTANT: You MUST call preview_scheduled_task first and get user confirmation before calling this. ' + 'trigger and params must be JSON strings, not objects.', diff --git a/packages/web/src/components/workspace/schedule-helpers.ts b/packages/web/src/components/workspace/schedule-helpers.ts index 439717dd8..c8401c5b2 100644 --- a/packages/web/src/components/workspace/schedule-helpers.ts +++ b/packages/web/src/components/workspace/schedule-helpers.ts @@ -19,9 +19,11 @@ export interface RunStats { } export interface TriggerSpec { - type: 'interval' | 'cron'; + type: 'interval' | 'cron' | 'once'; ms?: number; expression?: string; + /** #415: epoch ms — when the once trigger will fire */ + fireAt?: number; } export interface TaskDisplayMeta { @@ -93,6 +95,16 @@ export function fallbackCategory(taskId: string): DisplayCategory { export function formatTrigger(trigger: TriggerSpec): string { if (trigger.type === 'cron') return `cron: ${trigger.expression}`; + if (trigger.type === 'once') { + if (!trigger.fireAt) return 'once'; + const d = new Date(trigger.fireAt); + const now = Date.now(); + if (trigger.fireAt <= now) return 'once (fired)'; + const diff = trigger.fireAt - now; + if (diff < 60_000) return `once in ${Math.ceil(diff / 1000)}s`; + if (diff < 3_600_000) return `once in ${Math.ceil(diff / 60_000)}m`; + return `once @ ${d.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit' })}`; + } const ms = trigger.ms ?? 0; if (ms >= 3600000) return `${Math.round(ms / 3600000)}h`; if (ms >= 60000) return `${Math.round(ms / 60000)}m`; From 33be9af151f66e008a96ebf2784a4ed08fc8ae7c Mon Sep 17 00:00:00 2001 From: mindfn Date: Fri, 10 Apr 2026 10:53:04 +0800 Subject: [PATCH 02/12] fix(scheduler): cancel past-due once tasks on hydrate instead of executing (#415) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P1: once tasks with fireAt in the past are now cancelled during hydrateDynamic (restart scenario) — records SKIP_MISSED_WINDOW in ledger, sends notification to delivery thread, removes from store. Live registration with slightly-past fireAt still fires immediately. P2: handleMissedOnceTask delivers user notification via fire-and-forget deliver() with task label and original fireAt in the message. [宪宪/Opus-46🐾] Co-Authored-By: Claude Opus 4.6 --- .../infrastructure/scheduler/TaskRunnerV2.ts | 45 ++++++- .../api/src/infrastructure/scheduler/types.ts | 1 + .../api/test/scheduler/task-runner-v2.test.js | 113 +++++++++++++++++- 3 files changed, 156 insertions(+), 3 deletions(-) diff --git a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts index 170a405ba..c12a3e85a 100644 --- a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts +++ b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts @@ -1,5 +1,5 @@ import { getNextCronMs } from './cron-utils.js'; -import type { DynamicTaskStore } from './DynamicTaskStore.js'; +import type { DynamicTaskDef, DynamicTaskStore } from './DynamicTaskStore.js'; import { executeTaskPipeline } from './execute-pipeline.js'; import type { RunLedger } from './RunLedger.js'; import type { TaskTemplate } from './templates/types.js'; @@ -158,6 +158,12 @@ export class TaskRunnerV2 { const defs = store.getAll().filter((d) => d.enabled); let loaded = 0; for (const def of defs) { + // #415: once tasks with past fireAt → missed window, cancel + notify + retire + if (def.trigger.type === 'once' && def.trigger.fireAt < Date.now()) { + this.handleMissedOnceTask(def, store); + continue; + } + const template = templateGetter.get(def.templateId); if (!template) { this.logger.error(`[scheduler] hydrate: unknown template "${def.templateId}" for def ${def.id}`); @@ -273,6 +279,43 @@ export class TaskRunnerV2 { this.logger.info(`[scheduler] ${taskId}: retired (once task completed)`); } + /** #415: Handle once-task that missed its execution window (hydrated after restart) */ + private handleMissedOnceTask(def: DynamicTaskDef, store: DynamicTaskStore): void { + const fireAt = def.trigger.type === 'once' ? def.trigger.fireAt : 0; + const fireAtIso = new Date(fireAt).toISOString(); + this.logger.info(`[scheduler] ${def.id}: once task missed window (fireAt=${fireAtIso}), retiring`); + + // Record in ledger for audit trail + this.ledger.record({ + task_id: def.id, + subject_key: def.id, + outcome: 'SKIP_MISSED_WINDOW', + signal_summary: `Execution window missed: fireAt=${fireAtIso}`, + duration_ms: 0, + started_at: new Date().toISOString(), + assigned_cat_id: null, + error_summary: null, + }); + + // Notify user via delivery thread (fire-and-forget) + if (def.deliveryThreadId && this.deliver) { + const label = def.display?.label ?? def.templateId; + const content = + `⏰ 定时任务「${label}」的执行时间窗已错过(原定 ${fireAtIso}),` + '服务在该时间段未运行。任务已自动取消。'; + this.deliver({ + threadId: def.deliveryThreadId, + content, + catId: def.createdBy, + userId: ((def.params as Record).triggerUserId as string) ?? 'system', + }).catch((err) => { + this.logger.error(`[scheduler] ${def.id}: failed to send missed-window notification`, err); + }); + } + + // Remove from persistent store + store.remove(def.id); + } + stop(): void { for (const [id, timer] of this.timers) { clearTimeout(timer); diff --git a/packages/api/src/infrastructure/scheduler/types.ts b/packages/api/src/infrastructure/scheduler/types.ts index a8002974a..cd3b39acc 100644 --- a/packages/api/src/infrastructure/scheduler/types.ts +++ b/packages/api/src/infrastructure/scheduler/types.ts @@ -42,6 +42,7 @@ export type RunOutcome = | 'SKIP_GLOBAL_PAUSE' | 'SKIP_TASK_OVERRIDE' | 'SKIP_SELF_ECHO' + | 'SKIP_MISSED_WINDOW' | 'RUN_DELIVERED' | 'RUN_FAILED'; diff --git a/packages/api/test/scheduler/task-runner-v2.test.js b/packages/api/test/scheduler/task-runner-v2.test.js index a04e0f5dd..7d237acc3 100644 --- a/packages/api/test/scheduler/task-runner-v2.test.js +++ b/packages/api/test/scheduler/task-runner-v2.test.js @@ -953,11 +953,12 @@ describe('TaskRunnerV2 — once trigger (#415)', () => { runner.stop(); }); - it('once trigger with fireAt in the past fires immediately', async () => { + it('live-registered once trigger with past fireAt fires immediately (processing delay)', async () => { const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); const runner = new TaskRunnerV2({ logger: silentLogger, ledger, dynamicTaskStore }); let executed = false; + // Live registration (not hydration) — should fire even if slightly past runner.registerDynamic( makeOnceTask('once-past', Date.now() - 5000, { run: { @@ -973,7 +974,115 @@ describe('TaskRunnerV2 — once trigger (#415)', () => { runner.start(); await new Promise((r) => setTimeout(r, 100)); - assert.ok(executed, 'once task with past fireAt should fire immediately'); + assert.ok(executed, 'live-registered once task with past fireAt should fire immediately'); + runner.stop(); + }); + + it('hydrated once trigger with past fireAt is cancelled (missed window, not executed)', async () => { + const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); + const runner = new TaskRunnerV2({ logger: silentLogger, ledger, dynamicTaskStore }); + let executed = false; + + // Seed the store with a past-due once task (simulates restart scenario) + const pastFireAt = Date.now() - 60_000; + dynamicTaskStore.insert({ + id: 'dyn-missed-1', + templateId: 'reminder', + trigger: { type: 'once', fireAt: pastFireAt }, + params: { message: 'should not fire' }, + display: { label: '错过的提醒', category: 'system' }, + deliveryThreadId: null, + enabled: true, + createdBy: 'test', + createdAt: new Date(pastFireAt - 60_000).toISOString(), + }); + + // Provide a template that tracks execution + const templateGetter = { + get: (id) => { + if (id !== 'reminder') return null; + return { + templateId: 'reminder', + label: 'Reminder', + category: 'system', + description: 'test', + subjectKind: 'none', + defaultTrigger: { type: 'cron', expression: '0 9 * * *' }, + paramSchema: {}, + createSpec: (instanceId, params) => + makeOnceTask(instanceId, params.trigger.fireAt, { + run: { + overlap: 'skip', + timeoutMs: 5000, + execute: async () => { + executed = true; + }, + }, + }), + }; + }, + }; + + const loaded = runner.hydrateDynamic(dynamicTaskStore, templateGetter); + + // Should NOT have been loaded + assert.equal(loaded, 0, 'past-due once task should not be hydrated'); + + // Should be removed from store + assert.equal(dynamicTaskStore.getById('dyn-missed-1'), null, 'past-due once task should be removed from store'); + + // Should NOT be registered in runner + assert.ok(!runner.getRegisteredTasks().includes('dyn-missed-1'), 'past-due once task should not be in runner'); + + // Should have recorded SKIP_MISSED_WINDOW in ledger + const rows = ledger.query('dyn-missed-1', 10); + assert.equal(rows.length, 1); + assert.equal(rows[0].outcome, 'SKIP_MISSED_WINDOW'); + + // Execute should never have been called + assert.ok(!executed, 'past-due once task should NOT execute'); + runner.stop(); + }); + + it('hydrated once trigger with past fireAt sends missed-window notification', async () => { + const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); + const deliverCalls = []; + const mockDeliver = async (opts) => { + deliverCalls.push(opts); + return 'msg-id'; + }; + const runner = new TaskRunnerV2({ + logger: silentLogger, + ledger, + dynamicTaskStore, + deliver: mockDeliver, + }); + + const pastFireAt = Date.now() - 120_000; + dynamicTaskStore.insert({ + id: 'dyn-notify-1', + templateId: 'reminder', + trigger: { type: 'once', fireAt: pastFireAt }, + params: { message: 'weather check', triggerUserId: 'user-42' }, + display: { label: '天气查询', category: 'system' }, + deliveryThreadId: 'thread-abc', + enabled: true, + createdBy: 'opus', + createdAt: new Date(pastFireAt - 60_000).toISOString(), + }); + + const templateGetter = { get: () => null }; + runner.hydrateDynamic(dynamicTaskStore, templateGetter); + + // Allow fire-and-forget deliver to complete + await new Promise((r) => setTimeout(r, 50)); + + assert.equal(deliverCalls.length, 1, 'should have sent missed-window notification'); + assert.equal(deliverCalls[0].threadId, 'thread-abc'); + assert.equal(deliverCalls[0].catId, 'opus'); + assert.equal(deliverCalls[0].userId, 'user-42'); + assert.ok(deliverCalls[0].content.includes('天气查询'), 'notification should include task label'); + assert.ok(deliverCalls[0].content.includes('错过'), 'notification should mention missed window'); runner.stop(); }); From 73d660ee11b0d01a7e4d16824822d44895d84164 Mon Sep 17 00:00:00 2001 From: mindfn Date: Fri, 10 Apr 2026 11:29:21 +0800 Subject: [PATCH 03/12] feat(scheduler): add lifecycle notifications for all task state transitions (#415) Complete the scheduled task lifecycle by notifying users on register, pause, resume, delete, and execution failure. Extract schedule-notify.ts helper, wire onItemOutcome callback in execute-pipeline for failure detection, and pass deliver into schedule routes for lifecycle events. Co-Authored-By: Claude Opus 4.6 --- packages/api/src/index.ts | 1 + .../infrastructure/scheduler/TaskRunnerV2.ts | 8 + .../scheduler/execute-pipeline.ts | 6 + .../scheduler/schedule-notify.ts | 69 ++++++ packages/api/src/routes/schedule.ts | 26 ++- .../test/scheduler/schedule-notify.test.js | 201 ++++++++++++++++++ 6 files changed, 308 insertions(+), 3 deletions(-) create mode 100644 packages/api/src/infrastructure/scheduler/schedule-notify.ts create mode 100644 packages/api/test/scheduler/schedule-notify.test.js diff --git a/packages/api/src/index.ts b/packages/api/src/index.ts index 6ae427623..7d8ea163f 100644 --- a/packages/api/src/index.ts +++ b/packages/api/src/index.ts @@ -544,6 +544,7 @@ async function main(): Promise { globalControlStore, packTemplateStore, taskStore, + deliver: schedulerDeliver, }); // ── Phase G: Summary Compaction (registers into unified scheduler) ── diff --git a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts index c12a3e85a..3a3dd03f7 100644 --- a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts +++ b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts @@ -2,6 +2,7 @@ import { getNextCronMs } from './cron-utils.js'; import type { DynamicTaskDef, DynamicTaskStore } from './DynamicTaskStore.js'; import { executeTaskPipeline } from './execute-pipeline.js'; import type { RunLedger } from './RunLedger.js'; +import { notifyTaskFailed } from './schedule-notify.js'; import type { TaskTemplate } from './templates/types.js'; import type { ActorRole, @@ -389,6 +390,13 @@ export class TaskRunnerV2 { deliver: this.deliver, fetchContent: this.fetchContent, invokeTrigger: this.invokeTrigger, + onItemOutcome: (taskId, _subjectKey, outcome, errorSummary) => { + if (outcome !== 'RUN_FAILED') return; + const dynDefId = this.dynamicTaskIds.get(taskId); + if (!dynDefId || !this.dynamicTaskStore) return; + const def = this.dynamicTaskStore.getById(dynDefId); + if (def) notifyTaskFailed(this.deliver, def, errorSummary); + }, }); } } diff --git a/packages/api/src/infrastructure/scheduler/execute-pipeline.ts b/packages/api/src/infrastructure/scheduler/execute-pipeline.ts index 8013a7e2e..3159d1851 100644 --- a/packages/api/src/infrastructure/scheduler/execute-pipeline.ts +++ b/packages/api/src/infrastructure/scheduler/execute-pipeline.ts @@ -35,6 +35,8 @@ export interface PipelineContext { fetchContent?: (url: string) => Promise; /** Phase 4b: invoke a cat to handle a scheduled task (fire-and-forget) */ invokeTrigger?: ScheduleInvokeTrigger; + /** #415: per-workItem outcome callback (used for failure notifications) */ + onItemOutcome?: (taskId: string, subjectKey: string, outcome: RunOutcome, errorSummary: string | null) => void; } function withTimeout(promise: Promise, ms: number, taskId: string): Promise { @@ -70,6 +72,7 @@ export async function executeTaskPipeline(ctx: PipelineContext): Promise { deliver, fetchContent, invokeTrigger, + onItemOutcome, } = ctx; const startMs = Date.now(); const tickCount = (tickCounts.get(task.id) ?? 0) + 1; @@ -209,6 +212,9 @@ export async function executeTaskPipeline(ctx: PipelineContext): Promise { error_summary: errorSummary, }); + // #415: notify on outcome (used for failure notifications) + if (onItemOutcome) onItemOutcome(task.id, item.subjectKey, outcome, errorSummary); + // AC-D2: Record emission after successful thread-scoped delivery for self-echo suppression if (outcome === 'RUN_DELIVERED' && emissionStore && item.subjectKey.startsWith('thread-')) { const threadId = item.subjectKey.slice(7); diff --git a/packages/api/src/infrastructure/scheduler/schedule-notify.ts b/packages/api/src/infrastructure/scheduler/schedule-notify.ts new file mode 100644 index 000000000..fb375da38 --- /dev/null +++ b/packages/api/src/infrastructure/scheduler/schedule-notify.ts @@ -0,0 +1,69 @@ +/** + * #415 Phase 2: Task lifecycle notifications + * + * Fire-and-forget notifications to delivery threads for lifecycle events: + * registered, paused, resumed, deleted, failed, missed-window. + */ + +import { getNextCronMs } from './cron-utils.js'; +import type { DynamicTaskDef } from './DynamicTaskStore.js'; +import type { DeliverOpts, TriggerSpec } from './types.js'; + +type DeliverFn = (opts: DeliverOpts) => Promise; + +/** Compute epoch ms of next fire time for a trigger */ +export function computeNextFireTime(trigger: TriggerSpec): number | null { + if (trigger.type === 'once') return trigger.fireAt; + if (trigger.type === 'cron') return Date.now() + getNextCronMs(trigger.expression, trigger.timezone); + if (trigger.type === 'interval') return Date.now() + trigger.ms; + return null; +} + +function formatTime(epoch: number): string { + return new Date(epoch).toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai', hour12: false }); +} + +function resolveUserId(def: DynamicTaskDef): string { + return ((def.params as Record).triggerUserId as string) ?? 'system'; +} + +function label(def: DynamicTaskDef): string { + return def.display?.label ?? def.templateId; +} + +function fire(deliver: DeliverFn | undefined, def: DynamicTaskDef, content: string): void { + if (!deliver || !def.deliveryThreadId) return; + deliver({ threadId: def.deliveryThreadId, content, catId: def.createdBy, userId: resolveUserId(def) }).catch( + () => {}, + ); +} + +export function notifyTaskRegistered(deliver: DeliverFn | undefined, def: DynamicTaskDef): void { + const nextFire = computeNextFireTime(def.trigger); + const timeStr = nextFire ? formatTime(nextFire) : '未知'; + const once = def.trigger.type === 'once' ? '(一次性,执行后自动退役)' : ''; + fire(deliver, def, `✅ 定时任务「${label(def)}」已创建,下次执行时间:${timeStr}${once}`); +} + +export function notifyTaskPaused(deliver: DeliverFn | undefined, def: DynamicTaskDef): void { + fire(deliver, def, `⏸️ 定时任务「${label(def)}」已暂停`); +} + +export function notifyTaskResumed(deliver: DeliverFn | undefined, def: DynamicTaskDef): void { + const nextFire = computeNextFireTime(def.trigger); + const timeStr = nextFire ? formatTime(nextFire) : '未知'; + fire(deliver, def, `▶️ 定时任务「${label(def)}」已恢复,下次执行时间:${timeStr}`); +} + +export function notifyTaskDeleted(deliver: DeliverFn | undefined, def: DynamicTaskDef): void { + fire(deliver, def, `🗑️ 定时任务「${label(def)}」已删除`); +} + +export function notifyTaskFailed( + deliver: DeliverFn | undefined, + def: DynamicTaskDef, + errorSummary: string | null, +): void { + const reason = errorSummary ? `:${errorSummary.slice(0, 200)}` : ''; + fire(deliver, def, `❌ 定时任务「${label(def)}」执行失败${reason}`); +} diff --git a/packages/api/src/routes/schedule.ts b/packages/api/src/routes/schedule.ts index aebf2d768..7f6b19533 100644 --- a/packages/api/src/routes/schedule.ts +++ b/packages/api/src/routes/schedule.ts @@ -19,8 +19,14 @@ import type { ITaskStore } from '../domains/cats/services/stores/ports/TaskStore import type { DynamicTaskStore } from '../infrastructure/scheduler/DynamicTaskStore.js'; import type { GlobalControlStore } from '../infrastructure/scheduler/GlobalControlStore.js'; import type { PackTemplateStore } from '../infrastructure/scheduler/PackTemplateStore.js'; +import { + notifyTaskDeleted, + notifyTaskPaused, + notifyTaskRegistered, + notifyTaskResumed, +} from '../infrastructure/scheduler/schedule-notify.js'; import type { TaskRunnerV2 } from '../infrastructure/scheduler/TaskRunnerV2.js'; -import type { TriggerSpec } from '../infrastructure/scheduler/types.js'; +import type { DeliverOpts, TriggerSpec } from '../infrastructure/scheduler/types.js'; import { resolveHeaderUserId } from '../utils/request-identity.js'; import { governanceRoutes } from './schedule-governance.js'; @@ -54,6 +60,8 @@ export interface ScheduleRoutesOptions { packTemplateStore?: PackTemplateStore; /** #320: Unified task store for thread→subjectKey resolution */ taskStore?: ITaskStore; + /** #415: deliver function for lifecycle notifications */ + deliver?: (opts: DeliverOpts) => Promise; } /** Extract threadId from subjectKey — handles both thread-xxx (real tasks) and thread:xxx formats */ @@ -70,7 +78,8 @@ function addSubjectKeyWithAliases(target: Set, subjectKey: string): void } export const scheduleRoutes: FastifyPluginAsync = async (app, opts) => { - const { taskRunner, dynamicTaskStore, templateRegistry, globalControlStore, packTemplateStore, taskStore } = opts; + const { taskRunner, dynamicTaskStore, templateRegistry, globalControlStore, packTemplateStore, taskStore, deliver } = + opts; // GET /api/schedule/tasks // #320: Optional ?threadId= filter — resolves thread's task subjectKeys for cross-match @@ -327,6 +336,9 @@ export const scheduleRoutes: FastifyPluginAsync = async ( spec.display = display; taskRunner.registerDynamic(spec, id); + // #415: lifecycle notification — task registered + notifyTaskRegistered(deliver, def); + return { success: true, task: { id, ...display, trigger } }; }); @@ -338,6 +350,8 @@ export const scheduleRoutes: FastifyPluginAsync = async ( } const { id } = request.params as { id: string }; + // Read def before deletion for notification + const defForNotify = dynamicTaskStore.getById(id); const removed = dynamicTaskStore.remove(id); if (!removed) { reply.status(404); @@ -345,6 +359,10 @@ export const scheduleRoutes: FastifyPluginAsync = async ( } taskRunner.unregister(id); + + // #415: lifecycle notification — task deleted + if (defForNotify) notifyTaskDeleted(deliver, defForNotify); + return { success: true }; }); @@ -369,12 +387,13 @@ export const scheduleRoutes: FastifyPluginAsync = async ( return { error: 'Dynamic task not found' }; } + const def = dynamicTaskStore.getById(id); if (!body.enabled) { // Pause: unregister from runtime taskRunner.unregister(id); + if (def) notifyTaskPaused(deliver, def); } else { // Resume: re-register in runtime - const def = dynamicTaskStore.getById(id); if (def) { const template = templateRegistry.get(def.templateId); if (template) { @@ -390,6 +409,7 @@ export const scheduleRoutes: FastifyPluginAsync = async ( // Already registered — ignore } } + notifyTaskResumed(deliver, def); } } diff --git a/packages/api/test/scheduler/schedule-notify.test.js b/packages/api/test/scheduler/schedule-notify.test.js new file mode 100644 index 000000000..8caf374aa --- /dev/null +++ b/packages/api/test/scheduler/schedule-notify.test.js @@ -0,0 +1,201 @@ +import assert from 'node:assert/strict'; +import { describe, it } from 'node:test'; + +describe('schedule-notify: computeNextFireTime', () => { + it('once → returns fireAt directly', async () => { + const { computeNextFireTime } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const fireAt = Date.now() + 120_000; + assert.equal(computeNextFireTime({ type: 'once', fireAt }), fireAt); + }); + + it('interval → returns now + ms (within tolerance)', async () => { + const { computeNextFireTime } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const before = Date.now(); + const result = computeNextFireTime({ type: 'interval', ms: 60_000 }); + assert.ok(result >= before + 60_000); + assert.ok(result <= before + 61_000); // 1s tolerance + }); + + it('cron → returns a future epoch ms', async () => { + const { computeNextFireTime } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const result = computeNextFireTime({ type: 'cron', expression: '0 9 * * *' }); + assert.ok(result > Date.now(), 'next cron fire should be in the future'); + }); +}); + +describe('schedule-notify: notification functions', () => { + const makeDef = (overrides = {}) => ({ + id: 'dyn-test-1', + templateId: 'reminder', + trigger: { type: 'once', fireAt: Date.now() + 60_000 }, + params: { message: 'test', triggerUserId: 'user-42' }, + display: { label: '测试提醒', category: 'system' }, + deliveryThreadId: 'thread-xyz', + enabled: true, + createdBy: 'opus', + createdAt: new Date().toISOString(), + ...overrides, + }); + + it('notifyTaskRegistered sends to deliveryThreadId with label and time', async () => { + const { notifyTaskRegistered } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const calls = []; + const mockDeliver = async (opts) => { + calls.push(opts); + return 'msg-1'; + }; + notifyTaskRegistered(mockDeliver, makeDef()); + await new Promise((r) => setTimeout(r, 20)); + assert.equal(calls.length, 1); + assert.equal(calls[0].threadId, 'thread-xyz'); + assert.equal(calls[0].catId, 'opus'); + assert.equal(calls[0].userId, 'user-42'); + assert.ok(calls[0].content.includes('测试提醒')); + assert.ok(calls[0].content.includes('已创建')); + assert.ok(calls[0].content.includes('一次性'), 'should mention once for once-trigger'); + }); + + it('notifyTaskPaused sends pause message', async () => { + const { notifyTaskPaused } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const calls = []; + const mockDeliver = async (opts) => { + calls.push(opts); + return 'msg-1'; + }; + notifyTaskPaused(mockDeliver, makeDef()); + await new Promise((r) => setTimeout(r, 20)); + assert.equal(calls.length, 1); + assert.ok(calls[0].content.includes('已暂停')); + }); + + it('notifyTaskResumed sends resume message with next time', async () => { + const { notifyTaskResumed } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const calls = []; + const mockDeliver = async (opts) => { + calls.push(opts); + return 'msg-1'; + }; + notifyTaskResumed(mockDeliver, makeDef()); + await new Promise((r) => setTimeout(r, 20)); + assert.equal(calls.length, 1); + assert.ok(calls[0].content.includes('已恢复')); + assert.ok(calls[0].content.includes('下次执行时间')); + }); + + it('notifyTaskDeleted sends delete message', async () => { + const { notifyTaskDeleted } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const calls = []; + const mockDeliver = async (opts) => { + calls.push(opts); + return 'msg-1'; + }; + notifyTaskDeleted(mockDeliver, makeDef()); + await new Promise((r) => setTimeout(r, 20)); + assert.equal(calls.length, 1); + assert.ok(calls[0].content.includes('已删除')); + }); + + it('notifyTaskFailed sends failure message with error', async () => { + const { notifyTaskFailed } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const calls = []; + const mockDeliver = async (opts) => { + calls.push(opts); + return 'msg-1'; + }; + notifyTaskFailed(mockDeliver, makeDef(), 'connection timeout'); + await new Promise((r) => setTimeout(r, 20)); + assert.equal(calls.length, 1); + assert.ok(calls[0].content.includes('执行失败')); + assert.ok(calls[0].content.includes('connection timeout')); + }); + + it('no-op when deliver is undefined', async () => { + const { notifyTaskRegistered } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + // Should not throw + notifyTaskRegistered(undefined, makeDef()); + }); + + it('no-op when deliveryThreadId is null', async () => { + const { notifyTaskRegistered } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + let called = false; + const mockDeliver = async () => { + called = true; + return 'msg-1'; + }; + notifyTaskRegistered(mockDeliver, makeDef({ deliveryThreadId: null })); + await new Promise((r) => setTimeout(r, 20)); + assert.ok(!called, 'should not deliver when deliveryThreadId is null'); + }); +}); + +describe('TaskRunnerV2 — execution failure notification (#415)', () => { + it('RUN_FAILED triggers notifyTaskFailed via onItemOutcome', async () => { + const Database = (await import('better-sqlite3')).default; + const db = new Database(':memory:'); + const { applyMigrations } = await import('../../dist/domains/memory/schema.js'); + const { RunLedger } = await import('../../dist/infrastructure/scheduler/RunLedger.js'); + const { DynamicTaskStore } = await import('../../dist/infrastructure/scheduler/DynamicTaskStore.js'); + const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); + applyMigrations(db); + const ledger = new RunLedger(db); + const dynamicTaskStore = new DynamicTaskStore(db); + const deliverCalls = []; + const mockDeliver = async (opts) => { + deliverCalls.push(opts); + return 'msg-1'; + }; + const noop = () => {}; + const runner = new TaskRunnerV2({ + logger: { info: noop, error: noop }, + ledger, + dynamicTaskStore, + deliver: mockDeliver, + }); + + // Seed dynamic store + dynamicTaskStore.insert({ + id: 'dyn-fail-1', + templateId: 'reminder', + trigger: { type: 'interval', ms: 999999 }, + params: { message: 'test', triggerUserId: 'user-42' }, + display: { label: '失败任务', category: 'system' }, + deliveryThreadId: 'thread-fail', + enabled: true, + createdBy: 'opus', + createdAt: new Date().toISOString(), + }); + + runner.registerDynamic( + { + id: 'dyn-fail-1', + profile: 'awareness', + trigger: { type: 'interval', ms: 999999 }, + admission: { + gate: async () => ({ run: true, workItems: [{ signal: 'go', subjectKey: 'k' }] }), + }, + run: { + overlap: 'skip', + timeoutMs: 5000, + execute: async () => { + throw new Error('kaboom'); + }, + }, + state: { runLedger: 'sqlite' }, + outcome: { whenNoSignal: 'drop' }, + enabled: () => true, + }, + 'dyn-fail-1', + ); + + await runner.triggerNow('dyn-fail-1'); + // Allow fire-and-forget to settle + await new Promise((r) => setTimeout(r, 50)); + + assert.ok(deliverCalls.length >= 1, 'should have sent failure notification'); + const failMsg = deliverCalls.find((c) => c.content.includes('执行失败')); + assert.ok(failMsg, 'should contain failure notification'); + assert.equal(failMsg.threadId, 'thread-fail'); + assert.ok(failMsg.content.includes('kaboom')); + runner.stop(); + }); +}); From 624e69eec17e6abf1c783adee4cb019aaa769dfe Mon Sep 17 00:00:00 2001 From: mindfn Date: Fri, 10 Apr 2026 12:20:46 +0800 Subject: [PATCH 04/12] fix(scheduler): only send resume notification after successful re-registration (#415) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When template is missing from registry, the task cannot actually resume. Move notifyTaskResumed inside the template-found branch and return 500 when the template is absent, preventing false "resumed" notifications. [宪宪/Opus-46🐾] Co-Authored-By: Claude Opus 4.6 --- packages/api/src/routes/schedule.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/api/src/routes/schedule.ts b/packages/api/src/routes/schedule.ts index 7f6b19533..0433c89ac 100644 --- a/packages/api/src/routes/schedule.ts +++ b/packages/api/src/routes/schedule.ts @@ -408,8 +408,11 @@ export const scheduleRoutes: FastifyPluginAsync = async ( } catch { // Already registered — ignore } + notifyTaskResumed(deliver, def); + } else { + reply.status(500); + return { error: `Template ${def.templateId} not found — task cannot resume` }; } - notifyTaskResumed(deliver, def); } } From 8a17462d3b1e9e218696ee5faa0702fc0763a292 Mon Sep 17 00:00:00 2001 From: mindfn Date: Fri, 10 Apr 2026 12:33:28 +0800 Subject: [PATCH 05/12] fix(scheduler): address cloud review P1/P2 findings (#415) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P1: Cap setTimeout delay at 2^31-1 ms to prevent Node overflow misfire on once tasks scheduled >24.8 days out. Uses chunked re-scheduling. P1: Roll back setEnabled on failed resume when template is missing, preventing storage/runtime inconsistency. P2: Validate once trigger fireAt/delayMs as finite positive numbers before persisting. [宪宪/Opus-46🐾] Co-Authored-By: Claude Opus 4.6 --- .../src/infrastructure/scheduler/TaskRunnerV2.ts | 16 +++++++++++++--- packages/api/src/routes/schedule.ts | 6 +++++- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts index 3a3dd03f7..239d77210 100644 --- a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts +++ b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts @@ -250,10 +250,20 @@ export class TaskRunnerV2 { ); } + /** Max safe setTimeout delay — Node clamps values above 2^31-1 ms to ~1ms */ + private static MAX_TIMER_DELAY = 2_147_483_647; + /** #415: Schedule a one-shot task — fires once at fireAt, then auto-retires */ private scheduleOnceTick(task: AnyTaskSpec): void { if (task.trigger.type !== 'once') return; - const delay = Math.max(0, task.trigger.fireAt - Date.now()); + const remaining = Math.max(0, task.trigger.fireAt - Date.now()); + // Node setTimeout overflows at 2^31-1 ms — chunk long delays into safe steps + if (remaining > TaskRunnerV2.MAX_TIMER_DELAY) { + const timer = setTimeout(() => this.scheduleOnceTick(task), TaskRunnerV2.MAX_TIMER_DELAY); + if (typeof timer === 'object' && 'unref' in timer) timer.unref(); + this.timers.set(task.id, timer); + return; + } const timer = setTimeout(() => { this.executePipeline(task) .catch((err) => { @@ -262,11 +272,11 @@ export class TaskRunnerV2 { .finally(() => { this.retireOnceTask(task.id); }); - }, delay); + }, remaining); if (typeof timer === 'object' && 'unref' in timer) timer.unref(); this.timers.set(task.id, timer); this.logger.info( - `[scheduler] ${task.id}: registered (profile=${task.profile}, once, fireAt=${new Date(task.trigger.fireAt).toISOString()}, delay=${delay}ms)`, + `[scheduler] ${task.id}: registered (profile=${task.profile}, once, fireAt=${new Date(task.trigger.fireAt).toISOString()}, delay=${remaining}ms)`, ); } diff --git a/packages/api/src/routes/schedule.ts b/packages/api/src/routes/schedule.ts index 0433c89ac..0a0ff3d9e 100644 --- a/packages/api/src/routes/schedule.ts +++ b/packages/api/src/routes/schedule.ts @@ -36,10 +36,13 @@ function normalizeOnceTrigger(trigger: Record): TriggerSpec | { const delayMs = typeof trigger.delayMs === 'number' ? trigger.delayMs : undefined; const fireAt = typeof trigger.fireAt === 'number' ? trigger.fireAt : undefined; if (delayMs != null) { - if (delayMs < 0) return { error: 'once trigger delayMs must be >= 0' }; + if (!Number.isFinite(delayMs) || delayMs < 0) return { error: 'once trigger delayMs must be a finite number >= 0' }; return { type: 'once', fireAt: Date.now() + delayMs }; } if (fireAt != null) { + if (!Number.isFinite(fireAt) || fireAt < 0) { + return { error: 'once trigger fireAt must be a finite positive epoch ms' }; + } return { type: 'once', fireAt }; } return { error: 'once trigger requires either delayMs or fireAt' }; @@ -410,6 +413,7 @@ export const scheduleRoutes: FastifyPluginAsync = async ( } notifyTaskResumed(deliver, def); } else { + dynamicTaskStore.setEnabled(id, false); // roll back — resume failed reply.status(500); return { error: `Template ${def.templateId} not found — task cannot resume` }; } From 53b25aeee6d71710441a9662f0169029975eaa23 Mon Sep 17 00:00:00 2001 From: mindfn Date: Fri, 10 Apr 2026 14:43:01 +0800 Subject: [PATCH 06/12] feat(scheduler): add success notification with next fire time (#415) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Notify users after successful task execution: recurring tasks report next fire time, once tasks confirm completion and auto-retirement. Closes the last lifecycle feedback gap identified by review. [宪宪/Opus-46🐾] Co-Authored-By: Claude Opus 4.6 --- .../infrastructure/scheduler/TaskRunnerV2.ts | 7 +- .../scheduler/schedule-notify.ts | 10 ++ .../test/scheduler/schedule-notify.test.js | 108 ++++++++++++++++++ 3 files changed, 122 insertions(+), 3 deletions(-) diff --git a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts index 239d77210..e5fd9c48d 100644 --- a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts +++ b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts @@ -2,7 +2,7 @@ import { getNextCronMs } from './cron-utils.js'; import type { DynamicTaskDef, DynamicTaskStore } from './DynamicTaskStore.js'; import { executeTaskPipeline } from './execute-pipeline.js'; import type { RunLedger } from './RunLedger.js'; -import { notifyTaskFailed } from './schedule-notify.js'; +import { notifyTaskFailed, notifyTaskSucceeded } from './schedule-notify.js'; import type { TaskTemplate } from './templates/types.js'; import type { ActorRole, @@ -401,11 +401,12 @@ export class TaskRunnerV2 { fetchContent: this.fetchContent, invokeTrigger: this.invokeTrigger, onItemOutcome: (taskId, _subjectKey, outcome, errorSummary) => { - if (outcome !== 'RUN_FAILED') return; const dynDefId = this.dynamicTaskIds.get(taskId); if (!dynDefId || !this.dynamicTaskStore) return; const def = this.dynamicTaskStore.getById(dynDefId); - if (def) notifyTaskFailed(this.deliver, def, errorSummary); + if (!def) return; + if (outcome === 'RUN_FAILED') notifyTaskFailed(this.deliver, def, errorSummary); + if (outcome === 'RUN_DELIVERED') notifyTaskSucceeded(this.deliver, def); }, }); } diff --git a/packages/api/src/infrastructure/scheduler/schedule-notify.ts b/packages/api/src/infrastructure/scheduler/schedule-notify.ts index fb375da38..ca933dff9 100644 --- a/packages/api/src/infrastructure/scheduler/schedule-notify.ts +++ b/packages/api/src/infrastructure/scheduler/schedule-notify.ts @@ -59,6 +59,16 @@ export function notifyTaskDeleted(deliver: DeliverFn | undefined, def: DynamicTa fire(deliver, def, `🗑️ 定时任务「${label(def)}」已删除`); } +export function notifyTaskSucceeded(deliver: DeliverFn | undefined, def: DynamicTaskDef): void { + if (def.trigger.type === 'once') { + fire(deliver, def, `✅ 定时任务「${label(def)}」已执行完成,任务已自动结束`); + } else { + const nextFire = computeNextFireTime(def.trigger); + const timeStr = nextFire ? formatTime(nextFire) : '未知'; + fire(deliver, def, `✅ 定时任务「${label(def)}」本次执行完成,下次执行时间:${timeStr}`); + } +} + export function notifyTaskFailed( deliver: DeliverFn | undefined, def: DynamicTaskDef, diff --git a/packages/api/test/scheduler/schedule-notify.test.js b/packages/api/test/scheduler/schedule-notify.test.js index 8caf374aa..5a973bf15 100644 --- a/packages/api/test/scheduler/schedule-notify.test.js +++ b/packages/api/test/scheduler/schedule-notify.test.js @@ -198,4 +198,112 @@ describe('TaskRunnerV2 — execution failure notification (#415)', () => { assert.ok(failMsg.content.includes('kaboom')); runner.stop(); }); + + it('RUN_DELIVERED triggers notifyTaskSucceeded via onItemOutcome', async () => { + const Database = (await import('better-sqlite3')).default; + const db = new Database(':memory:'); + const { applyMigrations } = await import('../../dist/domains/memory/schema.js'); + const { RunLedger } = await import('../../dist/infrastructure/scheduler/RunLedger.js'); + const { DynamicTaskStore } = await import('../../dist/infrastructure/scheduler/DynamicTaskStore.js'); + const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); + applyMigrations(db); + const ledger = new RunLedger(db); + const dynamicTaskStore = new DynamicTaskStore(db); + const deliverCalls = []; + const mockDeliver = async (opts) => { + deliverCalls.push(opts); + return 'msg-1'; + }; + const noop = () => {}; + const runner = new TaskRunnerV2({ + logger: { info: noop, error: noop }, + ledger, + dynamicTaskStore, + deliver: mockDeliver, + }); + + dynamicTaskStore.insert({ + id: 'dyn-ok-1', + templateId: 'reminder', + trigger: { type: 'interval', ms: 999999 }, + params: { message: 'test', triggerUserId: 'user-42' }, + display: { label: '成功任务', category: 'system' }, + deliveryThreadId: 'thread-ok', + enabled: true, + createdBy: 'opus', + createdAt: new Date().toISOString(), + }); + + runner.registerDynamic( + { + id: 'dyn-ok-1', + profile: 'awareness', + trigger: { type: 'interval', ms: 999999 }, + admission: { + gate: async () => ({ run: true, workItems: [{ signal: 'go', subjectKey: 'k' }] }), + }, + run: { + overlap: 'skip', + timeoutMs: 5000, + execute: async () => ({ delivered: true }), + }, + state: { runLedger: 'sqlite' }, + outcome: { whenNoSignal: 'drop' }, + enabled: () => true, + }, + 'dyn-ok-1', + ); + + await runner.triggerNow('dyn-ok-1'); + await new Promise((r) => setTimeout(r, 50)); + + const successMsg = deliverCalls.find((c) => c.content.includes('执行完成')); + assert.ok(successMsg, 'should contain success notification'); + assert.equal(successMsg.threadId, 'thread-ok'); + assert.ok(successMsg.content.includes('下次执行时间'), 'recurring task should include next fire time'); + runner.stop(); + }); +}); + +describe('schedule-notify: notifyTaskSucceeded', () => { + const makeDef2 = (overrides = {}) => ({ + id: 'dyn-test-1', + templateId: 'reminder', + trigger: { type: 'once', fireAt: Date.now() + 60_000 }, + params: { message: 'test', triggerUserId: 'user-42' }, + display: { label: '测试提醒', category: 'system' }, + deliveryThreadId: 'thread-xyz', + enabled: true, + createdBy: 'opus', + createdAt: new Date().toISOString(), + ...overrides, + }); + + it('recurring task includes next fire time', async () => { + const { notifyTaskSucceeded } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const calls = []; + const mockDeliver = async (opts) => { + calls.push(opts); + return 'msg-1'; + }; + notifyTaskSucceeded(mockDeliver, makeDef2({ trigger: { type: 'interval', ms: 60000 } })); + await new Promise((r) => setTimeout(r, 20)); + assert.equal(calls.length, 1); + assert.ok(calls[0].content.includes('本次执行完成')); + assert.ok(calls[0].content.includes('下次执行时间')); + }); + + it('once task says task has ended', async () => { + const { notifyTaskSucceeded } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const calls = []; + const mockDeliver = async (opts) => { + calls.push(opts); + return 'msg-1'; + }; + notifyTaskSucceeded(mockDeliver, makeDef2({ trigger: { type: 'once', fireAt: Date.now() } })); + await new Promise((r) => setTimeout(r, 20)); + assert.equal(calls.length, 1); + assert.ok(calls[0].content.includes('已执行完成')); + assert.ok(calls[0].content.includes('自动结束')); + }); }); From d6054ccb104d8d965f974780fb23f65615b06462 Mon Sep 17 00:00:00 2001 From: mindfn Date: Fri, 10 Apr 2026 14:48:01 +0800 Subject: [PATCH 07/12] fix: biome format for project-setup-card-ime test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-existing formatting issue exposed after rebase onto latest main. [宪宪/Opus-46🐾] Co-Authored-By: Claude Opus 4.6 --- .../components/__tests__/project-setup-card-ime.test.tsx | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/packages/web/src/components/__tests__/project-setup-card-ime.test.tsx b/packages/web/src/components/__tests__/project-setup-card-ime.test.tsx index 6a8bd8ac2..f27f03332 100644 --- a/packages/web/src/components/__tests__/project-setup-card-ime.test.tsx +++ b/packages/web/src/components/__tests__/project-setup-card-ime.test.tsx @@ -40,13 +40,7 @@ describe('ProjectSetupCard IME guard', () => { await act(async () => { root.render( - , + , ); }); From 1cf3c58366c7999d03121d5c372eda13bc8ebf05 Mon Sep 17 00:00:00 2001 From: mindfn Date: Fri, 10 Apr 2026 17:36:38 +0800 Subject: [PATCH 08/12] fix(scheduler): only retire once tasks after actual execution (#415) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P1: Don't retire once tasks on governance skip (SKIP_GLOBAL_PAUSE, SKIP_TASK_OVERRIDE). Track actual execution via onceTaskExecuted set, only retire when work items were processed. P2: Use taskId directly in retireOnceTask instead of dynamicTaskIds map lookup, preventing orphaned DB rows when task is paused during in-flight execution. [宪宪/Opus-46🐾] Co-Authored-By: Claude Opus 4.6 --- .../infrastructure/scheduler/TaskRunnerV2.ts | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts index e5fd9c48d..ccd965f9c 100644 --- a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts +++ b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts @@ -85,6 +85,8 @@ export class TaskRunnerV2 { private lastRunAt = new Map(); /** Phase 3A: track dynamic task IDs → DynamicTaskDef.id mapping */ private dynamicTaskIds = new Map(); + /** #415: track once-tasks that actually executed (not governance-skipped) */ + private onceTaskExecuted = new Set(); /** True after start() has been called — used to auto-schedule late-registered tasks */ private started = false; private logger: TaskRunnerV2Options['logger']; @@ -265,12 +267,18 @@ export class TaskRunnerV2 { return; } const timer = setTimeout(() => { + this.onceTaskExecuted.delete(task.id); this.executePipeline(task) .catch((err) => { this.logger.error(`[scheduler] ${task.id}: pipeline error`, err); }) .finally(() => { - this.retireOnceTask(task.id); + if (this.onceTaskExecuted.has(task.id)) { + this.onceTaskExecuted.delete(task.id); + this.retireOnceTask(task.id); + } else { + this.logger.info(`[scheduler] ${task.id}: once task governance-skipped, not retiring`); + } }); }, remaining); if (typeof timer === 'object' && 'unref' in timer) timer.unref(); @@ -282,9 +290,9 @@ export class TaskRunnerV2 { /** #415: Remove a once-task from runtime + persistent store after execution */ private retireOnceTask(taskId: string): void { - const dynDefId = this.dynamicTaskIds.get(taskId); - if (dynDefId && this.dynamicTaskStore) { - this.dynamicTaskStore.remove(dynDefId); + // Use taskId directly — for dynamic tasks, taskId === dynDefId + if (this.dynamicTaskStore) { + this.dynamicTaskStore.remove(taskId); } this.unregister(taskId); this.logger.info(`[scheduler] ${taskId}: retired (once task completed)`); @@ -401,6 +409,9 @@ export class TaskRunnerV2 { fetchContent: this.fetchContent, invokeTrigger: this.invokeTrigger, onItemOutcome: (taskId, _subjectKey, outcome, errorSummary) => { + if (outcome === 'RUN_DELIVERED' || outcome === 'RUN_FAILED') { + this.onceTaskExecuted.add(taskId); + } const dynDefId = this.dynamicTaskIds.get(taskId); if (!dynDefId || !this.dynamicTaskStore) return; const def = this.dynamicTaskStore.getById(dynDefId); From ecb1eba03f1116f8c980d1524f124c4012eb4642 Mon Sep 17 00:00:00 2001 From: mindfn Date: Fri, 10 Apr 2026 17:46:40 +0800 Subject: [PATCH 09/12] fix(scheduler): retry governance-skipped once tasks + scope tracking set (#415) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P1: Once tasks now retry every 30s after governance skip until controls are lifted, instead of being stranded with no follow-up execution. P2: onceTaskExecuted set is now restricted to once-trigger tasks only, preventing stale entries from interval/cron tasks accumulating. [宪宪/Opus-46🐾] Co-Authored-By: Claude Opus 4.6 --- .../api/src/infrastructure/scheduler/TaskRunnerV2.ts | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts index ccd965f9c..210324fd4 100644 --- a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts +++ b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts @@ -277,7 +277,14 @@ export class TaskRunnerV2 { this.onceTaskExecuted.delete(task.id); this.retireOnceTask(task.id); } else { - this.logger.info(`[scheduler] ${task.id}: once task governance-skipped, not retiring`); + // Governance-skipped — retry after delay until controls are lifted + this.logger.info(`[scheduler] ${task.id}: once task governance-skipped, retrying in 30s`); + const retryTimer = setTimeout(() => { + if (!this.tasks.some((t) => t.id === task.id)) return; + this.scheduleOnceTick(task); + }, 30_000); + if (typeof retryTimer === 'object' && 'unref' in retryTimer) retryTimer.unref(); + this.timers.set(task.id, retryTimer); } }); }, remaining); @@ -410,7 +417,8 @@ export class TaskRunnerV2 { invokeTrigger: this.invokeTrigger, onItemOutcome: (taskId, _subjectKey, outcome, errorSummary) => { if (outcome === 'RUN_DELIVERED' || outcome === 'RUN_FAILED') { - this.onceTaskExecuted.add(taskId); + const spec = this.tasks.find((t) => t.id === taskId); + if (spec?.trigger.type === 'once') this.onceTaskExecuted.add(taskId); } const dynDefId = this.dynamicTaskIds.get(taskId); if (!dynDefId || !this.dynamicTaskStore) return; From ce474ce73f9e3f8bf8ca3b1fb141a08e81e99094 Mon Sep 17 00:00:00 2001 From: mindfn Date: Fri, 10 Apr 2026 17:54:05 +0800 Subject: [PATCH 10/12] fix(scheduler): check started flag before retrying governance-skipped once task (#415) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Prevent ghost retries after runner.stop() by guarding the retry callback with this.started check. [宪宪/Opus-46🐾] Co-Authored-By: Claude Opus 4.6 --- packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts index 210324fd4..4045adc65 100644 --- a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts +++ b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts @@ -280,7 +280,7 @@ export class TaskRunnerV2 { // Governance-skipped — retry after delay until controls are lifted this.logger.info(`[scheduler] ${task.id}: once task governance-skipped, retrying in 30s`); const retryTimer = setTimeout(() => { - if (!this.tasks.some((t) => t.id === task.id)) return; + if (!this.started || !this.tasks.some((t) => t.id === task.id)) return; this.scheduleOnceTick(task); }, 30_000); if (typeof retryTimer === 'object' && 'unref' in retryTimer) retryTimer.unref(); From 1fe521029235b82c16bf43264ad4d0137eb2f401 Mon Sep 17 00:00:00 2001 From: mindfn Date: Fri, 10 Apr 2026 22:32:26 +0800 Subject: [PATCH 11/12] fix(scheduler): use ledger-based retry detection + unregister guard (#415) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P2: Replace onceTaskExecuted set with ledger query — only retry on SKIP_GLOBAL_PAUSE/SKIP_TASK_OVERRIDE, retire on all other outcomes (gate-no-signal, gate-throw, etc.). Removes stale-set accumulation. P2: Add timers.has guard to once timer callback to prevent ghost execution after task unregistration. [宪宪/Opus-46🐾] Co-Authored-By: Claude Opus 4.6 --- .../infrastructure/scheduler/TaskRunnerV2.ts | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts index 4045adc65..4f2c69101 100644 --- a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts +++ b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts @@ -85,8 +85,6 @@ export class TaskRunnerV2 { private lastRunAt = new Map(); /** Phase 3A: track dynamic task IDs → DynamicTaskDef.id mapping */ private dynamicTaskIds = new Map(); - /** #415: track once-tasks that actually executed (not governance-skipped) */ - private onceTaskExecuted = new Set(); /** True after start() has been called — used to auto-schedule late-registered tasks */ private started = false; private logger: TaskRunnerV2Options['logger']; @@ -267,17 +265,18 @@ export class TaskRunnerV2 { return; } const timer = setTimeout(() => { - this.onceTaskExecuted.delete(task.id); + // Guard: skip if task was unregistered before timeout fires + if (!this.timers.has(task.id)) return; this.executePipeline(task) .catch((err) => { this.logger.error(`[scheduler] ${task.id}: pipeline error`, err); }) .finally(() => { - if (this.onceTaskExecuted.has(task.id)) { - this.onceTaskExecuted.delete(task.id); - this.retireOnceTask(task.id); - } else { - // Governance-skipped — retry after delay until controls are lifted + // Check ledger to distinguish governance skip from actual execution/other skips + const entries = this.ledger.query(task.id, 1); + const lastOutcome = entries[0]?.outcome; + const isGovernanceSkip = lastOutcome === 'SKIP_GLOBAL_PAUSE' || lastOutcome === 'SKIP_TASK_OVERRIDE'; + if (isGovernanceSkip) { this.logger.info(`[scheduler] ${task.id}: once task governance-skipped, retrying in 30s`); const retryTimer = setTimeout(() => { if (!this.started || !this.tasks.some((t) => t.id === task.id)) return; @@ -285,6 +284,8 @@ export class TaskRunnerV2 { }, 30_000); if (typeof retryTimer === 'object' && 'unref' in retryTimer) retryTimer.unref(); this.timers.set(task.id, retryTimer); + } else { + this.retireOnceTask(task.id); } }); }, remaining); @@ -416,10 +417,6 @@ export class TaskRunnerV2 { fetchContent: this.fetchContent, invokeTrigger: this.invokeTrigger, onItemOutcome: (taskId, _subjectKey, outcome, errorSummary) => { - if (outcome === 'RUN_DELIVERED' || outcome === 'RUN_FAILED') { - const spec = this.tasks.find((t) => t.id === taskId); - if (spec?.trigger.type === 'once') this.onceTaskExecuted.add(taskId); - } const dynDefId = this.dynamicTaskIds.get(taskId); if (!dynDefId || !this.dynamicTaskStore) return; const def = this.dynamicTaskStore.getById(dynDefId); From 36cbd0d4d94ba40f168f30222aa6587c2bbec68e Mon Sep 17 00:00:00 2001 From: mindfn Date: Fri, 10 Apr 2026 23:26:47 +0800 Subject: [PATCH 12/12] fix(scheduler): use server-authoritative catId for lifecycle notifications (#415) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace client-supplied `def.createdBy` with `'system'` in schedule-notify fire() to prevent sender impersonation. Matches codebase convention used in reminder.ts and other server-generated messages. [宪宪/Opus-46🐾] Co-Authored-By: Claude Opus 4.6 --- packages/api/src/infrastructure/scheduler/schedule-notify.ts | 4 +--- packages/api/test/scheduler/schedule-notify.test.js | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/api/src/infrastructure/scheduler/schedule-notify.ts b/packages/api/src/infrastructure/scheduler/schedule-notify.ts index ca933dff9..be337fccb 100644 --- a/packages/api/src/infrastructure/scheduler/schedule-notify.ts +++ b/packages/api/src/infrastructure/scheduler/schedule-notify.ts @@ -33,9 +33,7 @@ function label(def: DynamicTaskDef): string { function fire(deliver: DeliverFn | undefined, def: DynamicTaskDef, content: string): void { if (!deliver || !def.deliveryThreadId) return; - deliver({ threadId: def.deliveryThreadId, content, catId: def.createdBy, userId: resolveUserId(def) }).catch( - () => {}, - ); + deliver({ threadId: def.deliveryThreadId, content, catId: 'system', userId: resolveUserId(def) }).catch(() => {}); } export function notifyTaskRegistered(deliver: DeliverFn | undefined, def: DynamicTaskDef): void { diff --git a/packages/api/test/scheduler/schedule-notify.test.js b/packages/api/test/scheduler/schedule-notify.test.js index 5a973bf15..a5cbdf5a8 100644 --- a/packages/api/test/scheduler/schedule-notify.test.js +++ b/packages/api/test/scheduler/schedule-notify.test.js @@ -48,7 +48,7 @@ describe('schedule-notify: notification functions', () => { await new Promise((r) => setTimeout(r, 20)); assert.equal(calls.length, 1); assert.equal(calls[0].threadId, 'thread-xyz'); - assert.equal(calls[0].catId, 'opus'); + assert.equal(calls[0].catId, 'system'); assert.equal(calls[0].userId, 'user-42'); assert.ok(calls[0].content.includes('测试提醒')); assert.ok(calls[0].content.includes('已创建'));