diff --git a/machine/src/activities/parallel-activity/parallel-activity.spec.ts b/machine/src/activities/parallel-activity/parallel-activity.spec.ts index 0a67172..df43bf5 100644 --- a/machine/src/activities/parallel-activity/parallel-activity.spec.ts +++ b/machine/src/activities/parallel-activity/parallel-activity.spec.ts @@ -3,7 +3,8 @@ import { createActivitySet } from '../../core'; import { createAtomActivityFromHandler } from '../atom-activity'; import { createParallelActivity } from './parallel-activity'; import { createWorkflowMachineBuilder } from '../../workflow-machine-builder'; -import { branchName } from '../results'; +import { branchName, interrupt } from '../results'; +import { ParallelActivityHandler } from './types'; interface ParallelTestGlobalState { logger: string; @@ -78,22 +79,34 @@ function createDefinition0(activeBranchNames: string[]) { }; } -function createTest(definition: Definition) { +function createTest( + definition: Definition, + customParallelActivityHandler?: ParallelActivityHandler +) { const activitySet = createActivitySet([ createAtomActivityFromHandler('log', async (step, globalState) => { globalState.logger += `;${step.id};`; - const delay = Math.ceil(20 * Math.random()); - await new Promise(resolve => setTimeout(resolve, delay)); + const delay = Math.ceil(10 * Math.random()); + await sleep(delay); }), createAtomActivityFromHandler('job', async (step, globalState) => { globalState.logger += ';job;'; - if (step.properties.job === 'fail') { + const { job } = step.properties; + if (job === 'fail') { throw new Error('Job failed!'); } + if (job === 'interrupt') { + return interrupt(); + } + if (job.startsWith('sleep:')) { + await sleep(Number(job.substring(6))); + return; + } + throw new Error('Unknown job'); }), createParallelActivity('parallel', { init: () => ({}), - handler: async step => step.properties.activeBranchNames.map(branchName) + handler: customParallelActivityHandler ?? (async step => step.properties.activeBranchNames.map(branchName)) }) ]); @@ -272,8 +285,87 @@ describe('ParallelActivity', () => { }); interpreter.start(); }); + + it('interrupts the execution if a parallel activity handler returns interrupt()', done => { + const definition: Definition = { + sequence: [ + createLogStep('before'), + createParallelStep('parallel', ['thread0'], { + thread0: [] + }), + createLogStep('after') + ], + properties: {} + }; + + const interpreter = createTest(definition, async (_, globalState) => { + globalState.logger += ';interrupt;'; + return interrupt(); + }); + + interpreter.onDone(() => { + const snapshot = interpreter.getSnapshot(); + const logger = snapshot.globalState.logger; + + expect(logger).toBe(';before;;interrupt;'); + expect(snapshot.isFailed()).toEqual(false); + expect(snapshot.isInterrupted()).toEqual(true); + expect(snapshot.isFinished()).toEqual(false); + + done(); + }); + interpreter.start(); + }); + + // TODO: This test may be fragile on slow machines (it uses timeouts). For now I leave it as it is. + it('interrupts the execution if a step inside a parallel section returns interrupt()', done => { + const definition: Definition = { + sequence: [ + createLogStep('before'), + createParallelStep('parallel', ['thread0', 'thread1'], { + thread0: [ + createLogStep('thread0_0'), + createJobStep('job', 'sleep:100'), + createJobStep('job', 'interrupt'), + createLogStep('thread0_1') + ], + thread1: [createLogStep('thread1_0'), createJobStep('job', 'sleep:300'), createLogStep('thread1_1')] + }), + createLogStep('after') + ], + properties: {} + }; + + const interpreter = createTest(definition); + + interpreter.onDone(() => { + const snapshot = interpreter.getSnapshot(); + const logger = snapshot.globalState.logger; + + expect(logger).toContain(';before;'); + expect(logger).toContain(';thread0_0;'); + expect(logger).toContain(';thread1_0;'); + expect(logger).toContain(';job;'); + expect(snapshot.isFailed()).toEqual(false); + expect(snapshot.isInterrupted()).toEqual(true); + expect(snapshot.isFinished()).toEqual(false); + + setTimeout(() => { + expect(logger).not.toContain('thread0_1'); + // We expect the `thread1_1` step won't be executed if the machine is interrupted. + // The second "thread" should be also interrupted. + expect(logger).not.toContain('thread1_1'); + done(); + }, 600); + }); + interpreter.start(); + }); }); function extractBetween(log: string, start: string, end: string) { return log.split(start)[1].split(end)[0]; } + +function sleep(ms: number) { + return new Promise(resolve => setTimeout(resolve, ms)); +}