Skip to content

refactor(cue): decompose cue-engine.ts into 5 focused modules + minor fixes hardening the overall Cue functionality#582

Merged
reachraza merged 7 commits intorcfrom
cue-polish
Mar 18, 2026
Merged

refactor(cue): decompose cue-engine.ts into 5 focused modules + minor fixes hardening the overall Cue functionality#582
reachraza merged 7 commits intorcfrom
cue-polish

Conversation

@reachraza
Copy link
Collaborator

@reachraza reachraza commented Mar 18, 2026

Summary

cue-engine.ts had grown to 1,574 lines handling 10 distinct responsibilities. This PR decomposes it into focused single-responsibility modules and cleans up dead code.

Dead Code Cleanup

  • Delete CueGraphView.tsx (1,166 lines): fully superseded by CuePipelineEditor, confirmed dead
  • Deduplicate CueGraphSession: canonical definition moved to cue-pipeline-types.ts; local copies removed from CueModal.tsx and CuePipelineEditor.tsx
  • Shared test helpers: cue-test-helpers.ts with createMockSession / createMockConfig / createMockDeps factories, eliminating copy-paste across 6 test files

cue-engine.ts decomposition / refactor (1,574 → 719 lines, −55%)

New module Responsibility Lines
cue-activity-log.ts Ring buffer (push / getAll / clear), ACTIVITY_LOG_MAX = 500 40
cue-heartbeat.ts Heartbeat writer, sleep/wake detection, reconciliation trigger 109
cue-subscription-setup.ts 5 setup functions: heartbeat, scheduled, file, GitHub, task 384
cue-fan-in-tracker.ts Fan-in source tracking and completion dispatch 220
cue-run-manager.ts Concurrency gating, queue management, run lifecycle 418

Key design decisions

  • onRunCompleted callback resolves the circular dependency between run-manager and engine — chain depth flows one direction only
  • onRunStopped callback ensures manually stopped runs appear in the activity log (was a silent gap)
  • scheduledFiredKeys Set is owned by the engine and passed by reference to subscription setup, keeping session-scoped double-fire prevention in one place
  • fanInTracker.reset() added to stop() to prevent timer leaks across engine restarts
  • calculateNextScheduledTime re-exported from cue-engine.ts for zero import breakage in existing tests

Test plan

  • 475 cue tests passing, 0 regressions
  • npm run lint and npm run lint:eslint clean
  • Added stopRun → activityLog test covering the previously silent gap

Summary by CodeRabbit

  • New Features

    • Activity log for recent Cue run results.
    • New run manager, fan‑in tracker, heartbeat, and centralized subscription setup.
  • Improvements

    • Stronger concurrency, queuing and lifecycle handling (including two‑phase outputs).
    • Improved sleep/wake detection and missed‑event reconciliation.
    • More reliable multi‑source completion aggregation.
  • Tests

    • Added and refactored unit tests; shared test helpers introduced.
  • Removals

    • Graph visualization component removed.
  • Chores

    • CI pull‑request trigger updated.

…0+1)

- Delete CueGraphView.tsx (1,166 lines dead code, superseded by CuePipelineEditor)
- Add cue-test-helpers.ts: shared createMockSession/Config/Deps factory across 6 test files
- Deduplicate CueGraphSession: add to cue-pipeline-types.ts, remove local copies from CueModal and CuePipelineEditor

- cue-engine.ts decomposition (1,574 → 719 lines, −55%):
- cue-activity-log.ts: ring buffer (push/getAll/clear), ACTIVITY_LOG_MAX=500
- cue-heartbeat.ts: heartbeat writer + sleep/wake detection + reconciliation
- cue-subscription-setup.ts: 5 setup functions (heartbeat, scheduled, file, GitHub, task)
- cue-fan-in-tracker.ts: fan-in source tracking + completion dispatch
- cue-run-manager.ts: concurrency gating, queue management, run lifecycle

Key wiring decisions:
- onRunCompleted callback resolves circular dep between run-manager and engine
- onRunStopped callback ensures stopped runs appear in activity log
- scheduledFiredKeys Set owned by engine, passed by ref to subscription setup
- fanInTracker.reset() added to stop() to prevent timer leaks across restarts
- calculateNextScheduledTime re-exported from engine for backwards compat

Tests: 475 passing, 0 regressions; added stopRun→activityLog coverage
@coderabbitai
Copy link

coderabbitai bot commented Mar 18, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 204121fa-bfd0-4174-b3be-7e178dc68446

📥 Commits

Reviewing files that changed from the base of the PR and between 1175e16 and d85290c.

📒 Files selected for processing (1)
  • src/main/cue/cue-subscription-setup.ts

📝 Walkthrough

Walkthrough

Modularizes the Cue Engine into managers (run manager, fan‑in tracker, heartbeat, subscription setup), adds an in‑memory ring‑buffer activity log, centralizes test mocks into shared helpers, removes the CueGraphView component, and updates related tests and renderer types/props.

Changes

Cohort / File(s) Summary
Test helpers
src/__tests__/main/cue/cue-test-helpers.ts
New shared mock factories: createMockSession, createMockConfig, createMockDeps.
Tests — migrated to helpers
src/__tests__/main/cue/cue-*.test.ts, src/__tests__/main/cue/cue-engine.test.ts, src/__tests__/main/cue/cue-activity-log.test.ts
Replaced inline mock factories with imports from cue-test-helpers; added activity-log tests and new concurrency/stopRun tests; adjusted setups.
Activity log
src/main/cue/cue-activity-log.ts
New in-memory ring-buffer activity log with push, getAll(limit?), and clear; exports CueActivityLog and createCueActivityLog.
Run manager
src/main/cue/cue-run-manager.ts
New CueRunManager: per-session concurrency, queuing/timeouts, two‑phase (outputPrompt) runs, per-run AbortController, lifecycle APIs and callbacks.
Fan‑in tracker
src/main/cue/cue-fan-in-tracker.ts
New CueFanInTracker: aggregates source completions, truncates outputs, handles timeouts (continue/break), emits aggregated agent.completed events.
Heartbeat & reconciliation
src/main/cue/cue-heartbeat.ts
New heartbeat module: periodic heartbeat writes (30s), sleep detection (2 min threshold), and reconciliation via reconcileMissedTimeEvents; exposes start/stop/detectSleepAndReconcile.
Subscription setup
src/main/cue/cue-subscription-setup.ts
New subscription wiring (heartbeat, scheduled, file watcher, GitHub poller, task scanner); adds calculateNextScheduledTime and per‑subscription setup functions.
CueEngine core refactor
src/main/cue/cue-engine.ts
Major refactor delegating state/control to new managers (CueRunManager, CueFanInTracker, CueHeartbeat), rewiring DI and lifecycle; re-exports calculateNextScheduledTime.
Types & renderer adjustments
src/shared/cue-pipeline-types.ts, src/renderer/components/CuePipelineEditor/CuePipelineEditor.tsx, src/renderer/components/CueModal.tsx
Added CueGraphSession type; updated imports/usages; added cueShortcutKeys?: string[] to CueModalProps; removed duplicate help render.
Component removal
src/renderer/components/CueGraphView.tsx
Removed the entire CueGraphView canvas visualization component and its rendering/layout/interaction logic.
Renderer test mock cleanup
src/__tests__/renderer/components/CueModal.test.tsx
Removed CueGraphView mock; retained CuePipelineEditor mock.
CI trigger update
.github/workflows/ci.yml
Added rc branch to pull_request trigger list.

Sequence Diagram(s)

