Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Maestro
TEMP-REFACTORING-PLAN.md
TEMP-REFACTORING-PLAN-2.md
CUE-NOTES.md
CUE-REFACTORING.md
Auto\ Run\ Docs/
Work\ Trees/
community-data/
Expand Down
199 changes: 199 additions & 0 deletions src/__tests__/main/cue/cue-concurrency.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -633,4 +633,203 @@ describe('CueEngine Concurrency Control', () => {
engine.stop();
});
});

describe('stress scenarios', () => {
it('processes multiple queued events in FIFO order', async () => {
const resolvers: ((val: CueRunResult) => void)[] = [];
const callOrder: number[] = [];
let callCount = 0;

const deps = createMockDeps({
onCueRun: vi.fn(() => {
const idx = callCount++;
callOrder.push(idx);
return new Promise<CueRunResult>((resolve) => {
resolvers.push(resolve);
});
}),
});

const config = createMockConfig({
settings: {
timeout_minutes: 30,
timeout_on_fail: 'break',
max_concurrent: 1,
queue_size: 10,
},
subscriptions: [
{
name: 'timer',
event: 'time.heartbeat',
enabled: true,
prompt: 'test',
interval_minutes: 1,
},
],
});
mockLoadCueConfig.mockReturnValue(config);
const engine = new CueEngine(deps);
engine.start();

// Initial heartbeat occupies the slot
await vi.advanceTimersByTimeAsync(10);
expect(deps.onCueRun).toHaveBeenCalledTimes(1);

// Queue several events via timer advances
vi.advanceTimersByTime(1 * 60 * 1000); // queued: 1
vi.advanceTimersByTime(1 * 60 * 1000); // queued: 2
vi.advanceTimersByTime(1 * 60 * 1000); // queued: 3
expect(engine.getQueueStatus().get('session-1')).toBe(3);
expect(deps.onCueRun).toHaveBeenCalledTimes(1); // still only 1 dispatched

// Resolve the first run -> drain dispatches next queued event
const makeResult = (runId: string): CueRunResult => ({
runId,
sessionId: 'session-1',
sessionName: 'Test Session',
subscriptionName: 'timer',
event: {} as CueEvent,
status: 'completed',
stdout: '',
stderr: '',
exitCode: 0,
durationMs: 100,
startedAt: new Date().toISOString(),
endedAt: new Date().toISOString(),
});

resolvers[0](makeResult('r0'));
await vi.advanceTimersByTimeAsync(10);
expect(deps.onCueRun).toHaveBeenCalledTimes(2);

// Resolve second -> third dispatched
resolvers[1](makeResult('r1'));
await vi.advanceTimersByTimeAsync(10);
expect(deps.onCueRun).toHaveBeenCalledTimes(3);

// Resolve third -> fourth dispatched
resolvers[2](makeResult('r2'));
await vi.advanceTimersByTimeAsync(10);
expect(deps.onCueRun).toHaveBeenCalledTimes(4);

// Verify events dispatched in sequential order
expect(callOrder).toEqual([0, 1, 2, 3]);

engine.stopAll();
engine.stop();
});

it('all queued events become stale and are dropped', async () => {
let resolveRun: ((val: CueRunResult) => void) | undefined;
const deps = createMockDeps({
onCueRun: vi.fn(
() =>
new Promise<CueRunResult>((resolve) => {
resolveRun = resolve;
})
),
});

const config = createMockConfig({
settings: {
timeout_minutes: 1, // 1 minute timeout
timeout_on_fail: 'break',
max_concurrent: 1,
queue_size: 10,
},
subscriptions: [
{
name: 'timer',
event: 'time.heartbeat',
enabled: true,
prompt: 'test',
interval_minutes: 1,
},
],
});
mockLoadCueConfig.mockReturnValue(config);
const engine = new CueEngine(deps);
engine.start();

// First run occupies the slot
await vi.advanceTimersByTimeAsync(10);
expect(deps.onCueRun).toHaveBeenCalledTimes(1);

// Queue several events
vi.advanceTimersByTime(1 * 60 * 1000); // queued: 1
vi.advanceTimersByTime(1 * 60 * 1000); // queued: 2
expect(engine.getQueueStatus().get('session-1')).toBe(2);

// Advance time past timeout so all queued events become stale (>1 minute)
vi.advanceTimersByTime(3 * 60 * 1000);

// Resolve the in-flight run -> drain should evict all queued events as stale
resolveRun!({
runId: 'r1',
sessionId: 'session-1',
sessionName: 'Test Session',
subscriptionName: 'timer',
event: {} as CueEvent,
status: 'completed',
stdout: '',
stderr: '',
exitCode: 0,
durationMs: 100,
startedAt: new Date().toISOString(),
endedAt: new Date().toISOString(),
});
await vi.advanceTimersByTimeAsync(10);

// All stale events should have been dropped
expect(deps.onLog).toHaveBeenCalledWith(
'cue',
expect.stringContaining('Dropping stale queued event')
);

engine.stopAll();
engine.stop();
});

it('enable/disable toggle during queue drain does not crash', async () => {
const deps = createMockDeps({
onCueRun: vi.fn(() => new Promise<CueRunResult>(() => {})),
});

const config = createMockConfig({
settings: {
timeout_minutes: 30,
timeout_on_fail: 'break',
max_concurrent: 1,
queue_size: 10,
},
subscriptions: [
{
name: 'timer',
event: 'time.heartbeat',
enabled: true,
prompt: 'test',
interval_minutes: 1,
},
],
});
mockLoadCueConfig.mockReturnValue(config);
const engine = new CueEngine(deps);
engine.start();

await vi.advanceTimersByTimeAsync(10);

// Queue some events
vi.advanceTimersByTime(1 * 60 * 1000);
vi.advanceTimersByTime(1 * 60 * 1000);
expect(engine.getQueueStatus().get('session-1')).toBe(2);

// Stop engine mid-drain — should not crash
expect(() => {
engine.stop();
}).not.toThrow();

// Queue should be cleared
expect(engine.getQueueStatus().size).toBe(0);
});
});
});
Loading