Skip to content

Commit

Permalink
add 2 more unit tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
b4rtaz committed Jun 8, 2024
1 parent 253d37c commit 6cd22e1
Showing 1 changed file with 98 additions and 6 deletions.
104 changes: 98 additions & 6 deletions machine/src/activities/parallel-activity/parallel-activity.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import { createActivitySet } from '../../core';
import { createAtomActivityFromHandler } from '../atom-activity';
import { createParallelActivity } from './parallel-activity';
import { createWorkflowMachineBuilder } from '../../workflow-machine-builder';
import { branchName } from '../results';
import { branchName, interrupt } from '../results';
import { ParallelActivityHandler } from './types';

interface ParallelTestGlobalState {
logger: string;
Expand Down Expand Up @@ -78,22 +79,34 @@ function createDefinition0(activeBranchNames: string[]) {
};
}

function createTest(definition: Definition) {
function createTest(
definition: Definition,
customParallelActivityHandler?: ParallelActivityHandler<ParallelStep, ParallelTestGlobalState, unknown>
) {
const activitySet = createActivitySet<ParallelTestGlobalState>([
createAtomActivityFromHandler<LogStep, ParallelTestGlobalState>('log', async (step, globalState) => {
globalState.logger += `;${step.id};`;
const delay = Math.ceil(20 * Math.random());
await new Promise(resolve => setTimeout(resolve, delay));
const delay = Math.ceil(10 * Math.random());
await sleep(delay);
}),
createAtomActivityFromHandler<JobStep, ParallelTestGlobalState>('job', async (step, globalState) => {
globalState.logger += ';job;';
if (step.properties.job === 'fail') {
const { job } = step.properties;
if (job === 'fail') {
throw new Error('Job failed!');
}
if (job === 'interrupt') {
return interrupt();
}
if (job.startsWith('sleep:')) {
await sleep(Number(job.substring(6)));
return;
}
throw new Error('Unknown job');
}),
createParallelActivity<ParallelStep, ParallelTestGlobalState>('parallel', {
init: () => ({}),
handler: async step => step.properties.activeBranchNames.map(branchName)
handler: customParallelActivityHandler ?? (async step => step.properties.activeBranchNames.map(branchName))
})
]);

Expand Down Expand Up @@ -272,8 +285,87 @@ describe('ParallelActivity', () => {
});
interpreter.start();
});

it('interrupts the execution if a parallel activity handler returns interrupt()', done => {
const definition: Definition = {
sequence: [
createLogStep('before'),
createParallelStep('parallel', ['thread0'], {
thread0: []
}),
createLogStep('after')
],
properties: {}
};

const interpreter = createTest(definition, async (_, globalState) => {
globalState.logger += ';interrupt;';
return interrupt();
});

interpreter.onDone(() => {
const snapshot = interpreter.getSnapshot();
const logger = snapshot.globalState.logger;

expect(logger).toBe(';before;;interrupt;');
expect(snapshot.isFailed()).toEqual(false);
expect(snapshot.isInterrupted()).toEqual(true);
expect(snapshot.isFinished()).toEqual(false);

done();
});
interpreter.start();
});

// TODO: This test may be fragile on slow machines (it uses timeouts). For now I leave it as it is.
it('interrupts the execution if a step inside a parallel section returns interrupt()', done => {
const definition: Definition = {
sequence: [
createLogStep('before'),
createParallelStep('parallel', ['thread0', 'thread1'], {
thread0: [
createLogStep('thread0_0'),
createJobStep('job', 'sleep:100'),
createJobStep('job', 'interrupt'),
createLogStep('thread0_1')
],
thread1: [createLogStep('thread1_0'), createJobStep('job', 'sleep:300'), createLogStep('thread1_1')]
}),
createLogStep('after')
],
properties: {}
};

const interpreter = createTest(definition);

interpreter.onDone(() => {
const snapshot = interpreter.getSnapshot();
const logger = snapshot.globalState.logger;

expect(logger).toContain(';before;');
expect(logger).toContain(';thread0_0;');
expect(logger).toContain(';thread1_0;');
expect(logger).toContain(';job;');
expect(snapshot.isFailed()).toEqual(false);
expect(snapshot.isInterrupted()).toEqual(true);
expect(snapshot.isFinished()).toEqual(false);

setTimeout(() => {
expect(logger).not.toContain('thread0_1');
// We expect the `thread1_1` step won't be executed if the machine is interrupted.
// The second "thread" should be also interrupted.
expect(logger).not.toContain('thread1_1');
done();
}, 600);
});
interpreter.start();
});
});

function extractBetween(log: string, start: string, end: string) {
return log.split(start)[1].split(end)[0];
}

function sleep(ms: number) {
return new Promise(resolve => setTimeout(resolve, ms));
}

0 comments on commit 6cd22e1

Please sign in to comment.