sequenceDiagram
    participant Engine as Engine Start
    participant SubSetup as SubscriptionSetup
    participant RunMgr as CueRunManager
    participant FanIn as CueFanInTracker
    participant Heartbeat as CueHeartbeat
    participant Activity as CueActivityLog

    Engine->>Heartbeat: create/start
    Engine->>FanIn: create
    Engine->>RunMgr: create
    Engine->>SubSetup: setup subscriptions

    SubSetup->>RunMgr: execute(sessionId, prompt, event, subscriptionName, outputPrompt?, chainDepth?)
    RunMgr->>RunMgr: check concurrency / queue or dispatch
    alt dispatch
        RunMgr->>RunMgr: call onCueRun
        RunMgr->>Activity: push(result)
        RunMgr->>FanIn: notifyCompletion (if fan‑in)
    else queued
        RunMgr->>RunMgr: enqueue event
    end

    FanIn->>FanIn: aggregate completions
    alt all sources complete
        FanIn->>RunMgr: dispatch agent.completed (aggregated data)
    else timeout
        FanIn->>RunMgr: dispatch (partial or abort per timeout_on_fail)
    end

    Heartbeat->>Heartbeat: periodic write (30s)
    Heartbeat->>Heartbeat: detectSleepAndReconcile on start
Loading
sequenceDiagram
    participant Source as Event Source (file/schedule/github)
    participant SubSetup as SubscriptionSetup
    participant RunMgr as CueRunManager
    participant DB as DB/logging
    participant Callback as onRunCompleted
    participant Activity as CueActivityLog

    Source->>SubSetup: trigger event
    SubSetup->>SubSetup: filter/enabled check
    SubSetup->>RunMgr: execute(...)
    RunMgr->>DB: persist run event (non‑fatal)
    RunMgr->>RunMgr: onCueRun -> receive result
    RunMgr->>Callback: onRunCompleted(sessionId, result, subscriptionName, chainDepth)
    Callback->>Activity: push(result)
    Activity->>Activity: maintain ring buffer (trim oldest if needed)
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

Suggested labels

runmaestro.ai

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 12.50% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main changes: decomposing cue-engine.ts into focused modules and hardening functionality.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch cue-polish
📝 Coding Plan
  • Generate coding plan for human review comments

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@reachraza
Copy link
Collaborator Author

@CodeRabbit review

@coderabbitai
Copy link

coderabbitai bot commented Mar 18, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@greptile-apps
Copy link

greptile-apps bot commented Mar 18, 2026

Greptile Summary

This PR decomposes cue-engine.ts (1,574 lines) into five focused single-responsibility modules (cue-activity-log, cue-heartbeat, cue-subscription-setup, cue-fan-in-tracker, cue-run-manager), deletes the dead CueGraphView.tsx (1,166 lines), deduplicates the CueGraphSession type into cue-pipeline-types.ts, and centralises test mock factories. The structural decomposition is well-designed — module boundaries are clean, the onRunCompleted / onRunStopped callbacks correctly break the circular dependency between the run manager and the engine, and the new fanInTracker.reset() call in stop() closes a timer-leak gap.

Two behavioural bugs were found in the extracted modules:

  • Stale nextTrigger after time.scheduled fires (cue-subscription-setup.ts): setupScheduledSubscription computes state.nextTriggers once at setup and never refreshes it inside checkAndFire, so the UI always shows the original (now past) scheduled time after the first fire. setupHeartbeatSubscription handles this correctly by updating nextTriggers on every tick.
  • stopRun does not free the concurrency slot synchronously (cue-run-manager.ts): activeRunCount is only decremented in the finally block of the in-flight async doExecuteCueRun, not inside stopRun. During the window between the synchronous stop and the async settle, new events are incorrectly queued even though the slot is logically free. If the underlying onCueRun promise never reacts to the abort, the slot leaks permanently.

Confidence Score: 3/5

  • Safe to merge as an architectural improvement, but two logic bugs in the extracted modules should be addressed before this lands on a release branch.
  • The decomposition is high quality and the test suite (475 passing) provides good coverage of happy-path and edge cases. However, two pre-existing bugs surfaced clearly in the new, isolated modules: nextTriggers not refreshing for time.scheduled (stale UI display) and activeRunCount not being decremented in stopRun (concurrency slot leak after a manual stop). Neither is a crash, but both can silently degrade scheduling accuracy and throughput under concurrent workloads.
  • src/main/cue/cue-run-manager.ts (concurrency accounting in stopRun) and src/main/cue/cue-subscription-setup.ts (nextTriggers stale after time.scheduled fires).

Important Files Changed

Filename Overview
src/main/cue/cue-engine.ts Core coordinator reduced from 1,574 → 719 lines by delegating to 5 new modules; wiring and lifecycle management look correct; minor: stopRun wrapper could be inlined but doesn't harm correctness.
src/main/cue/cue-run-manager.ts Manages run lifecycle and concurrency; stopRun does not decrement activeRunCount synchronously, leaving a stale slot until the underlying async promise settles — new events during that window are unnecessarily queued; stopAll iterates activeRuns while calling stopRun which deletes from it.
src/main/cue/cue-subscription-setup.ts setupScheduledSubscription never updates state.nextTriggers inside its checkAndFire callback, so the UI always shows the original (possibly past) schedule time after the first fire; setupHeartbeatSubscription handles this correctly on the same pattern.
src/main/cue/cue-fan-in-tracker.ts Well-extracted fan-in tracker; timer leak fix via reset() is good; timed-out source detection correctly handles name vs. ID matching through session lookup.
src/main/cue/cue-heartbeat.ts Clean extraction; heartbeat interval and sleep-detection logic are well separated; constants exported for testability.
src/main/cue/cue-activity-log.ts Correct ring-buffer implementation; slice(-maxSize) creates a new array on each overflow which is acceptable given the 500-entry cap; tests are thorough.
src/tests/main/cue/cue-test-helpers.ts Centralises mock factories previously duplicated across 6 test files; defaults are sensible and consistent.
src/shared/cue-pipeline-types.ts Canonical CueGraphSession now lives here; the anonymous subscriptions array shape is structurally compatible with CueSubscription from cue-types.ts, so IPC serialization remains correct.
src/renderer/components/CueGraphView.tsx File deleted (1,166 lines of dead code); confirmed superseded by CuePipelineEditor.
src/renderer/components/CueModal.tsx Local CueGraphSession type removed; now imports from cue-pipeline-types; no functional regressions visible.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    subgraph Subscriptions["cue-subscription-setup.ts"]
        HB[setupHeartbeatSubscription]
        SC[setupScheduledSubscription]
        FW[setupFileWatcherSubscription]
        GH[setupGitHubPollerSubscription]
        TS[setupTaskScannerSubscription]
    end

    subgraph Engine["cue-engine.ts (coordinator)"]
        ENG[CueEngine]
        NA[notifyAgentCompleted]
        DS[dispatchSubscription]
    end

    subgraph Modules["New focused modules"]
        AL[cue-activity-log.ts\nRing buffer 500 entries]
        HBM[cue-heartbeat.ts\nHeartbeat + sleep/wake]
        FI[cue-fan-in-tracker.ts\nFan-in completion tracking]
        RM[cue-run-manager.ts\nConcurrency + queue + lifecycle]
    end

    HB & SC & FW & GH & TS -->|executeCueRun| RM
    NA -->|single source| DS
    NA -->|multi source| FI
    FI -->|all sources arrived| DS
    DS -->|fan-out or direct| RM

    RM -->|onRunCompleted| ENG
    ENG -->|pushActivityLog| AL
    ENG -->|notifyAgentCompleted chain| NA
    RM -->|onRunStopped| AL

    HBM -->|detectSleepAndReconcile\nstart/stop| ENG
    ENG -->|initSession| Subscriptions
Loading

Last reviewed commit: "refactor(cue): decom..."

Comment on lines +230 to +239

