Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Maestro
TEMP-REFACTORING-PLAN.md
TEMP-REFACTORING-PLAN-2.md
CUE-NOTES.md
CUE-REFACTORING.md
Auto\ Run\ Docs/
Work\ Trees/
community-data/
Expand Down
199 changes: 199 additions & 0 deletions src/__tests__/main/cue/cue-concurrency.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -633,4 +633,203 @@ describe('CueEngine Concurrency Control', () => {
engine.stop();
});
});

describe('stress scenarios', () => {
it('processes multiple queued events in FIFO order', async () => {
const resolvers: ((val: CueRunResult) => void)[] = [];
const callOrder: number[] = [];
let callCount = 0;

const deps = createMockDeps({
onCueRun: vi.fn(() => {
const idx = callCount++;
callOrder.push(idx);
return new Promise<CueRunResult>((resolve) => {
resolvers.push(resolve);
});
}),
});

const config = createMockConfig({
settings: {
timeout_minutes: 30,
timeout_on_fail: 'break',
max_concurrent: 1,
queue_size: 10,
},
subscriptions: [
{
name: 'timer',
event: 'time.heartbeat',
enabled: true,
prompt: 'test',
interval_minutes: 1,
},
],
});
mockLoadCueConfig.mockReturnValue(config);
const engine = new CueEngine(deps);
engine.start();

// Initial heartbeat occupies the slot
await vi.advanceTimersByTimeAsync(10);
expect(deps.onCueRun).toHaveBeenCalledTimes(1);

// Queue several events via timer advances
vi.advanceTimersByTime(1 * 60 * 1000); // queued: 1
vi.advanceTimersByTime(1 * 60 * 1000); // queued: 2
vi.advanceTimersByTime(1 * 60 * 1000); // queued: 3
expect(engine.getQueueStatus().get('session-1')).toBe(3);
expect(deps.onCueRun).toHaveBeenCalledTimes(1); // still only 1 dispatched

// Resolve the first run -> drain dispatches next queued event
const makeResult = (runId: string): CueRunResult => ({
runId,
sessionId: 'session-1',
sessionName: 'Test Session',
subscriptionName: 'timer',
event: {} as CueEvent,
status: 'completed',
stdout: '',
stderr: '',
exitCode: 0,
durationMs: 100,
startedAt: new Date().toISOString(),
endedAt: new Date().toISOString(),
});

resolvers[0](makeResult('r0'));
await vi.advanceTimersByTimeAsync(10);
expect(deps.onCueRun).toHaveBeenCalledTimes(2);

// Resolve second -> third dispatched
resolvers[1](makeResult('r1'));
await vi.advanceTimersByTimeAsync(10);
expect(deps.onCueRun).toHaveBeenCalledTimes(3);

// Resolve third -> fourth dispatched
resolvers[2](makeResult('r2'));
await vi.advanceTimersByTimeAsync(10);
expect(deps.onCueRun).toHaveBeenCalledTimes(4);

// Verify events dispatched in sequential order
expect(callOrder).toEqual([0, 1, 2, 3]);

engine.stopAll();
engine.stop();
});

it('all queued events become stale and are dropped', async () => {
let resolveRun: ((val: CueRunResult) => void) | undefined;
const deps = createMockDeps({
onCueRun: vi.fn(
() =>
new Promise<CueRunResult>((resolve) => {
resolveRun = resolve;
})
),
});

const config = createMockConfig({
settings: {
timeout_minutes: 1, // 1 minute timeout
timeout_on_fail: 'break',
max_concurrent: 1,
queue_size: 10,
},
subscriptions: [
{
name: 'timer',
event: 'time.heartbeat',
enabled: true,
prompt: 'test',
interval_minutes: 1,
},
],
});
mockLoadCueConfig.mockReturnValue(config);
const engine = new CueEngine(deps);
engine.start();

// First run occupies the slot
await vi.advanceTimersByTimeAsync(10);
expect(deps.onCueRun).toHaveBeenCalledTimes(1);

// Queue several events
vi.advanceTimersByTime(1 * 60 * 1000); // queued: 1
vi.advanceTimersByTime(1 * 60 * 1000); // queued: 2
expect(engine.getQueueStatus().get('session-1')).toBe(2);

// Advance time past timeout so all queued events become stale (>1 minute)
vi.advanceTimersByTime(3 * 60 * 1000);

// Resolve the in-flight run -> drain should evict all queued events as stale
resolveRun!({
runId: 'r1',
sessionId: 'session-1',
sessionName: 'Test Session',
subscriptionName: 'timer',
event: {} as CueEvent,
status: 'completed',
stdout: '',
stderr: '',
exitCode: 0,
durationMs: 100,
startedAt: new Date().toISOString(),
endedAt: new Date().toISOString(),
});
await vi.advanceTimersByTimeAsync(10);

// All stale events should have been dropped
expect(deps.onLog).toHaveBeenCalledWith(
'cue',
expect.stringContaining('Dropping stale queued event')
);

engine.stopAll();
engine.stop();
});

it('enable/disable toggle during queue drain does not crash', async () => {
const deps = createMockDeps({
onCueRun: vi.fn(() => new Promise<CueRunResult>(() => {})),
});

const config = createMockConfig({
settings: {
timeout_minutes: 30,
timeout_on_fail: 'break',
max_concurrent: 1,
queue_size: 10,
},
subscriptions: [
{
name: 'timer',
event: 'time.heartbeat',
enabled: true,
prompt: 'test',
interval_minutes: 1,
},
],
});
mockLoadCueConfig.mockReturnValue(config);
const engine = new CueEngine(deps);
engine.start();

await vi.advanceTimersByTimeAsync(10);

// Queue some events
vi.advanceTimersByTime(1 * 60 * 1000);
vi.advanceTimersByTime(1 * 60 * 1000);
expect(engine.getQueueStatus().get('session-1')).toBe(2);

// Stop engine mid-drain — should not crash
expect(() => {
engine.stop();
}).not.toThrow();

// Queue should be cleared
expect(engine.getQueueStatus().size).toBe(0);
});
});
});
Loading