diff --git a/cat-cafe-skills/schedule-tasks/SKILL.md b/cat-cafe-skills/schedule-tasks/SKILL.md index 9895d0261..c815d2626 100644 --- a/cat-cafe-skills/schedule-tasks/SKILL.md +++ b/cat-cafe-skills/schedule-tasks/SKILL.md @@ -1,9 +1,9 @@ --- name: schedule-tasks description: > - 定时任务注册、管理、能力指南。 - Use when: 用户想设定时任务、定期提醒、周期巡检、定时发送内容。 - Not for: 一次性即时操作、已有 builtin 任务的手动触发。 + 定时任务注册、管理、能力指南。支持周期任务和一次性延迟任务。 + Use when: 用户想设定时任务、定期提醒、周期巡检、定时发送内容、延迟执行一次性操作。 + Not for: 已有 builtin 任务的手动触发。 Output: 注册/管理定时任务,任务到点唤醒猫执行。 triggers: - "定时" @@ -17,6 +17,12 @@ triggers: - "定期" - "周期" - "定时任务" + - "分钟后" + - "小时后" + - "之后" + - "later" + - "in 5 minutes" + - "once" --- # Schedule Tasks — 定时任务注册与管理 @@ -72,6 +78,8 @@ triggers: ## Trigger 语法速查 +### 周期触发(recurring) + | 用户说 | trigger JSON | |--------|-------------| | 每天早上 9 点 | `{"type":"cron","expression":"0 9 * * *"}` | @@ -80,6 +88,17 @@ triggers: | 每周一早上 10 点 | `{"type":"cron","expression":"0 10 * * 1"}` | | 每 5 分钟 | `{"type":"interval","ms":300000}` | +### 一次性触发(once — #415) + +| 用户说 | trigger JSON | +|--------|-------------| +| 2 分钟后提醒我 | `{"type":"once","delayMs":120000}` | +| 1 小时后查天气 | `{"type":"once","delayMs":3600000}` | +| 30 秒后通知我 | `{"type":"once","delayMs":30000}` | + +一次性任务执行后会**自动退役**(从 runtime 注销 + 从 SQLite 删除),不会重复触发。 +路由层会将 `delayMs` 归一化为绝对时间 `fireAt`(epoch ms),确保重启后触发时间不漂移。 + ## 管理 | 操作 | 工具 | diff --git a/packages/api/src/index.ts b/packages/api/src/index.ts index 1ce43ffcc..7d8ea163f 100644 --- a/packages/api/src/index.ts +++ b/packages/api/src/index.ts @@ -533,6 +533,7 @@ async function main(): Promise { const { DynamicTaskStore } = await import('./infrastructure/scheduler/DynamicTaskStore.js'); const { templateRegistry } = await import('./infrastructure/scheduler/templates/registry.js'); const dynamicTaskStore = new DynamicTaskStore(schedulerDb); + taskRunnerV2.setDynamicTaskStore(dynamicTaskStore); // #415: wire store for once-trigger auto-retirement // ── F139 Phase 2+3A+3B: Schedule panel API routes ── const { scheduleRoutes } = await import('./routes/schedule.js'); @@ -543,6 +544,7 @@ async function main(): Promise { globalControlStore, packTemplateStore, taskStore, + deliver: schedulerDeliver, }); // ── Phase G: Summary Compaction (registers into unified scheduler) ── diff --git a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts index e727d8732..4f2c69101 100644 --- a/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts +++ b/packages/api/src/infrastructure/scheduler/TaskRunnerV2.ts @@ -1,7 +1,8 @@ import { getNextCronMs } from './cron-utils.js'; -import type { DynamicTaskStore } from './DynamicTaskStore.js'; +import type { DynamicTaskDef, DynamicTaskStore } from './DynamicTaskStore.js'; import { executeTaskPipeline } from './execute-pipeline.js'; import type { RunLedger } from './RunLedger.js'; +import { notifyTaskFailed, notifyTaskSucceeded } from './schedule-notify.js'; import type { TaskTemplate } from './templates/types.js'; import type { ActorRole, @@ -31,6 +32,8 @@ export interface TaskRunnerV2Options { fetchContent?: (url: string) => Promise; /** Phase 4b: invoke a cat to handle a scheduled task (fire-and-forget) */ invokeTrigger?: ScheduleInvokeTrigger; + /** #415: dynamic task store — needed for once-trigger auto-retirement */ + dynamicTaskStore?: DynamicTaskStore; } /** Phase 2.5: Compute human-readable subject preview from subjectKind + lastRun (AC-E2) */ @@ -92,6 +95,7 @@ export class TaskRunnerV2 { private deliver: TaskRunnerV2Options['deliver']; private fetchContent: TaskRunnerV2Options['fetchContent']; private invokeTrigger: TaskRunnerV2Options['invokeTrigger']; + private dynamicTaskStore: TaskRunnerV2Options['dynamicTaskStore']; constructor(opts: TaskRunnerV2Options) { this.logger = opts.logger; @@ -102,6 +106,7 @@ export class TaskRunnerV2 { this.deliver = opts.deliver; this.fetchContent = opts.fetchContent; this.invokeTrigger = opts.invokeTrigger; + this.dynamicTaskStore = opts.dynamicTaskStore; } /** Late-bind invokeTrigger (constructed after TaskRunnerV2 in boot sequence) */ @@ -109,6 +114,11 @@ export class TaskRunnerV2 { this.invokeTrigger = trigger; } + /** #415: Late-bind dynamicTaskStore (constructed after TaskRunnerV2 in boot sequence) */ + setDynamicTaskStore(store: DynamicTaskStore): void { + this.dynamicTaskStore = store; + } + register(task: AnyTaskSpec): void { if (this.tasks.some((t) => t.id === task.id)) { throw new Error(`TaskRunnerV2: duplicate task id "${task.id}"`); @@ -149,6 +159,12 @@ export class TaskRunnerV2 { const defs = store.getAll().filter((d) => d.enabled); let loaded = 0; for (const def of defs) { + // #415: once tasks with past fireAt → missed window, cancel + notify + retire + if (def.trigger.type === 'once' && def.trigger.fireAt < Date.now()) { + this.handleMissedOnceTask(def, store); + continue; + } + const template = templateGetter.get(def.templateId); if (!template) { this.logger.error(`[scheduler] hydrate: unknown template "${def.templateId}" for def ${def.id}`); @@ -190,6 +206,8 @@ export class TaskRunnerV2 { if (task.trigger.type === 'cron') { this.scheduleCronTick(task); + } else if (task.trigger.type === 'once') { + this.scheduleOnceTick(task); } else { const runTick = () => { // Guard: skip if task was unregistered before tick fires (防幽灵执行) @@ -232,6 +250,99 @@ export class TaskRunnerV2 { ); } + /** Max safe setTimeout delay — Node clamps values above 2^31-1 ms to ~1ms */ + private static MAX_TIMER_DELAY = 2_147_483_647; + + /** #415: Schedule a one-shot task — fires once at fireAt, then auto-retires */ + private scheduleOnceTick(task: AnyTaskSpec): void { + if (task.trigger.type !== 'once') return; + const remaining = Math.max(0, task.trigger.fireAt - Date.now()); + // Node setTimeout overflows at 2^31-1 ms — chunk long delays into safe steps + if (remaining > TaskRunnerV2.MAX_TIMER_DELAY) { + const timer = setTimeout(() => this.scheduleOnceTick(task), TaskRunnerV2.MAX_TIMER_DELAY); + if (typeof timer === 'object' && 'unref' in timer) timer.unref(); + this.timers.set(task.id, timer); + return; + } + const timer = setTimeout(() => { + // Guard: skip if task was unregistered before timeout fires + if (!this.timers.has(task.id)) return; + this.executePipeline(task) + .catch((err) => { + this.logger.error(`[scheduler] ${task.id}: pipeline error`, err); + }) + .finally(() => { + // Check ledger to distinguish governance skip from actual execution/other skips + const entries = this.ledger.query(task.id, 1); + const lastOutcome = entries[0]?.outcome; + const isGovernanceSkip = lastOutcome === 'SKIP_GLOBAL_PAUSE' || lastOutcome === 'SKIP_TASK_OVERRIDE'; + if (isGovernanceSkip) { + this.logger.info(`[scheduler] ${task.id}: once task governance-skipped, retrying in 30s`); + const retryTimer = setTimeout(() => { + if (!this.started || !this.tasks.some((t) => t.id === task.id)) return; + this.scheduleOnceTick(task); + }, 30_000); + if (typeof retryTimer === 'object' && 'unref' in retryTimer) retryTimer.unref(); + this.timers.set(task.id, retryTimer); + } else { + this.retireOnceTask(task.id); + } + }); + }, remaining); + if (typeof timer === 'object' && 'unref' in timer) timer.unref(); + this.timers.set(task.id, timer); + this.logger.info( + `[scheduler] ${task.id}: registered (profile=${task.profile}, once, fireAt=${new Date(task.trigger.fireAt).toISOString()}, delay=${remaining}ms)`, + ); + } + + /** #415: Remove a once-task from runtime + persistent store after execution */ + private retireOnceTask(taskId: string): void { + // Use taskId directly — for dynamic tasks, taskId === dynDefId + if (this.dynamicTaskStore) { + this.dynamicTaskStore.remove(taskId); + } + this.unregister(taskId); + this.logger.info(`[scheduler] ${taskId}: retired (once task completed)`); + } + + /** #415: Handle once-task that missed its execution window (hydrated after restart) */ + private handleMissedOnceTask(def: DynamicTaskDef, store: DynamicTaskStore): void { + const fireAt = def.trigger.type === 'once' ? def.trigger.fireAt : 0; + const fireAtIso = new Date(fireAt).toISOString(); + this.logger.info(`[scheduler] ${def.id}: once task missed window (fireAt=${fireAtIso}), retiring`); + + // Record in ledger for audit trail + this.ledger.record({ + task_id: def.id, + subject_key: def.id, + outcome: 'SKIP_MISSED_WINDOW', + signal_summary: `Execution window missed: fireAt=${fireAtIso}`, + duration_ms: 0, + started_at: new Date().toISOString(), + assigned_cat_id: null, + error_summary: null, + }); + + // Notify user via delivery thread (fire-and-forget) + if (def.deliveryThreadId && this.deliver) { + const label = def.display?.label ?? def.templateId; + const content = + `⏰ 定时任务「${label}」的执行时间窗已错过(原定 ${fireAtIso}),` + '服务在该时间段未运行。任务已自动取消。'; + this.deliver({ + threadId: def.deliveryThreadId, + content, + catId: def.createdBy, + userId: ((def.params as Record).triggerUserId as string) ?? 'system', + }).catch((err) => { + this.logger.error(`[scheduler] ${def.id}: failed to send missed-window notification`, err); + }); + } + + // Remove from persistent store + store.remove(def.id); + } + stop(): void { for (const [id, timer] of this.timers) { clearTimeout(timer); @@ -305,6 +416,14 @@ export class TaskRunnerV2 { deliver: this.deliver, fetchContent: this.fetchContent, invokeTrigger: this.invokeTrigger, + onItemOutcome: (taskId, _subjectKey, outcome, errorSummary) => { + const dynDefId = this.dynamicTaskIds.get(taskId); + if (!dynDefId || !this.dynamicTaskStore) return; + const def = this.dynamicTaskStore.getById(dynDefId); + if (!def) return; + if (outcome === 'RUN_FAILED') notifyTaskFailed(this.deliver, def, errorSummary); + if (outcome === 'RUN_DELIVERED') notifyTaskSucceeded(this.deliver, def); + }, }); } } diff --git a/packages/api/src/infrastructure/scheduler/execute-pipeline.ts b/packages/api/src/infrastructure/scheduler/execute-pipeline.ts index 8013a7e2e..3159d1851 100644 --- a/packages/api/src/infrastructure/scheduler/execute-pipeline.ts +++ b/packages/api/src/infrastructure/scheduler/execute-pipeline.ts @@ -35,6 +35,8 @@ export interface PipelineContext { fetchContent?: (url: string) => Promise; /** Phase 4b: invoke a cat to handle a scheduled task (fire-and-forget) */ invokeTrigger?: ScheduleInvokeTrigger; + /** #415: per-workItem outcome callback (used for failure notifications) */ + onItemOutcome?: (taskId: string, subjectKey: string, outcome: RunOutcome, errorSummary: string | null) => void; } function withTimeout(promise: Promise, ms: number, taskId: string): Promise { @@ -70,6 +72,7 @@ export async function executeTaskPipeline(ctx: PipelineContext): Promise { deliver, fetchContent, invokeTrigger, + onItemOutcome, } = ctx; const startMs = Date.now(); const tickCount = (tickCounts.get(task.id) ?? 0) + 1; @@ -209,6 +212,9 @@ export async function executeTaskPipeline(ctx: PipelineContext): Promise { error_summary: errorSummary, }); + // #415: notify on outcome (used for failure notifications) + if (onItemOutcome) onItemOutcome(task.id, item.subjectKey, outcome, errorSummary); + // AC-D2: Record emission after successful thread-scoped delivery for self-echo suppression if (outcome === 'RUN_DELIVERED' && emissionStore && item.subjectKey.startsWith('thread-')) { const threadId = item.subjectKey.slice(7); diff --git a/packages/api/src/infrastructure/scheduler/schedule-notify.ts b/packages/api/src/infrastructure/scheduler/schedule-notify.ts new file mode 100644 index 000000000..be337fccb --- /dev/null +++ b/packages/api/src/infrastructure/scheduler/schedule-notify.ts @@ -0,0 +1,77 @@ +/** + * #415 Phase 2: Task lifecycle notifications + * + * Fire-and-forget notifications to delivery threads for lifecycle events: + * registered, paused, resumed, deleted, failed, missed-window. + */ + +import { getNextCronMs } from './cron-utils.js'; +import type { DynamicTaskDef } from './DynamicTaskStore.js'; +import type { DeliverOpts, TriggerSpec } from './types.js'; + +type DeliverFn = (opts: DeliverOpts) => Promise; + +/** Compute epoch ms of next fire time for a trigger */ +export function computeNextFireTime(trigger: TriggerSpec): number | null { + if (trigger.type === 'once') return trigger.fireAt; + if (trigger.type === 'cron') return Date.now() + getNextCronMs(trigger.expression, trigger.timezone); + if (trigger.type === 'interval') return Date.now() + trigger.ms; + return null; +} + +function formatTime(epoch: number): string { + return new Date(epoch).toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai', hour12: false }); +} + +function resolveUserId(def: DynamicTaskDef): string { + return ((def.params as Record).triggerUserId as string) ?? 'system'; +} + +function label(def: DynamicTaskDef): string { + return def.display?.label ?? def.templateId; +} + +function fire(deliver: DeliverFn | undefined, def: DynamicTaskDef, content: string): void { + if (!deliver || !def.deliveryThreadId) return; + deliver({ threadId: def.deliveryThreadId, content, catId: 'system', userId: resolveUserId(def) }).catch(() => {}); +} + +export function notifyTaskRegistered(deliver: DeliverFn | undefined, def: DynamicTaskDef): void { + const nextFire = computeNextFireTime(def.trigger); + const timeStr = nextFire ? formatTime(nextFire) : '未知'; + const once = def.trigger.type === 'once' ? '(一次性,执行后自动退役)' : ''; + fire(deliver, def, `✅ 定时任务「${label(def)}」已创建,下次执行时间:${timeStr}${once}`); +} + +export function notifyTaskPaused(deliver: DeliverFn | undefined, def: DynamicTaskDef): void { + fire(deliver, def, `⏸️ 定时任务「${label(def)}」已暂停`); +} + +export function notifyTaskResumed(deliver: DeliverFn | undefined, def: DynamicTaskDef): void { + const nextFire = computeNextFireTime(def.trigger); + const timeStr = nextFire ? formatTime(nextFire) : '未知'; + fire(deliver, def, `▶️ 定时任务「${label(def)}」已恢复,下次执行时间:${timeStr}`); +} + +export function notifyTaskDeleted(deliver: DeliverFn | undefined, def: DynamicTaskDef): void { + fire(deliver, def, `🗑️ 定时任务「${label(def)}」已删除`); +} + +export function notifyTaskSucceeded(deliver: DeliverFn | undefined, def: DynamicTaskDef): void { + if (def.trigger.type === 'once') { + fire(deliver, def, `✅ 定时任务「${label(def)}」已执行完成,任务已自动结束`); + } else { + const nextFire = computeNextFireTime(def.trigger); + const timeStr = nextFire ? formatTime(nextFire) : '未知'; + fire(deliver, def, `✅ 定时任务「${label(def)}」本次执行完成,下次执行时间:${timeStr}`); + } +} + +export function notifyTaskFailed( + deliver: DeliverFn | undefined, + def: DynamicTaskDef, + errorSummary: string | null, +): void { + const reason = errorSummary ? `:${errorSummary.slice(0, 200)}` : ''; + fire(deliver, def, `❌ 定时任务「${label(def)}」执行失败${reason}`); +} diff --git a/packages/api/src/infrastructure/scheduler/types.ts b/packages/api/src/infrastructure/scheduler/types.ts index c897f28bc..cd3b39acc 100644 --- a/packages/api/src/infrastructure/scheduler/types.ts +++ b/packages/api/src/infrastructure/scheduler/types.ts @@ -22,8 +22,11 @@ export interface GateCtx { /** Task profile presets (ADR-022 KD-1) */ export type TaskProfile = 'awareness' | 'poller'; -/** Phase 2: Trigger spec — interval or cron */ -export type TriggerSpec = { type: 'interval'; ms: number } | { type: 'cron'; expression: string; timezone?: string }; +/** Phase 2: Trigger spec — interval, cron, or once (#415) */ +export type TriggerSpec = + | { type: 'interval'; ms: number } + | { type: 'cron'; expression: string; timezone?: string } + | { type: 'once'; fireAt: number }; /** Phase 2: Context dimension — session × materialization */ export interface ContextSpec { @@ -39,6 +42,7 @@ export type RunOutcome = | 'SKIP_GLOBAL_PAUSE' | 'SKIP_TASK_OVERRIDE' | 'SKIP_SELF_ECHO' + | 'SKIP_MISSED_WINDOW' | 'RUN_DELIVERED' | 'RUN_FAILED'; diff --git a/packages/api/src/routes/schedule.ts b/packages/api/src/routes/schedule.ts index 07dd5c3c8..19b29ee4d 100644 --- a/packages/api/src/routes/schedule.ts +++ b/packages/api/src/routes/schedule.ts @@ -19,11 +19,35 @@ import type { ITaskStore } from '../domains/cats/services/stores/ports/TaskStore import type { DynamicTaskStore } from '../infrastructure/scheduler/DynamicTaskStore.js'; import type { GlobalControlStore } from '../infrastructure/scheduler/GlobalControlStore.js'; import type { PackTemplateStore } from '../infrastructure/scheduler/PackTemplateStore.js'; +import { + notifyTaskDeleted, + notifyTaskPaused, + notifyTaskRegistered, + notifyTaskResumed, +} from '../infrastructure/scheduler/schedule-notify.js'; import type { TaskRunnerV2 } from '../infrastructure/scheduler/TaskRunnerV2.js'; -import type { TriggerSpec } from '../infrastructure/scheduler/types.js'; +import type { DeliverOpts, TriggerSpec } from '../infrastructure/scheduler/types.js'; import { resolveHeaderUserId } from '../utils/request-identity.js'; import { governanceRoutes } from './schedule-governance.js'; +/** #415: Normalize once-trigger input — accepts delayMs (relative) or fireAt (absolute) */ +function normalizeOnceTrigger(trigger: Record): TriggerSpec | { error: string } { + if (trigger.type !== 'once') return trigger as TriggerSpec; + const delayMs = typeof trigger.delayMs === 'number' ? trigger.delayMs : undefined; + const fireAt = typeof trigger.fireAt === 'number' ? trigger.fireAt : undefined; + if (delayMs != null) { + if (!Number.isFinite(delayMs) || delayMs < 0) return { error: 'once trigger delayMs must be a finite number >= 0' }; + return { type: 'once', fireAt: Date.now() + delayMs }; + } + if (fireAt != null) { + if (!Number.isFinite(fireAt) || fireAt < 0) { + return { error: 'once trigger fireAt must be a finite positive epoch ms' }; + } + return { type: 'once', fireAt }; + } + return { error: 'once trigger requires either delayMs or fireAt' }; +} + export interface ScheduleRoutesOptions { taskRunner: TaskRunnerV2; dynamicTaskStore?: DynamicTaskStore; @@ -39,6 +63,8 @@ export interface ScheduleRoutesOptions { packTemplateStore?: PackTemplateStore; /** #320: Unified task store for thread→subjectKey resolution */ taskStore?: ITaskStore; + /** #415: deliver function for lifecycle notifications */ + deliver?: (opts: DeliverOpts) => Promise; } /** Extract threadId from subjectKey — handles both thread-xxx (real tasks) and thread:xxx formats */ @@ -55,7 +81,8 @@ function addSubjectKeyWithAliases(target: Set, subjectKey: string): void } export const scheduleRoutes: FastifyPluginAsync = async (app, opts) => { - const { taskRunner, dynamicTaskStore, templateRegistry, globalControlStore, packTemplateStore, taskStore } = opts; + const { taskRunner, dynamicTaskStore, templateRegistry, globalControlStore, packTemplateStore, taskStore, deliver } = + opts; // GET /api/schedule/tasks // #320: Optional ?threadId= filter — resolves thread's task subjectKeys for cross-match @@ -205,7 +232,18 @@ export const scheduleRoutes: FastifyPluginAsync = async ( return { error: `Unknown template: ${body.templateId}` }; } - const trigger = body.trigger ?? template.defaultTrigger; + // #415: normalize once trigger (delayMs → fireAt) + let trigger: TriggerSpec; + if (body.trigger && (body.trigger as Record).type === 'once') { + const result = normalizeOnceTrigger(body.trigger as Record); + if ('error' in result) { + reply.status(400); + return { error: result.error }; + } + trigger = result; + } else { + trigger = body.trigger ?? template.defaultTrigger; + } const params = body.params ?? {}; const display = body.display ? { @@ -255,7 +293,18 @@ export const scheduleRoutes: FastifyPluginAsync = async ( return { error: `Unknown template: ${body.templateId}` }; } - const trigger = body.trigger ?? template.defaultTrigger; + // #415: normalize once trigger (delayMs → fireAt) + let trigger: TriggerSpec; + if (body.trigger && (body.trigger as Record).type === 'once') { + const result = normalizeOnceTrigger(body.trigger as Record); + if ('error' in result) { + reply.status(400); + return { error: result.error }; + } + trigger = result; + } else { + trigger = body.trigger ?? template.defaultTrigger; + } const params = body.params ?? {}; if (typeof params !== 'object' || params === null || Array.isArray(params)) { @@ -295,6 +344,9 @@ export const scheduleRoutes: FastifyPluginAsync = async ( spec.display = display; taskRunner.registerDynamic(spec, id); + // #415: lifecycle notification — task registered + notifyTaskRegistered(deliver, def); + return { success: true, task: { id, ...display, trigger } }; }); @@ -306,6 +358,8 @@ export const scheduleRoutes: FastifyPluginAsync = async ( } const { id } = request.params as { id: string }; + // Read def before deletion for notification + const defForNotify = dynamicTaskStore.getById(id); const removed = dynamicTaskStore.remove(id); if (!removed) { reply.status(404); @@ -313,6 +367,10 @@ export const scheduleRoutes: FastifyPluginAsync = async ( } taskRunner.unregister(id); + + // #415: lifecycle notification — task deleted + if (defForNotify) notifyTaskDeleted(deliver, defForNotify); + return { success: true }; }); @@ -337,12 +395,13 @@ export const scheduleRoutes: FastifyPluginAsync = async ( return { error: 'Dynamic task not found' }; } + const def = dynamicTaskStore.getById(id); if (!body.enabled) { // Pause: unregister from runtime taskRunner.unregister(id); + if (def) notifyTaskPaused(deliver, def); } else { // Resume: re-register in runtime - const def = dynamicTaskStore.getById(id); if (def) { const template = templateRegistry.get(def.templateId); if (template) { @@ -357,6 +416,11 @@ export const scheduleRoutes: FastifyPluginAsync = async ( } catch { // Already registered — ignore } + notifyTaskResumed(deliver, def); + } else { + dynamicTaskStore.setEnabled(id, false); // roll back — resume failed + reply.status(500); + return { error: `Template ${def.templateId} not found — task cannot resume` }; } } } diff --git a/packages/api/test/scheduler/dynamic-task-store.test.js b/packages/api/test/scheduler/dynamic-task-store.test.js index d545729b1..2230bbc7f 100644 --- a/packages/api/test/scheduler/dynamic-task-store.test.js +++ b/packages/api/test/scheduler/dynamic-task-store.test.js @@ -110,4 +110,17 @@ describe('DynamicTaskStore', () => { store.insert(SAMPLE_DEF); assert.throws(() => store.insert(SAMPLE_DEF), /UNIQUE|constraint/i); }); + + test('#415: once trigger round-trips correctly', () => { + const fireAt = Date.now() + 120_000; + const onceDef = { + ...SAMPLE_DEF, + id: 'dyn-once-rt', + trigger: { type: 'once', fireAt }, + }; + store.insert(onceDef); + const loaded = store.getById('dyn-once-rt'); + assert.equal(loaded.trigger.type, 'once'); + assert.equal(loaded.trigger.fireAt, fireAt); + }); }); diff --git a/packages/api/test/scheduler/schedule-notify.test.js b/packages/api/test/scheduler/schedule-notify.test.js new file mode 100644 index 000000000..a5cbdf5a8 --- /dev/null +++ b/packages/api/test/scheduler/schedule-notify.test.js @@ -0,0 +1,309 @@ +import assert from 'node:assert/strict'; +import { describe, it } from 'node:test'; + +describe('schedule-notify: computeNextFireTime', () => { + it('once → returns fireAt directly', async () => { + const { computeNextFireTime } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const fireAt = Date.now() + 120_000; + assert.equal(computeNextFireTime({ type: 'once', fireAt }), fireAt); + }); + + it('interval → returns now + ms (within tolerance)', async () => { + const { computeNextFireTime } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const before = Date.now(); + const result = computeNextFireTime({ type: 'interval', ms: 60_000 }); + assert.ok(result >= before + 60_000); + assert.ok(result <= before + 61_000); // 1s tolerance + }); + + it('cron → returns a future epoch ms', async () => { + const { computeNextFireTime } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const result = computeNextFireTime({ type: 'cron', expression: '0 9 * * *' }); + assert.ok(result > Date.now(), 'next cron fire should be in the future'); + }); +}); + +describe('schedule-notify: notification functions', () => { + const makeDef = (overrides = {}) => ({ + id: 'dyn-test-1', + templateId: 'reminder', + trigger: { type: 'once', fireAt: Date.now() + 60_000 }, + params: { message: 'test', triggerUserId: 'user-42' }, + display: { label: '测试提醒', category: 'system' }, + deliveryThreadId: 'thread-xyz', + enabled: true, + createdBy: 'opus', + createdAt: new Date().toISOString(), + ...overrides, + }); + + it('notifyTaskRegistered sends to deliveryThreadId with label and time', async () => { + const { notifyTaskRegistered } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const calls = []; + const mockDeliver = async (opts) => { + calls.push(opts); + return 'msg-1'; + }; + notifyTaskRegistered(mockDeliver, makeDef()); + await new Promise((r) => setTimeout(r, 20)); + assert.equal(calls.length, 1); + assert.equal(calls[0].threadId, 'thread-xyz'); + assert.equal(calls[0].catId, 'system'); + assert.equal(calls[0].userId, 'user-42'); + assert.ok(calls[0].content.includes('测试提醒')); + assert.ok(calls[0].content.includes('已创建')); + assert.ok(calls[0].content.includes('一次性'), 'should mention once for once-trigger'); + }); + + it('notifyTaskPaused sends pause message', async () => { + const { notifyTaskPaused } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const calls = []; + const mockDeliver = async (opts) => { + calls.push(opts); + return 'msg-1'; + }; + notifyTaskPaused(mockDeliver, makeDef()); + await new Promise((r) => setTimeout(r, 20)); + assert.equal(calls.length, 1); + assert.ok(calls[0].content.includes('已暂停')); + }); + + it('notifyTaskResumed sends resume message with next time', async () => { + const { notifyTaskResumed } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const calls = []; + const mockDeliver = async (opts) => { + calls.push(opts); + return 'msg-1'; + }; + notifyTaskResumed(mockDeliver, makeDef()); + await new Promise((r) => setTimeout(r, 20)); + assert.equal(calls.length, 1); + assert.ok(calls[0].content.includes('已恢复')); + assert.ok(calls[0].content.includes('下次执行时间')); + }); + + it('notifyTaskDeleted sends delete message', async () => { + const { notifyTaskDeleted } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const calls = []; + const mockDeliver = async (opts) => { + calls.push(opts); + return 'msg-1'; + }; + notifyTaskDeleted(mockDeliver, makeDef()); + await new Promise((r) => setTimeout(r, 20)); + assert.equal(calls.length, 1); + assert.ok(calls[0].content.includes('已删除')); + }); + + it('notifyTaskFailed sends failure message with error', async () => { + const { notifyTaskFailed } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const calls = []; + const mockDeliver = async (opts) => { + calls.push(opts); + return 'msg-1'; + }; + notifyTaskFailed(mockDeliver, makeDef(), 'connection timeout'); + await new Promise((r) => setTimeout(r, 20)); + assert.equal(calls.length, 1); + assert.ok(calls[0].content.includes('执行失败')); + assert.ok(calls[0].content.includes('connection timeout')); + }); + + it('no-op when deliver is undefined', async () => { + const { notifyTaskRegistered } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + // Should not throw + notifyTaskRegistered(undefined, makeDef()); + }); + + it('no-op when deliveryThreadId is null', async () => { + const { notifyTaskRegistered } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + let called = false; + const mockDeliver = async () => { + called = true; + return 'msg-1'; + }; + notifyTaskRegistered(mockDeliver, makeDef({ deliveryThreadId: null })); + await new Promise((r) => setTimeout(r, 20)); + assert.ok(!called, 'should not deliver when deliveryThreadId is null'); + }); +}); + +describe('TaskRunnerV2 — execution failure notification (#415)', () => { + it('RUN_FAILED triggers notifyTaskFailed via onItemOutcome', async () => { + const Database = (await import('better-sqlite3')).default; + const db = new Database(':memory:'); + const { applyMigrations } = await import('../../dist/domains/memory/schema.js'); + const { RunLedger } = await import('../../dist/infrastructure/scheduler/RunLedger.js'); + const { DynamicTaskStore } = await import('../../dist/infrastructure/scheduler/DynamicTaskStore.js'); + const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); + applyMigrations(db); + const ledger = new RunLedger(db); + const dynamicTaskStore = new DynamicTaskStore(db); + const deliverCalls = []; + const mockDeliver = async (opts) => { + deliverCalls.push(opts); + return 'msg-1'; + }; + const noop = () => {}; + const runner = new TaskRunnerV2({ + logger: { info: noop, error: noop }, + ledger, + dynamicTaskStore, + deliver: mockDeliver, + }); + + // Seed dynamic store + dynamicTaskStore.insert({ + id: 'dyn-fail-1', + templateId: 'reminder', + trigger: { type: 'interval', ms: 999999 }, + params: { message: 'test', triggerUserId: 'user-42' }, + display: { label: '失败任务', category: 'system' }, + deliveryThreadId: 'thread-fail', + enabled: true, + createdBy: 'opus', + createdAt: new Date().toISOString(), + }); + + runner.registerDynamic( + { + id: 'dyn-fail-1', + profile: 'awareness', + trigger: { type: 'interval', ms: 999999 }, + admission: { + gate: async () => ({ run: true, workItems: [{ signal: 'go', subjectKey: 'k' }] }), + }, + run: { + overlap: 'skip', + timeoutMs: 5000, + execute: async () => { + throw new Error('kaboom'); + }, + }, + state: { runLedger: 'sqlite' }, + outcome: { whenNoSignal: 'drop' }, + enabled: () => true, + }, + 'dyn-fail-1', + ); + + await runner.triggerNow('dyn-fail-1'); + // Allow fire-and-forget to settle + await new Promise((r) => setTimeout(r, 50)); + + assert.ok(deliverCalls.length >= 1, 'should have sent failure notification'); + const failMsg = deliverCalls.find((c) => c.content.includes('执行失败')); + assert.ok(failMsg, 'should contain failure notification'); + assert.equal(failMsg.threadId, 'thread-fail'); + assert.ok(failMsg.content.includes('kaboom')); + runner.stop(); + }); + + it('RUN_DELIVERED triggers notifyTaskSucceeded via onItemOutcome', async () => { + const Database = (await import('better-sqlite3')).default; + const db = new Database(':memory:'); + const { applyMigrations } = await import('../../dist/domains/memory/schema.js'); + const { RunLedger } = await import('../../dist/infrastructure/scheduler/RunLedger.js'); + const { DynamicTaskStore } = await import('../../dist/infrastructure/scheduler/DynamicTaskStore.js'); + const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); + applyMigrations(db); + const ledger = new RunLedger(db); + const dynamicTaskStore = new DynamicTaskStore(db); + const deliverCalls = []; + const mockDeliver = async (opts) => { + deliverCalls.push(opts); + return 'msg-1'; + }; + const noop = () => {}; + const runner = new TaskRunnerV2({ + logger: { info: noop, error: noop }, + ledger, + dynamicTaskStore, + deliver: mockDeliver, + }); + + dynamicTaskStore.insert({ + id: 'dyn-ok-1', + templateId: 'reminder', + trigger: { type: 'interval', ms: 999999 }, + params: { message: 'test', triggerUserId: 'user-42' }, + display: { label: '成功任务', category: 'system' }, + deliveryThreadId: 'thread-ok', + enabled: true, + createdBy: 'opus', + createdAt: new Date().toISOString(), + }); + + runner.registerDynamic( + { + id: 'dyn-ok-1', + profile: 'awareness', + trigger: { type: 'interval', ms: 999999 }, + admission: { + gate: async () => ({ run: true, workItems: [{ signal: 'go', subjectKey: 'k' }] }), + }, + run: { + overlap: 'skip', + timeoutMs: 5000, + execute: async () => ({ delivered: true }), + }, + state: { runLedger: 'sqlite' }, + outcome: { whenNoSignal: 'drop' }, + enabled: () => true, + }, + 'dyn-ok-1', + ); + + await runner.triggerNow('dyn-ok-1'); + await new Promise((r) => setTimeout(r, 50)); + + const successMsg = deliverCalls.find((c) => c.content.includes('执行完成')); + assert.ok(successMsg, 'should contain success notification'); + assert.equal(successMsg.threadId, 'thread-ok'); + assert.ok(successMsg.content.includes('下次执行时间'), 'recurring task should include next fire time'); + runner.stop(); + }); +}); + +describe('schedule-notify: notifyTaskSucceeded', () => { + const makeDef2 = (overrides = {}) => ({ + id: 'dyn-test-1', + templateId: 'reminder', + trigger: { type: 'once', fireAt: Date.now() + 60_000 }, + params: { message: 'test', triggerUserId: 'user-42' }, + display: { label: '测试提醒', category: 'system' }, + deliveryThreadId: 'thread-xyz', + enabled: true, + createdBy: 'opus', + createdAt: new Date().toISOString(), + ...overrides, + }); + + it('recurring task includes next fire time', async () => { + const { notifyTaskSucceeded } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const calls = []; + const mockDeliver = async (opts) => { + calls.push(opts); + return 'msg-1'; + }; + notifyTaskSucceeded(mockDeliver, makeDef2({ trigger: { type: 'interval', ms: 60000 } })); + await new Promise((r) => setTimeout(r, 20)); + assert.equal(calls.length, 1); + assert.ok(calls[0].content.includes('本次执行完成')); + assert.ok(calls[0].content.includes('下次执行时间')); + }); + + it('once task says task has ended', async () => { + const { notifyTaskSucceeded } = await import('../../dist/infrastructure/scheduler/schedule-notify.js'); + const calls = []; + const mockDeliver = async (opts) => { + calls.push(opts); + return 'msg-1'; + }; + notifyTaskSucceeded(mockDeliver, makeDef2({ trigger: { type: 'once', fireAt: Date.now() } })); + await new Promise((r) => setTimeout(r, 20)); + assert.equal(calls.length, 1); + assert.ok(calls[0].content.includes('已执行完成')); + assert.ok(calls[0].content.includes('自动结束')); + }); +}); diff --git a/packages/api/test/scheduler/task-runner-v2.test.js b/packages/api/test/scheduler/task-runner-v2.test.js index 74034c2d4..7d237acc3 100644 --- a/packages/api/test/scheduler/task-runner-v2.test.js +++ b/packages/api/test/scheduler/task-runner-v2.test.js @@ -859,3 +859,270 @@ describe('TaskRunnerV2 — governance controls (AC-D1)', () => { runner.stop(); }); }); + +// ─── #415: once trigger ───────────────────────────────────── + +describe('TaskRunnerV2 — once trigger (#415)', () => { + let db, ledger, dynamicTaskStore; + const noop = () => {}; + const silentLogger = { info: noop, error: noop }; + + beforeEach(async () => { + db = new Database(':memory:'); + const { applyMigrations } = await import('../../dist/domains/memory/schema.js'); + const { RunLedger } = await import('../../dist/infrastructure/scheduler/RunLedger.js'); + const { DynamicTaskStore } = await import('../../dist/infrastructure/scheduler/DynamicTaskStore.js'); + applyMigrations(db); + ledger = new RunLedger(db); + dynamicTaskStore = new DynamicTaskStore(db); + }); + + const makeOnceTask = (id, fireAt, overrides = {}) => ({ + id, + profile: 'awareness', + trigger: { type: 'once', fireAt }, + admission: { + gate: async () => ({ run: true, workItems: [{ signal: 'go', subjectKey: 'once-k' }] }), + }, + run: { overlap: 'skip', timeoutMs: 5000, execute: async () => {} }, + state: { runLedger: 'sqlite' }, + outcome: { whenNoSignal: 'drop' }, + enabled: () => true, + ...overrides, + }); + + it('once trigger fires after delay and records RUN_DELIVERED', async () => { + const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); + const runner = new TaskRunnerV2({ logger: silentLogger, ledger, dynamicTaskStore }); + let executed = false; + + runner.registerDynamic( + makeOnceTask('once-fire', Date.now() + 80, { + run: { + overlap: 'skip', + timeoutMs: 5000, + execute: async () => { + executed = true; + }, + }, + }), + 'dyn-once-1', + ); + runner.start(); + await new Promise((r) => setTimeout(r, 200)); + + assert.ok(executed, 'once task should have fired'); + const rows = ledger.query('once-fire', 10); + assert.equal(rows.length, 1); + assert.equal(rows[0].outcome, 'RUN_DELIVERED'); + runner.stop(); + }); + + it('once trigger auto-retires: unregisters from runner + removes from store', async () => { + const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); + const runner = new TaskRunnerV2({ logger: silentLogger, ledger, dynamicTaskStore }); + + // Seed the dynamic store so retire can clean it up + dynamicTaskStore.insert({ + id: 'dyn-retire-1', + templateId: 'reminder', + trigger: { type: 'once', fireAt: Date.now() + 50 }, + params: { message: 'test' }, + display: { label: 'test', category: 'system' }, + deliveryThreadId: null, + enabled: true, + createdBy: 'test', + createdAt: new Date().toISOString(), + }); + + runner.registerDynamic(makeOnceTask('dyn-retire-1', Date.now() + 50), 'dyn-retire-1'); + runner.start(); + await new Promise((r) => setTimeout(r, 250)); + + // Should be unregistered from runner + assert.ok( + !runner.getRegisteredTasks().includes('dyn-retire-1'), + 'task should be unregistered after once execution', + ); + // Should be removed from store + assert.equal( + dynamicTaskStore.getById('dyn-retire-1'), + null, + 'task should be removed from DynamicTaskStore after once execution', + ); + runner.stop(); + }); + + it('live-registered once trigger with past fireAt fires immediately (processing delay)', async () => { + const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); + const runner = new TaskRunnerV2({ logger: silentLogger, ledger, dynamicTaskStore }); + let executed = false; + + // Live registration (not hydration) — should fire even if slightly past + runner.registerDynamic( + makeOnceTask('once-past', Date.now() - 5000, { + run: { + overlap: 'skip', + timeoutMs: 5000, + execute: async () => { + executed = true; + }, + }, + }), + 'dyn-past-1', + ); + runner.start(); + await new Promise((r) => setTimeout(r, 100)); + + assert.ok(executed, 'live-registered once task with past fireAt should fire immediately'); + runner.stop(); + }); + + it('hydrated once trigger with past fireAt is cancelled (missed window, not executed)', async () => { + const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); + const runner = new TaskRunnerV2({ logger: silentLogger, ledger, dynamicTaskStore }); + let executed = false; + + // Seed the store with a past-due once task (simulates restart scenario) + const pastFireAt = Date.now() - 60_000; + dynamicTaskStore.insert({ + id: 'dyn-missed-1', + templateId: 'reminder', + trigger: { type: 'once', fireAt: pastFireAt }, + params: { message: 'should not fire' }, + display: { label: '错过的提醒', category: 'system' }, + deliveryThreadId: null, + enabled: true, + createdBy: 'test', + createdAt: new Date(pastFireAt - 60_000).toISOString(), + }); + + // Provide a template that tracks execution + const templateGetter = { + get: (id) => { + if (id !== 'reminder') return null; + return { + templateId: 'reminder', + label: 'Reminder', + category: 'system', + description: 'test', + subjectKind: 'none', + defaultTrigger: { type: 'cron', expression: '0 9 * * *' }, + paramSchema: {}, + createSpec: (instanceId, params) => + makeOnceTask(instanceId, params.trigger.fireAt, { + run: { + overlap: 'skip', + timeoutMs: 5000, + execute: async () => { + executed = true; + }, + }, + }), + }; + }, + }; + + const loaded = runner.hydrateDynamic(dynamicTaskStore, templateGetter); + + // Should NOT have been loaded + assert.equal(loaded, 0, 'past-due once task should not be hydrated'); + + // Should be removed from store + assert.equal(dynamicTaskStore.getById('dyn-missed-1'), null, 'past-due once task should be removed from store'); + + // Should NOT be registered in runner + assert.ok(!runner.getRegisteredTasks().includes('dyn-missed-1'), 'past-due once task should not be in runner'); + + // Should have recorded SKIP_MISSED_WINDOW in ledger + const rows = ledger.query('dyn-missed-1', 10); + assert.equal(rows.length, 1); + assert.equal(rows[0].outcome, 'SKIP_MISSED_WINDOW'); + + // Execute should never have been called + assert.ok(!executed, 'past-due once task should NOT execute'); + runner.stop(); + }); + + it('hydrated once trigger with past fireAt sends missed-window notification', async () => { + const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); + const deliverCalls = []; + const mockDeliver = async (opts) => { + deliverCalls.push(opts); + return 'msg-id'; + }; + const runner = new TaskRunnerV2({ + logger: silentLogger, + ledger, + dynamicTaskStore, + deliver: mockDeliver, + }); + + const pastFireAt = Date.now() - 120_000; + dynamicTaskStore.insert({ + id: 'dyn-notify-1', + templateId: 'reminder', + trigger: { type: 'once', fireAt: pastFireAt }, + params: { message: 'weather check', triggerUserId: 'user-42' }, + display: { label: '天气查询', category: 'system' }, + deliveryThreadId: 'thread-abc', + enabled: true, + createdBy: 'opus', + createdAt: new Date(pastFireAt - 60_000).toISOString(), + }); + + const templateGetter = { get: () => null }; + runner.hydrateDynamic(dynamicTaskStore, templateGetter); + + // Allow fire-and-forget deliver to complete + await new Promise((r) => setTimeout(r, 50)); + + assert.equal(deliverCalls.length, 1, 'should have sent missed-window notification'); + assert.equal(deliverCalls[0].threadId, 'thread-abc'); + assert.equal(deliverCalls[0].catId, 'opus'); + assert.equal(deliverCalls[0].userId, 'user-42'); + assert.ok(deliverCalls[0].content.includes('天气查询'), 'notification should include task label'); + assert.ok(deliverCalls[0].content.includes('错过'), 'notification should mention missed window'); + runner.stop(); + }); + + it('once trigger does NOT fire before fireAt', async () => { + const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); + const runner = new TaskRunnerV2({ logger: silentLogger, ledger, dynamicTaskStore }); + let executed = false; + + runner.registerDynamic( + makeOnceTask('once-future', Date.now() + 10_000, { + run: { + overlap: 'skip', + timeoutMs: 5000, + execute: async () => { + executed = true; + }, + }, + }), + 'dyn-future-1', + ); + runner.start(); + await new Promise((r) => setTimeout(r, 100)); + + assert.ok(!executed, 'once task should NOT fire before fireAt'); + runner.stop(); + }); + + it('getTaskSummaries includes once trigger info', async () => { + const { TaskRunnerV2 } = await import('../../dist/infrastructure/scheduler/TaskRunnerV2.js'); + const runner = new TaskRunnerV2({ logger: silentLogger, ledger, dynamicTaskStore }); + const fireAt = Date.now() + 60_000; + + runner.registerDynamic(makeOnceTask('once-summary', fireAt), 'dyn-sum-1'); + + const summaries = runner.getTaskSummaries(); + const s = summaries.find((t) => t.id === 'once-summary'); + assert.ok(s, 'should find once task in summaries'); + assert.equal(s.trigger.type, 'once'); + assert.equal(s.trigger.fireAt, fireAt); + assert.equal(s.source, 'dynamic'); + runner.stop(); + }); +}); diff --git a/packages/mcp-server/src/tools/schedule-tools.ts b/packages/mcp-server/src/tools/schedule-tools.ts index 9992b7d04..84f17a630 100644 --- a/packages/mcp-server/src/tools/schedule-tools.ts +++ b/packages/mcp-server/src/tools/schedule-tools.ts @@ -57,7 +57,7 @@ export const registerScheduledTaskInputSchema = { trigger: z .string() .describe( - 'Trigger config as JSON string. Examples: {"type":"cron","expression":"0 9 * * *"} or {"type":"interval","ms":3600000}', + 'Trigger config as JSON string. Examples: {"type":"cron","expression":"0 9 * * *"} or {"type":"interval","ms":3600000} or {"type":"once","delayMs":120000} (fire once after 2min) or {"type":"once","fireAt":1712345678000} (fire once at epoch ms)', ), params: z .string() @@ -203,6 +203,7 @@ export const scheduleTools = [ name: 'cat_cafe_register_scheduled_task', description: 'Create a new scheduled task from a template (confirm step). The task will be persisted and run automatically on schedule. ' + + 'Supports recurring (cron/interval) and one-shot (once) triggers. Once tasks auto-retire after execution. ' + 'When the task fires, a cat is woken with full capabilities — it can send rich blocks (images, audio, cards), search the web, generate content, etc. ' + 'IMPORTANT: You MUST call preview_scheduled_task first and get user confirmation before calling this. ' + 'trigger and params must be JSON strings, not objects.', diff --git a/packages/web/src/components/workspace/schedule-helpers.ts b/packages/web/src/components/workspace/schedule-helpers.ts index 439717dd8..c8401c5b2 100644 --- a/packages/web/src/components/workspace/schedule-helpers.ts +++ b/packages/web/src/components/workspace/schedule-helpers.ts @@ -19,9 +19,11 @@ export interface RunStats { } export interface TriggerSpec { - type: 'interval' | 'cron'; + type: 'interval' | 'cron' | 'once'; ms?: number; expression?: string; + /** #415: epoch ms — when the once trigger will fire */ + fireAt?: number; } export interface TaskDisplayMeta { @@ -93,6 +95,16 @@ export function fallbackCategory(taskId: string): DisplayCategory { export function formatTrigger(trigger: TriggerSpec): string { if (trigger.type === 'cron') return `cron: ${trigger.expression}`; + if (trigger.type === 'once') { + if (!trigger.fireAt) return 'once'; + const d = new Date(trigger.fireAt); + const now = Date.now(); + if (trigger.fireAt <= now) return 'once (fired)'; + const diff = trigger.fireAt - now; + if (diff < 60_000) return `once in ${Math.ceil(diff / 1000)}s`; + if (diff < 3_600_000) return `once in ${Math.ceil(diff / 60_000)}m`; + return `once @ ${d.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit' })}`; + } const ms = trigger.ms ?? 0; if (ms >= 3600000) return `${Math.round(ms / 3600000)}h`; if (ms >= 60000) return `${Math.round(ms / 60000)}m`;