// Check every 60 seconds to catch scheduled times
const timer = setInterval(checkAndFire, 60_000);
state.timers.push(timer);

// Calculate and track the next trigger time
const nextMs = calculateNextScheduledTime(times, sub.schedule_days);
if (nextMs != null) {
state.nextTriggers.set(sub.name, nextMs);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 nextTriggers never updated after time.scheduled fires

state.nextTriggers.set(sub.name, nextMs) is only called once at setup time (line 238) and is never refreshed inside checkAndFire. After the scheduled time passes, state.nextTriggers still holds the original (now past) timestamp. getStatus() in cue-engine.ts derives nextTrigger from this map, so the UI will display a stale past time indefinitely until the engine or session is restarted.

Compare to setupHeartbeatSubscription, which correctly updates nextTriggers inside the timer callback (line 139) on every fire:

const checkAndFire = () => {
    // ... existing guard/filter logic ...
    if (!times.includes(currentTime)) { /* evict stale keys */ return; }
    // ... double-fire guard ...

    deps.onLog('cue', `[CUE] "${sub.name}" triggered (time.scheduled, ${currentTime})`);
    state.lastTriggered = event.timestamp;
+   // Refresh next trigger time so the UI shows a future time, not this past one
+   const nextMs = calculateNextScheduledTime(times, sub.schedule_days);
+   if (nextMs != null) state.nextTriggers.set(sub.name, nextMs);
    deps.executeCueRun(...);
};

Comment on lines +354 to +374
stopRun(runId: string): boolean {
const run = activeRuns.get(runId);
if (!run) return false;

manuallyStoppedRuns.add(runId);
deps.onStopCueRun?.(runId);
run.abortController?.abort();
run.result.status = 'stopped';
run.result.endedAt = new Date().toISOString();
run.result.durationMs = Date.now() - new Date(run.result.startedAt).getTime();

activeRuns.delete(runId);
deps.onRunStopped(run.result);
deps.onLog('cue', `[CUE] Run stopped: ${runId}`, {
type: 'runStopped',
runId,
sessionId: run.result.sessionId,
subscriptionName: run.result.subscriptionName,
});
return true;
},
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 stopRun doesn't decrement activeRunCount, leaving a stale concurrency slot

stopRun removes the run from activeRuns and sets its status to 'stopped', which means getActiveRuns() immediately reflects the stop. However, activeRunCount is only decremented inside the finally block of the still-in-flight doExecuteCueRun async function — that only happens once the underlying onCueRun promise eventually settles (possibly after the abort signal propagates).

During the window between the synchronous stopRun call and the async finally block:

  • activeRunCount[sessionId] remains elevated
  • Any new event dispatch checks currentCount >= maxConcurrent against this stale count — and incorrectly queues the event even though a slot is actually free
  • If onCueRun never reacts to the abort (i.e., the promise hangs), the slot is permanently leaked

The fix is to also decrement activeRunCount eagerly in stopRun:

stopRun(runId: string): boolean {
    const run = activeRuns.get(runId);
    if (!run) return false;

    manuallyStoppedRuns.add(runId);
    deps.onStopCueRun?.(runId);
    run.abortController?.abort();
    run.result.status = 'stopped';
    run.result.endedAt = new Date().toISOString();
    run.result.durationMs = Date.now() - new Date(run.result.startedAt).getTime();

    activeRuns.delete(runId);
+   // Free the concurrency slot immediately so queued events can proceed
+   const count = activeRunCount.get(run.result.sessionId) ?? 1;
+   activeRunCount.set(run.result.sessionId, Math.max(0, count - 1));
+   drainQueue(run.result.sessionId);
    deps.onRunStopped(run.result);
    deps.onLog(/* ... */);
    return true;
}

The finally block's activeRunCount decrement and drainQueue call will then be a harmless no-op (count already at 0).

Comment on lines +376 to +381
stopAll(): void {
for (const [runId] of activeRuns) {
this.stopRun(runId);
}
eventQueue.clear();
},
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 stopAll mutates activeRuns while iterating over it

stopRun calls activeRuns.delete(runId) for the entry currently being iterated. While JavaScript's Map iterator is safe when you delete the current entry, this pattern is fragile and non-obvious. A small refactor using a snapshot avoids any confusion:

Suggested change
stopAll(): void {
for (const [runId] of activeRuns) {
this.stopRun(runId);
}
eventQueue.clear();
},
stopAll(): void {
for (const runId of [...activeRuns.keys()]) {
this.stopRun(runId);
}
eventQueue.clear();
},

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (8)
src/shared/cue-pipeline-types.ts (1)

117-143: Consider using the already-exported CueEventType for the event field.

The file already exports CueEventType (lines 10-17), but the subscription's event field uses string instead. Using CueEventType would provide type safety in the renderer while maintaining the "renderer-safe" design goal.

 subscriptions: Array<{
 	name: string;
-	event: string;
+	event: CueEventType;
 	enabled: boolean;

The relaxation of prompt to optional and schedule_days to string[] makes sense for renderer consumption where strict validation isn't required.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/shared/cue-pipeline-types.ts` around lines 117 - 143, The subscription
type in CueGraphSession currently types the event field as string; change it to
use the already-exported CueEventType to improve type safety while keeping
renderer-safe semantics—locate the CueGraphSession interface and replace the
subscription property event: string with event: CueEventType (ensure
CueEventType is imported/exported in this module or referenced directly) so
existing consumers of session.subscriptions benefit from the stricter enum/type
without altering other optional fields like prompt or schedule_days.
src/main/cue/cue-run-manager.ts (1)

387-389: getActiveRunMap returns mutable internal state.

This returns the actual activeRuns map rather than a copy, allowing callers to mutate internal state. If this is intentional for performance/internal use, consider documenting it. Otherwise, return a copy or make it read-only.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/cue/cue-run-manager.ts` around lines 387 - 389, getActiveRunMap
currently exposes the internal mutable Map activeRuns directly; change it to
return a safe view by either returning a shallow copy (e.g., new
Map(activeRuns)) or a ReadonlyMap to prevent external mutation, and update the
function signature/return type accordingly so callers cannot mutate internal
state (reference: getActiveRunMap and activeRuns).
src/main/cue/cue-heartbeat.ts (2)

39-41: Consider logging heartbeat write failures instead of silent catch.

While heartbeat failures during DB initialization are expected and non-fatal, silently swallowing errors makes debugging harder. Consider logging at debug level or adding a comment clarifying this is intentional startup behavior.

Optional: Add debug logging for visibility
 		try {
 			updateHeartbeat();
-		} catch {
-			// Non-fatal if DB not ready
+		} catch (error) {
+			// Non-fatal if DB not ready — expected during startup
+			deps.onLog('debug' as MainLogLevel, `[CUE] Initial heartbeat write skipped: ${error}`);
 		}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/cue/cue-heartbeat.ts` around lines 39 - 41, The catch block in the
heartbeat write path currently swallows errors silently; update the catch in the
function handling heartbeat writes (look for the try/catch around writeHeartbeat
or CueHeartbeat logic in cue-heartbeat.ts) to capture the error (e.g., catch
(err)) and log it at debug level via the project's logger (or add a short
clarifying comment if logging is not desired) so DB-not-ready startup failures
are visible for debugging while remaining non-fatal.

17-17: EVENT_PRUNE_AGE_MS is exported but unused in this file.

This constant appears to be intended for use in database pruning elsewhere. Consider moving it to cue-db.ts where it would be used, or add a comment indicating its intended consumer.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/cue/cue-heartbeat.ts` at line 17, EVENT_PRUNE_AGE_MS is declared and
exported from cue-heartbeat.ts but not used there; move or document it so its
ownership is clear. Either relocate the constant into the module that performs
pruning (e.g., cue-db.ts where prune logic or functions like
pruneOldEvents/pruneEvents exist) and export it from there, or keep it in
cue-heartbeat.ts but add a clear comment stating it’s intended to be consumed by
the pruning logic and reference the prune function name; update any imports
accordingly so pruneOldEvents (or the actual pruning function) imports
EVENT_PRUNE_AGE_MS from its new location.
src/main/cue/cue-subscription-setup.ts (1)

349-352: Task scanner log message accesses payload properties that may be undefined.

The log message assumes event.payload.taskCount and event.payload.filename exist. If the task scanner emits events with different payload shapes, this could log undefined.

Defensive access
 			deps.onLog(
 				'cue',
-				`[CUE] "${sub.name}" triggered (task.pending: ${event.payload.taskCount} task(s) in ${event.payload.filename})`
+				`[CUE] "${sub.name}" triggered (task.pending: ${event.payload.taskCount ?? 0} task(s) in ${event.payload.filename ?? 'unknown'})`
 			);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/cue/cue-subscription-setup.ts` around lines 349 - 352, The log in
cue-subscription-setup uses event.payload.taskCount and event.payload.filename
directly which can be undefined; update the call to deps.onLog (the '[CUE]
"${sub.name}" triggered...' message) to defensively access the payload using
optional chaining and sensible defaults (e.g., event.payload?.taskCount ?? '0'
and event.payload?.filename ?? '<unknown>') so the log never prints "undefined"
and remains informative even with varying event shapes.
src/main/cue/cue-fan-in-tracker.ts (2)

97-97: Guard against empty array in Math.max to prevent -Infinity result.

If completions is somehow empty (e.g., due to race conditions or unexpected state), Math.max(...[]) returns -Infinity, which would propagate an invalid chainDepth to downstream subscriptions.

Proposed fix
-			const maxChainDepth = Math.max(...completions.map((c) => c.chainDepth));
+			const maxChainDepth = completions.length > 0 ? Math.max(...completions.map((c) => c.chainDepth)) : 0;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/cue/cue-fan-in-tracker.ts` at line 97, Guard against an empty
completions array before calling Math.max: replace the current computation of
const maxChainDepth = Math.max(...completions.map((c) => c.chainDepth)) with a
safe version that defaults to 0 when completions is empty (e.g., check
completions.length === 0 and set maxChainDepth = 0, or use Math.max(0,
...completions.map(...))). This ensures maxChainDepth cannot become -Infinity
and prevents invalid chainDepth from propagating to downstream subscribers.

180-180: Same Math.max empty array guard needed here.

For consistency and defensive coding, apply the same guard when computing maxChainDepth for successful fan-in completion.

Proposed fix
-			const maxChainDepth = Math.max(...completions.map((c) => c.chainDepth));
+			const maxChainDepth = completions.length > 0 ? Math.max(...completions.map((c) => c.chainDepth)) : 0;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/cue/cue-fan-in-tracker.ts` at line 180, When computing maxChainDepth
from the completions array (the line defining maxChainDepth from
completions.map), add the same empty-array guard used elsewhere so it can't call
Math.max with no arguments; either include a zero/default value when reducing or
check completions.length and set maxChainDepth to 0 when empty. Update the
expression that assigns maxChainDepth (referencing the completions variable and
the maxChainDepth binding) to use that guard so it safely handles an empty
completions array.
src/main/cue/cue-engine.ts (1)

272-274: Consider a manager-level active-run count API.

getStatus() now reaches into getActiveRunMap().values() and r.result.sessionId, which couples CueEngine back to CueRunManager internals. A small getActiveRunCount(sessionId) helper would keep the new module boundary cleaner.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/cue/cue-engine.ts` around lines 272 - 274, getStatus() in CueEngine
is reaching into CueRunManager internals via getActiveRunMap() and
r.result.sessionId; add a manager-level helper like
CueRunManager.getActiveRunCount(sessionId) that returns the number of active
runs for a session, implement it to encapsulate the
[...getActiveRunMap().values()].filter(...) logic inside CueRunManager, and
update CueEngine.getStatus() to call getActiveRunCount(sessionId) instead of
iterating getActiveRunMap() directly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/main/cue/cue-engine.ts`:
- Around line 133-140: The call to this.runManager.execute (invoked in
onDispatch) should pass the resolved prompt body (sub.prompt) rather than the
config path (sub.prompt_file); update all similar call sites (including the
other block around the 589–605 area) to use sub.prompt and not sub.prompt_file,
and add a guard that logs an error and returns/bails when sub.prompt is missing
so we don't invoke onCueRun/execute with an undefined prompt; locate the call
sites by searching for this.runManager.execute and onDispatch and change the
argument list to supply sub.prompt, plus add a null-check that logs via the
existing logger and exits the dispatch early if sub.prompt is falsy.
- Around line 133-140: The onDispatch callbacks are calling runManager.execute
directly (e.g., runManager.execute(sessionId, sub.prompt_file ?? sub.prompt,
event, sub.name, sub.output_prompt)), bypassing dispatchSubscription so fan_out
target resolution and fanOutSource/fanOutIndex enrichment never run; replace
those direct runManager.execute calls with a call to
this.dispatchSubscription(...) (forwarding sessionId, the subscription object
`sub`, and the event) so dispatchSubscription handles fan_out resolution and
enrichment for heartbeat/scheduled/file/GitHub/task triggers (also apply the
same change where similar direct executes exist).

In `@src/main/cue/cue-run-manager.ts`:
- Around line 376-381: The stopAll method currently iterates activeRuns while
calling this.stopRun(runId) which mutates activeRuns; to fix, first capture the
run IDs into a stable list (e.g. Array.from(activeRuns.keys()) or
[...activeRuns.keys()]) and then iterate that list calling this.stopRun(runId)
so deletions during stopRun won't affect the iteration; keep the
eventQueue.clear() call after the loop. Ensure references to stopAll, stopRun,
activeRuns, and eventQueue are used so the correct symbols are updated.
- Around line 354-374: The stopRun method removes a run from activeRuns but
never decrements activeRunCount, leaving the active count inflated; update
stopRun (the function named stopRun) to decrement activeRunCount when a run is
successfully stopped (and guard against going below zero), and ensure any
existing bookkeeping performed in doExecuteCueRun's finally (which normally
decrements activeRunCount) is mirrored here so activeRunCount accurately
reflects the number of running jobs after manual stops; locate uses of
activeRuns, activeRunCount and doExecuteCueRun to mirror the same
decrement/notification behavior.

---

Nitpick comments:
In `@src/main/cue/cue-engine.ts`:
- Around line 272-274: getStatus() in CueEngine is reaching into CueRunManager
internals via getActiveRunMap() and r.result.sessionId; add a manager-level
helper like CueRunManager.getActiveRunCount(sessionId) that returns the number
of active runs for a session, implement it to encapsulate the
[...getActiveRunMap().values()].filter(...) logic inside CueRunManager, and
update CueEngine.getStatus() to call getActiveRunCount(sessionId) instead of
iterating getActiveRunMap() directly.

In `@src/main/cue/cue-fan-in-tracker.ts`:
- Line 97: Guard against an empty completions array before calling Math.max:
replace the current computation of const maxChainDepth =
Math.max(...completions.map((c) => c.chainDepth)) with a safe version that
defaults to 0 when completions is empty (e.g., check completions.length === 0
and set maxChainDepth = 0, or use Math.max(0, ...completions.map(...))). This
ensures maxChainDepth cannot become -Infinity and prevents invalid chainDepth
from propagating to downstream subscribers.
- Line 180: When computing maxChainDepth from the completions array (the line
defining maxChainDepth from completions.map), add the same empty-array guard
used elsewhere so it can't call Math.max with no arguments; either include a
zero/default value when reducing or check completions.length and set
maxChainDepth to 0 when empty. Update the expression that assigns maxChainDepth
(referencing the completions variable and the maxChainDepth binding) to use that
guard so it safely handles an empty completions array.

In `@src/main/cue/cue-heartbeat.ts`:
- Around line 39-41: The catch block in the heartbeat write path currently
swallows errors silently; update the catch in the function handling heartbeat
writes (look for the try/catch around writeHeartbeat or CueHeartbeat logic in
cue-heartbeat.ts) to capture the error (e.g., catch (err)) and log it at debug
level via the project's logger (or add a short clarifying comment if logging is
not desired) so DB-not-ready startup failures are visible for debugging while
remaining non-fatal.
- Line 17: EVENT_PRUNE_AGE_MS is declared and exported from cue-heartbeat.ts but
not used there; move or document it so its ownership is clear. Either relocate
the constant into the module that performs pruning (e.g., cue-db.ts where prune
logic or functions like pruneOldEvents/pruneEvents exist) and export it from
there, or keep it in cue-heartbeat.ts but add a clear comment stating it’s
intended to be consumed by the pruning logic and reference the prune function
name; update any imports accordingly so pruneOldEvents (or the actual pruning
function) imports EVENT_PRUNE_AGE_MS from its new location.

In `@src/main/cue/cue-run-manager.ts`:
- Around line 387-389: getActiveRunMap currently exposes the internal mutable
Map activeRuns directly; change it to return a safe view by either returning a
shallow copy (e.g., new Map(activeRuns)) or a ReadonlyMap to prevent external
mutation, and update the function signature/return type accordingly so callers
cannot mutate internal state (reference: getActiveRunMap and activeRuns).

In `@src/main/cue/cue-subscription-setup.ts`:
- Around line 349-352: The log in cue-subscription-setup uses
event.payload.taskCount and event.payload.filename directly which can be
undefined; update the call to deps.onLog (the '[CUE] "${sub.name}" triggered...'
message) to defensively access the payload using optional chaining and sensible
defaults (e.g., event.payload?.taskCount ?? '0' and event.payload?.filename ??
'<unknown>') so the log never prints "undefined" and remains informative even
with varying event shapes.

In `@src/shared/cue-pipeline-types.ts`:
- Around line 117-143: The subscription type in CueGraphSession currently types
the event field as string; change it to use the already-exported CueEventType to
improve type safety while keeping renderer-safe semantics—locate the
CueGraphSession interface and replace the subscription property event: string
with event: CueEventType (ensure CueEventType is imported/exported in this
module or referenced directly) so existing consumers of session.subscriptions
benefit from the stricter enum/type without altering other optional fields like
prompt or schedule_days.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 436c5e2d-b9bb-4ffa-93c6-a9953bce3efd

📥 Commits

Reviewing files that changed from the base of the PR and between d6b746f and 1bb0c1e.

📒 Files selected for processing (19)
  • src/__tests__/main/cue/cue-activity-log.test.ts
  • src/__tests__/main/cue/cue-completion-chains.test.ts
  • src/__tests__/main/cue/cue-concurrency.test.ts
  • src/__tests__/main/cue/cue-engine.test.ts
  • src/__tests__/main/cue/cue-multi-hop-chains.test.ts
  • src/__tests__/main/cue/cue-session-lifecycle.test.ts
  • src/__tests__/main/cue/cue-sleep-wake.test.ts
  • src/__tests__/main/cue/cue-test-helpers.ts
  • src/__tests__/renderer/components/CueModal.test.tsx
  • src/main/cue/cue-activity-log.ts
  • src/main/cue/cue-engine.ts
  • src/main/cue/cue-fan-in-tracker.ts
  • src/main/cue/cue-heartbeat.ts
  • src/main/cue/cue-run-manager.ts
  • src/main/cue/cue-subscription-setup.ts
  • src/renderer/components/CueGraphView.tsx
  • src/renderer/components/CueModal.tsx
  • src/renderer/components/CuePipelineEditor/CuePipelineEditor.tsx
  • src/shared/cue-pipeline-types.ts
💤 Files with no reviewable changes (2)
  • src/tests/renderer/components/CueModal.test.tsx
  • src/renderer/components/CueGraphView.tsx

Comment on lines +354 to +374
stopRun(runId: string): boolean {
const run = activeRuns.get(runId);
if (!run) return false;

manuallyStoppedRuns.add(runId);
deps.onStopCueRun?.(runId);
run.abortController?.abort();
run.result.status = 'stopped';
run.result.endedAt = new Date().toISOString();
run.result.durationMs = Date.now() - new Date(run.result.startedAt).getTime();

activeRuns.delete(runId);
deps.onRunStopped(run.result);
deps.onLog('cue', `[CUE] Run stopped: ${runId}`, {
type: 'runStopped',
runId,
sessionId: run.result.sessionId,
subscriptionName: run.result.subscriptionName,
});
return true;
},
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Missing activeRunCount decrement in stopRun.

When a run is manually stopped, activeRuns is cleared (line 365) but activeRunCount is not decremented. This leaves the count inflated, potentially blocking future runs from dispatching when they should have available slots.

The finally block in doExecuteCueRun (lines 271-274) handles decrement for normal completions, but stopRun bypasses that flow since it removes the run from activeRuns immediately.

Proposed fix
 		stopRun(runId: string): boolean {
 			const run = activeRuns.get(runId);
 			if (!run) return false;

 			manuallyStoppedRuns.add(runId);
 			deps.onStopCueRun?.(runId);
 			run.abortController?.abort();
 			run.result.status = 'stopped';
 			run.result.endedAt = new Date().toISOString();
 			run.result.durationMs = Date.now() - new Date(run.result.startedAt).getTime();

 			activeRuns.delete(runId);
+
+			// Decrement active run count and drain queue
+			const sessionId = run.result.sessionId;
+			const count = activeRunCount.get(sessionId) ?? 1;
+			activeRunCount.set(sessionId, Math.max(0, count - 1));
+			drainQueue(sessionId);
+
 			deps.onRunStopped(run.result);
 			deps.onLog('cue', `[CUE] Run stopped: ${runId}`, {
 				type: 'runStopped',
 				runId,
 				sessionId: run.result.sessionId,
 				subscriptionName: run.result.subscriptionName,
 			});
 			return true;
 		},
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/cue/cue-run-manager.ts` around lines 354 - 374, The stopRun method
removes a run from activeRuns but never decrements activeRunCount, leaving the
active count inflated; update stopRun (the function named stopRun) to decrement
activeRunCount when a run is successfully stopped (and guard against going below
zero), and ensure any existing bookkeeping performed in doExecuteCueRun's
finally (which normally decrements activeRunCount) is mirrored here so
activeRunCount accurately reflects the number of running jobs after manual
stops; locate uses of activeRuns, activeRunCount and doExecuteCueRun to mirror
the same decrement/notification behavior.

…r scheduled fire

- stopRun now eagerly decrements activeRunCount and drains the queue,
  so queued events dispatch immediately instead of waiting for the
  async finally block (which may never run if onCueRun hangs)
- finally block skips its decrement for manually stopped runs to
  prevent double-decrement race with newly drained runs
- setupScheduledSubscription now recalculates nextTriggers after each
  fire so the UI shows the next future time, not a stale past one
- stopAll uses snapshot of keys to avoid mutating Map during iteration
- Added regression tests for both fixes (477 total cue tests passing)
…n and types

- Heartbeat onDispatch now calls dispatchSubscription instead of
  runManager.execute directly, so fan-out works for reconciled events
- Added getActiveRunCount(sessionId) to CueRunManager, used by
  getStatus() instead of reaching into getActiveRunMap() internals
- Guard Math.max in fan-in-tracker against empty completions array
  to avoid -Infinity chainDepth propagation
- CueGraphSession.event typed as CueEventType instead of string
@reachraza
Copy link
Collaborator Author

@CodeRabbit review

@coderabbitai
Copy link

coderabbitai bot commented Mar 18, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

♻️ Duplicate comments (2)
src/main/cue/cue-engine.ts (2)

581-594: ⚠️ Potential issue | 🟠 Major

Pass the resolved prompt text here, not prompt_file.

sub.prompt_file is the config path, while sub.prompt is the loaded prompt body. Using sub.prompt_file ?? sub.prompt here makes manual dispatches and fan-out runs execute the filesystem path whenever the subscription is file-backed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/cue/cue-engine.ts` around lines 581 - 594, The code is passing the
config path (sub.prompt_file) instead of the loaded prompt body to
runManager.execute; change the argument(s) that currently use sub.prompt_file ??
sub.prompt to use the resolved prompt text (sub.prompt ?? sub.prompt_file) so
both the fan-out call (this.runManager.execute(targetSession.id, sub.prompt_file
?? sub.prompt, fanOutEvent, ...)) and the manual/owner dispatch call
(this.runManager.execute(ownerSessionId, sub.prompt_file ?? sub.prompt, event,
...)) pass the actual prompt content (or fall back to the file path) to
runManager.execute.

640-646: ⚠️ Potential issue | 🟠 Major

Setup-driven triggers are bypassing dispatchSubscription() again.

This callback feeds heartbeat, scheduled, file, GitHub, and task events straight into runManager.execute(), so they skip fan-out target resolution and any centralized dispatch-time enrichment. The helper interface needs to pass the full CueSubscription back up so the engine can delegate through dispatchSubscription() instead of pushing raw prompt pieces into the run manager.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/cue/cue-engine.ts` around lines 640 - 646, The setup callback
executeCueRun in the SubscriptionSetupDeps is bypassing dispatch by calling
runManager.execute(...) directly; change its contract to accept the full
CueSubscription (e.g., executeCueRun: (sid, subscription: CueSubscription,
event, subName, outputPrompt) or simply (subscription: CueSubscription, ...))
and update the implementation in cue-engine.ts to call
this.dispatchSubscription(subscription, ...) so the engine performs fan-out
target resolution and dispatch-time enrichment instead of injecting raw prompt
pieces into runManager.execute; update any callers that construct setupDeps and
the SubscriptionSetupDeps type to pass the full CueSubscription through.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/main/cue/cue-fan-in-tracker.ts`:
- Around line 83-95: The fan-in event payload currently drops single-source
keys; update the payload created in the event object inside cue-fan-in-tracker
(the const event: CueEvent = { ... payload: { ... } }) to preserve
sourceSessionId, status, exitCode, durationMs, and triggeredBy — when there is
one completion keep its values, and when multiple either join them into a string
or use the first-completion as a fallback so the keys still exist (e.g.
sourceSessionId:
completions.length===1?completions[0].sessionId:completions.map(c=>c.sessionId).join(',')),
and apply the same change to the other fan-in block around the second payload
creation (the similar payload at the 169-179 region) so downstream
{{CUE_SOURCE_*}} consumers keep seeing those fields.

In `@src/main/cue/cue-run-manager.ts`:
- Around line 207-240: The output phase creates a new outputRunId and calls
deps.onCueRun(...) so stopping only the original runId doesn't affect the output
phase; modify cue-run-manager so the secondary phase reuses the primary runId
(or registers the outputRunId in activeRuns and maps it to the same stop
handler) so onStopCueRun/stopRun can stop the output phase: update the block
that generates outputRunId, recordCueEvent and calls deps.onCueRun to either (a)
use the existing runId variable for the output phase instead of outputRunId, or
(b) add outputRunId into activeRuns with the same stop handler and ensure
subscriptionName/outputRunId are cleaned up when the output phase finishes;
apply the same change to the similar code at the other location (lines noted in
review).
- Around line 164-175: The try/catch around recordCueEvent and
updateCueEventStatus should not silently swallow all errors; instead catch only
the expected recoverable DB-unavailable error (e.g., a specific error
code/class) and handle that, otherwise call the Sentry reporting helper
(captureException and/or captureMessage) with contextual details (include runId,
event.type, triggerName, sessionId, subscriptionName) and rethrow or surface the
error; import captureException/captureMessage and apply the same change to the
other similar blocks in this file that wrap recordCueEvent/updateCueEventStatus
so failures are reported to Sentry rather than dropped.
- Around line 387-391: stopAll() currently iterates activeRuns and calls
stopRun(), but stopRun() decrements sessions and calls drainQueue(), which can
promote queued runs into activeRuns while we're iterating; to fix, prevent
drainQueue() from running during stopAll: take a snapshot of activeRuns keys as
you already do, set a temporary flag (e.g. this.suspendDrain = true) or call
stopRun with a new parameter (e.g. stopRun(runId, {suppressDrain: true})), stop
each run using that suppression, then clear eventQueue, clear the flag
(this.suspendDrain = false) and call drainQueue() once; update stopRun() to
respect the flag/parameter before invoking drainQueue() (references: stopAll(),
stopRun(), activeRuns, eventQueue, drainQueue()).

In `@src/main/cue/cue-subscription-setup.ts`:
- Around line 215-227: The filter-return branch in the subscription handling
prevents advancing state.nextTriggers when a scheduled slot is skipped; move or
duplicate the next trigger calculation so calculateNextScheduledTime(times,
sub.schedule_days) is called and state.nextTriggers.set(sub.name, nextMs) is
executed even when matchesFilter(event.payload, sub.filter) is false (before
returning after deps.onLog), ensuring CueEngine.getStatus() sees an updated next
trigger; reference matchesFilter, deps.onLog, state.nextTriggers,
calculateNextScheduledTime, sub.name, sub.filter, times, and sub.schedule_days.

---

Duplicate comments:
In `@src/main/cue/cue-engine.ts`:
- Around line 581-594: The code is passing the config path (sub.prompt_file)
instead of the loaded prompt body to runManager.execute; change the argument(s)
that currently use sub.prompt_file ?? sub.prompt to use the resolved prompt text
(sub.prompt ?? sub.prompt_file) so both the fan-out call
(this.runManager.execute(targetSession.id, sub.prompt_file ?? sub.prompt,
fanOutEvent, ...)) and the manual/owner dispatch call
(this.runManager.execute(ownerSessionId, sub.prompt_file ?? sub.prompt, event,
...)) pass the actual prompt content (or fall back to the file path) to
runManager.execute.
- Around line 640-646: The setup callback executeCueRun in the
SubscriptionSetupDeps is bypassing dispatch by calling runManager.execute(...)
directly; change its contract to accept the full CueSubscription (e.g.,
executeCueRun: (sid, subscription: CueSubscription, event, subName,
outputPrompt) or simply (subscription: CueSubscription, ...)) and update the
implementation in cue-engine.ts to call this.dispatchSubscription(subscription,
...) so the engine performs fan-out target resolution and dispatch-time
enrichment instead of injecting raw prompt pieces into runManager.execute;
update any callers that construct setupDeps and the SubscriptionSetupDeps type
to pass the full CueSubscription through.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 7bad19d4-a087-4300-b3da-c8258d843b16

📥 Commits

Reviewing files that changed from the base of the PR and between ad43e04 and e751a31.

📒 Files selected for processing (7)
  • src/__tests__/main/cue/cue-concurrency.test.ts
  • src/__tests__/main/cue/cue-engine.test.ts
  • src/main/cue/cue-engine.ts
  • src/main/cue/cue-fan-in-tracker.ts
  • src/main/cue/cue-run-manager.ts
  • src/main/cue/cue-subscription-setup.ts
  • src/shared/cue-pipeline-types.ts

Comment on lines +182 to +184
// Clear concurrency and fan-in state
this.runManager.reset();
this.fanInTracker.reset();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

stop() forgets active runs without actually stopping them.

runManager.reset() only drops bookkeeping. If the engine is disabled while agents are still running, those processes keep executing, later callbacks can still append to the activity log, and the manager has already lost the run ids and the onStopCueRun path.

Comment on lines +83 to +95
const event: CueEvent = {
id: crypto.randomUUID(),
type: 'agent.completed',
timestamp: new Date().toISOString(),
triggerName: sub.name,
payload: {
completedSessions: completions.map((c) => c.sessionId),
timedOutSessions: timedOutSources,
sourceSession: completions.map((c) => c.sessionName).join(', '),
sourceOutput: completions.map((c) => c.output).join('\n---\n'),
outputTruncated: completions.some((c) => c.truncated),
partial: true,
},
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Preserve the existing agent.completed fields on fan-in events.

Single-source completions from src/main/cue/cue-engine.ts:494-509 still populate sourceSessionId, status, exitCode, durationMs, and triggeredBy, and src/main/cue/cue-executor.ts:155-164 / src/shared/templateVariables.ts:361-366 still read those keys for {{CUE_SOURCE_*}}. These aggregated payloads drop them, so downstream templates go blank as soon as a chain switches from single-source to fan-in.

Also applies to: 169-179

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/cue/cue-fan-in-tracker.ts` around lines 83 - 95, The fan-in event
payload currently drops single-source keys; update the payload created in the
event object inside cue-fan-in-tracker (the const event: CueEvent = { ...
payload: { ... } }) to preserve sourceSessionId, status, exitCode, durationMs,
and triggeredBy — when there is one completion keep its values, and when
multiple either join them into a string or use the first-completion as a
fallback so the keys still exist (e.g. sourceSessionId:
completions.length===1?completions[0].sessionId:completions.map(c=>c.sessionId).join(',')),
and apply the same change to the other fan-in block around the second payload
creation (the similar payload at the 169-179 region) so downstream
{{CUE_SOURCE_*}} consumers keep seeing those fields.

Comment on lines +164 to +175
recordCueEvent({
id: runId,
type: event.type,
triggerName: event.triggerName,
sessionId,
subscriptionName,
status: 'running',
payload: JSON.stringify(event.payload),
});
} catch {
// Non-fatal if DB is unavailable
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don’t drop cue-db failures silently.

If recordCueEvent() or updateCueEventStatus() starts failing, event persistence diverges from reality with no telemetry at all. Please either narrow these catches to the expected recoverable DB-unavailable case or report them through the main-process Sentry utility.

As per coding guidelines, "Do not silently swallow errors with try-catch blocks that only log. Let exceptions bubble up to Sentry for error tracking in production. Only catch and handle expected/recoverable errors explicitly (e.g., NETWORK_ERROR)." and "Use Sentry utilities for explicit reporting: import captureException and captureMessage from src/main/utils/sentry.ts or src/renderer/utils/sentry.ts to report exceptions with context and notable events."

Also applies to: 218-230, 242-246, 281-293

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/cue/cue-run-manager.ts` around lines 164 - 175, The try/catch around
recordCueEvent and updateCueEventStatus should not silently swallow all errors;
instead catch only the expected recoverable DB-unavailable error (e.g., a
specific error code/class) and handle that, otherwise call the Sentry reporting
helper (captureException and/or captureMessage) with contextual details (include
runId, event.type, triggerName, sessionId, subscriptionName) and rethrow or
surface the error; import captureException/captureMessage and apply the same
change to the other similar blocks in this file that wrap
recordCueEvent/updateCueEventStatus so failures are reported to Sentry rather
than dropped.

Comment on lines +207 to +240
const outputRunId = crypto.randomUUID();
const outputEvent: CueEvent = {
...event,
id: crypto.randomUUID(),
payload: {
...event.payload,
sourceOutput: result.stdout.substring(0, SOURCE_OUTPUT_MAX_CHARS),
outputPromptPhase: true,
},
};

try {
recordCueEvent({
id: outputRunId,
type: event.type,
triggerName: event.triggerName,
sessionId,
subscriptionName: `${subscriptionName}:output`,
status: 'running',
payload: JSON.stringify(outputEvent.payload),
});
} catch {
// Non-fatal if DB is unavailable
}

const contextPrompt = `${outputPrompt}\n\n---\n\nContext from completed task:\n${result.stdout.substring(0, SOURCE_OUTPUT_MAX_CHARS)}`;
const outputResult = await deps.onCueRun({
runId: outputRunId,
sessionId,
prompt: contextPrompt,
subscriptionName: `${subscriptionName}:output`,
event: outputEvent,
timeoutMs,
});
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Make the output phase stoppable by the same active run.

The secondary onCueRun() call is issued under a fresh outputRunId, but only the primary runId is stored in activeRuns and exposed to stopRun(). If a user stops the run during that second phase, onStopCueRun targets the wrong id, the actual output process keeps running, and the freed concurrency slot can be reused while work is still in flight.

Also applies to: 358-377

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/cue/cue-run-manager.ts` around lines 207 - 240, The output phase
creates a new outputRunId and calls deps.onCueRun(...) so stopping only the
original runId doesn't affect the output phase; modify cue-run-manager so the
secondary phase reuses the primary runId (or registers the outputRunId in
activeRuns and maps it to the same stop handler) so onStopCueRun/stopRun can
stop the output phase: update the block that generates outputRunId,
recordCueEvent and calls deps.onCueRun to either (a) use the existing runId
variable for the output phase instead of outputRunId, or (b) add outputRunId
into activeRuns with the same stop handler and ensure
subscriptionName/outputRunId are cleaned up when the output phase finishes;
apply the same change to the similar code at the other location (lines noted in
review).

Comment on lines +387 to +391
stopAll(): void {
for (const runId of [...activeRuns.keys()]) {
this.stopRun(runId);
}
eventQueue.clear();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

stopAll() can start queued runs before it clears the queue.

Each stopRun() eagerly decrements the session count and calls drainQueue(). In this loop that means stopping the first active run can immediately promote queued work into new active runs, and those new run ids are not part of the snapshot you are iterating.

Minimal fix
 stopAll(): void {
+	eventQueue.clear();
 	for (const runId of [...activeRuns.keys()]) {
 		this.stopRun(runId);
 	}
-	eventQueue.clear();
 },
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/cue/cue-run-manager.ts` around lines 387 - 391, stopAll() currently
iterates activeRuns and calls stopRun(), but stopRun() decrements sessions and
calls drainQueue(), which can promote queued runs into activeRuns while we're
iterating; to fix, prevent drainQueue() from running during stopAll: take a
snapshot of activeRuns keys as you already do, set a temporary flag (e.g.
this.suspendDrain = true) or call stopRun with a new parameter (e.g.
stopRun(runId, {suppressDrain: true})), stop each run using that suppression,
then clear eventQueue, clear the flag (this.suspendDrain = false) and call
drainQueue() once; update stopRun() to respect the flag/parameter before
invoking drainQueue() (references: stopAll(), stopRun(), activeRuns, eventQueue,
drainQueue()).

…riptions

Move calculateNextScheduledTime call above the filter gate so the
UI's "Next" column updates even when the filter suppresses the event.
Previously nextTriggers would go stale on filtered-out scheduled fires.
@reachraza
Copy link
Collaborator Author

@CodeRabbit review

@coderabbitai
Copy link

coderabbitai bot commented Mar 18, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
src/main/cue/cue-subscription-setup.ts (1)

299-301: Type assertion may mask invalid event types.

The cast sub.event as 'github.pull_request' | 'github.issue' assumes the event is always one of these two values. If CueSubscription allows other event types or if config validation is bypassed, this could pass an invalid value to createCueGitHubPoller.

Consider adding a runtime guard or ensuring the caller only invokes this function for GitHub event types.

Proposed defensive check
 export function setupGitHubPollerSubscription(
 	deps: SubscriptionSetupDeps,
 	session: SessionInfo,
 	state: SubscriptionSetupState,
 	sub: CueSubscription
 ): void {
+	if (sub.event !== 'github.pull_request' && sub.event !== 'github.issue') {
+		deps.onLog('warn', `[CUE] "${sub.name}" has unsupported GitHub event type: ${sub.event}`);
+		return;
+	}
+
 	const cleanup = createCueGitHubPoller({
-		eventType: sub.event as 'github.pull_request' | 'github.issue',
+		eventType: sub.event,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/cue/cue-subscription-setup.ts` around lines 299 - 301, The code
currently force-casts sub.event to 'github.pull_request' | 'github.issue' when
calling createCueGitHubPoller which can mask invalid event types; add a runtime
guard in the surrounding function to explicitly check sub.event (from
CueSubscription) is one of the allowed values ('github.pull_request' or
'github.issue') before calling createCueGitHubPoller, and if not either throw a
clear error or skip/return with logging so an invalid event cannot be passed
through — reference sub.event, createCueGitHubPoller, and the CueSubscription
type when adding this validation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/main/cue/cue-subscription-setup.ts`:
- Around line 106-117: The immediate/initial heartbeat branch in
cue-subscription-setup.ts triggers executeCueRun for matching subscriptions but
does not update state.lastTriggered, causing inconsistency with the interval
branch; update the subscription state by setting state.lastTriggered =
immediateEvent.timestamp (or equivalent timestamp field) immediately after
deps.executeCueRun(...) in the immediate-event path (referencing immediateEvent,
sub, deps.executeCueRun and state.lastTriggered) so the initial fire is recorded
the same way interval-based fires are.

---

Nitpick comments:
In `@src/main/cue/cue-subscription-setup.ts`:
- Around line 299-301: The code currently force-casts sub.event to
'github.pull_request' | 'github.issue' when calling createCueGitHubPoller which
can mask invalid event types; add a runtime guard in the surrounding function to
explicitly check sub.event (from CueSubscription) is one of the allowed values
('github.pull_request' or 'github.issue') before calling createCueGitHubPoller,
and if not either throw a clear error or skip/return with logging so an invalid
event cannot be passed through — reference sub.event, createCueGitHubPoller, and
the CueSubscription type when adding this validation.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 1d6724f0-5388-48be-bffe-51f1a4691d6f

📥 Commits

Reviewing files that changed from the base of the PR and between e751a31 and 1175e16.

📒 Files selected for processing (1)
  • src/main/cue/cue-subscription-setup.ts

The immediate heartbeat dispatch was missing the state.lastTriggered
update that the interval branch already performs, leaving the Cue
modal's "Last Triggered" column empty until the first interval tick.
@reachraza
Copy link
Collaborator Author

@CodeRabbit review

@coderabbitai
Copy link

coderabbitai bot commented Mar 18, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/main/cue/cue-subscription-setup.ts`:
- Line 33: The loop in calculateNextScheduledTime currently iterates with
dayOffset < 7 which excludes the same weekday exactly one week ahead and can
return null for valid weekly schedules; change the loop bound to include the 7th
day (e.g., dayOffset <= 7 or iterate up to and including 7) so that dayOffset
covers 0..7, ensuring nextTriggers considers the same-day-next-week candidate
when today's slot has already passed, and verify the logic around nextTriggers
still correctly selects the earliest valid trigger.
- Around line 187-201: The stale-key eviction loop currently runs only in the
non-match branch causing deps.scheduledFiredKeys to grow and stale same-minute
keys to suppress future triggers; move the eviction logic that iterates
deps.scheduledFiredKeys and deletes keys not ending with `:${currentTime}` so it
runs unconditionally before computing firedKey and before the guard that checks
deps.scheduledFiredKeys.has(firedKey) (use the same session.id, sub.name and
currentTime values), then proceed with creating firedKey and the has/add logic
as before.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 5611db21-bf6d-4472-bf0f-8280a414c6e8

📥 Commits

Reviewing files that changed from the base of the PR and between 1175e16 and 67887ed.

📒 Files selected for processing (1)
  • src/main/cue/cue-subscription-setup.ts

Comment on lines +187 to +201
// Evict stale fired-keys from previous minutes
for (const key of deps.scheduledFiredKeys) {
if (key.startsWith(`${session.id}:${sub.name}:`) && !key.endsWith(`:${currentTime}`)) {
deps.scheduledFiredKeys.delete(key);
}
}
return;
}

// Guard against double-fire (e.g., config refresh within the same minute)
const firedKey = `${session.id}:${sub.name}:${currentTime}`;
if (deps.scheduledFiredKeys.has(firedKey)) {
return;
}
deps.scheduledFiredKeys.add(firedKey);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Move stale scheduledFiredKeys eviction out of the non-match branch.

At Line 187, eviction only runs when currentTime is not scheduled. For schedules that match every minute, keys never clear, the set grows indefinitely, and same-minute keys can suppress future-day triggers.

💡 Proposed fix
 		const now = new Date();
 		const currentDay = DAY_NAMES[now.getDay()];
 		const currentTime = `${String(now.getHours()).padStart(2, '0')}:${String(now.getMinutes()).padStart(2, '0')}`;
+		const keyPrefix = `${session.id}:${sub.name}:`;

+		// Evict stale fired-keys every tick (including always-matching schedules)
+		for (const key of deps.scheduledFiredKeys) {
+			if (key.startsWith(keyPrefix) && !key.endsWith(`:${currentTime}`)) {
+				deps.scheduledFiredKeys.delete(key);
+			}
+		}
+
 		// Check day filter (if specified, current day must match)
 		if (sub.schedule_days && sub.schedule_days.length > 0) {
 			if (!sub.schedule_days.includes(currentDay)) {
 				return;
 			}
 		}

 		// Check if current time matches any scheduled time
 		if (!times.includes(currentTime)) {
-			// Evict stale fired-keys from previous minutes
-			for (const key of deps.scheduledFiredKeys) {
-				if (key.startsWith(`${session.id}:${sub.name}:`) && !key.endsWith(`:${currentTime}`)) {
-					deps.scheduledFiredKeys.delete(key);
-				}
-			}
 			return;
 		}

 		// Guard against double-fire (e.g., config refresh within the same minute)
-		const firedKey = `${session.id}:${sub.name}:${currentTime}`;
+		const firedKey = `${keyPrefix}${currentTime}`;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main/cue/cue-subscription-setup.ts` around lines 187 - 201, The stale-key
eviction loop currently runs only in the non-match branch causing
deps.scheduledFiredKeys to grow and stale same-minute keys to suppress future
triggers; move the eviction logic that iterates deps.scheduledFiredKeys and
deletes keys not ending with `:${currentTime}` so it runs unconditionally before
computing firedKey and before the guard that checks
deps.scheduledFiredKeys.has(firedKey) (use the same session.id, sub.name and
currentTime values), then proceed with creating firedKey and the has/add logic
as before.

Loop bound was dayOffset < 7 (0..6), missing the 7th day offset.
When a single-day schedule's slot had already passed (e.g. Mon 09:00
checked at Mon 09:01), the function returned null because next Monday
was at offset 7. Changed to <= 7 so weekly schedules always resolve.
@reachraza
Copy link
Collaborator Author

@CodeRabbit review

@coderabbitai
Copy link

coderabbitai bot commented Mar 18, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@reachraza reachraza merged commit bd9ba47 into rc Mar 18, 2026
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant