diff --git a/packages/radiaction/mocks/no-kafka.js b/packages/radiaction/mocks/no-kafka.js index 8731810..58a28e5 100644 --- a/packages/radiaction/mocks/no-kafka.js +++ b/packages/radiaction/mocks/no-kafka.js @@ -1,6 +1,24 @@ const _ = require('lodash') const sinon = require('sinon') const mock = require('mock-require') +const NoKafka = require('no-kafka') + +const STORE = {} +const SUBSCRIBERS = {} + +function writeMessage({ topic, partition, message }) { + const identifier = `${topic}:${partition}` + const store = (STORE[identifier] = STORE[identifier] || []) + const offset = store.length + + store[offset] = { message: _.mapValues(message, String), offset } + + for (const subscriber of SUBSCRIBERS[identifier] || []) { + subscriber([store[offset]], topic, partition) + } + + return offset +} class Producer { close() {} @@ -16,14 +34,47 @@ Producer.prototype.send = sinon .named('send') .callsFake(messages => Promise.resolve( - (Array.isArray(messages) ? messages : [messages]).map(x => ({ - topic: x.topic, - error: null, - partition: parseInt(_.uniqueId()), - offset: parseInt(_.uniqueId()), - })) + (Array.isArray(messages) ? messages : [messages]).map(x => { + const { topic, message } = x + const partition = 0 + const offset = writeMessage({ + topic, + message, + partition, + }) + + return { + error: null, + topic, + partition, + offset, + } + }) ) ) -mock('no-kafka', { Producer }) +class SimpleConsumer { + close() {} +} + +SimpleConsumer.prototype.init = sinon + .stub() + .named('init') + .resolves('hello!') + +SimpleConsumer.prototype.subscribe = sinon + .stub() + .named('subscribe') + .callsFake((topic, partition, options, dataHandler) => + Promise.resolve().then(() => { + const identifier = `${topic}:${partition}` + const messages = STORE[identifier] || [] + + messages.length && dataHandler(messages, topic, partition) + SUBSCRIBERS[identifier] = SUBSCRIBERS[identifier] || [] + SUBSCRIBERS[identifier].push(dataHandler) + }) + ) + +mock('no-kafka', { ...NoKafka, Producer, SimpleConsumer }) module.exports = mock diff --git a/packages/radiaction/package.json b/packages/radiaction/package.json index 26e13fb..589605a 100644 --- a/packages/radiaction/package.json +++ b/packages/radiaction/package.json @@ -33,7 +33,7 @@ "change-case": "~3.0.2", "kafka-node": "~2.4.1", "lodash": "~4.17.5", - "no-kafka": "~3.2.8" + "no-kafka": "~3.2.9" }, "devDependencies": { "@babel/cli": "7.0.0-beta.43", @@ -43,7 +43,7 @@ "@babel/preset-env": "7.0.0-beta.43", "@babel/register": "7.0.0-beta.43", "ava": "~1.0.0-beta.3", - "mock-require": "~3.0.1", + "mock-require": "~3.0.2", "sinon": "~4.4.9" }, "ava": { diff --git a/packages/radiaction/src/createTopics.js b/packages/radiaction/src/createTopics.js index 26b9883..fed2c02 100644 --- a/packages/radiaction/src/createTopics.js +++ b/packages/radiaction/src/createTopics.js @@ -1,18 +1,21 @@ const _ = require('lodash') const chalk = require('chalk') const { Client, Producer } = require('kafka-node') +const actionName = require('./helpers/actionName') const { RESULT_SUFFIX } = require('./config') export default actions => new Promise((resolve, reject) => { - let topics = Object.values(actions).map(action => action.toString()) + let topics = Object.values(actions).map(actionName) if (!topics || !topics.length) { throw new Error('no topics found to be created') } - const waiters = Object.values(actions).filter(action => action.__radiaction.wait) + const waiters = Object.values(actions).filter( + action => action.__radiaction && action.__radiaction.wait + ) if (waiters.length) { - topics = topics.concat(waiters.map(action => `${action.name}${RESULT_SUFFIX}`)) + topics = topics.concat(waiters.map(action => `${actionName(action)}${RESULT_SUFFIX}`)) } console.log(`Creating topics with the following names:`) diff --git a/packages/radiaction/src/emitter.js b/packages/radiaction/src/emitter.js index 82ed44c..591afe0 100644 --- a/packages/radiaction/src/emitter.js +++ b/packages/radiaction/src/emitter.js @@ -1,6 +1,6 @@ const _ = require('lodash') const { Producer } = require('no-kafka') -const { keepName } = require('./helpers') +const { actionName, keepName } = require('./helpers') const stopAll = [] @@ -8,16 +8,45 @@ process.on('exit', () => { stopAll.forEach(stop => stop()) }) +function forceStructure(output) { + if (!_.isPlainObject(output)) { + return { value: output } + } + + const keys = new Set(Object.keys(output)) + + if (!keys.has('value')) { + throw new Error(`plain objects sent to a emitter need to contain the 'value' field`) + } + + keys.delete('value') + keys.delete('key') + + if (keys.size > 0) { + throw new Error(`plain objects sent to a emitter can only contain fields 'value' and 'key'`) + } + + return output +} + function wrap(action) { const producer = new Producer() stopAll.push(producer.close) producer.init() const newAction = (...args) => - producer.send({ - topic: action.name, - message: action.apply(action, args), - }) + producer + .send({ + topic: actionName(action), + message: forceStructure(action.apply(action, args)), + }) + .then(results => { + if (!results || results.length !== 1) { + throw new Error(`Unexpected data received from message broker: ${results}`) + } + + return results[0] + }) newAction.__radiaction = { ...action.__radiaction, diff --git a/packages/radiaction/src/emitter.test.js b/packages/radiaction/src/emitter.test.js index deab9e5..d56b16f 100644 --- a/packages/radiaction/src/emitter.test.js +++ b/packages/radiaction/src/emitter.test.js @@ -1,6 +1,6 @@ import test from 'ava' import sinon from 'sinon' -import noKafkaMock from '../mocks/no-kafka' +import NoKafkaMock from '../mocks/no-kafka' import emitter from './emitter' test('flag is ON', t => { @@ -66,44 +66,56 @@ test('arguments are passed down', async t => { t.true(actions.d.calledOnceWithExactly('d0', 'd1', 'd3'), actions.d.args.toString()) }) -test('message broker is called', async t => { +test('message broker is called with compatible descriptor', async t => { const processed = emitter({ - a: () => 1, - b: () => 2, - c: () => 3, - d: () => 4, + a: () => 'A', + b: () => ({ b: 'B' }), + c: () => ({ foo: 'bar', value: 'C' }), + d: () => ({ value: 'D' }), + e: () => ({ key: 'e', value: 'E' }), }) + await t.throws(processed.b, `plain objects sent to a emitter need to contain the 'value' field`) + await t.throws( + processed.c, + `plain objects sent to a emitter can only contain fields 'value' and 'key'` + ) + const output = { a: await processed.a(), - b: await processed.b(), - c: await processed.c(), d: await processed.d(), + e: await processed.e(), } - t.is(output.a[0].partition.constructor, Number) - t.is(output.a[0].offset.constructor, Number) - t.is(output.b[0].partition.constructor, Number) - t.is(output.b[0].offset.constructor, Number) - t.is(output.c[0].partition.constructor, Number) - t.is(output.c[0].offset.constructor, Number) - t.is(output.d[0].partition.constructor, Number) - t.is(output.d[0].offset.constructor, Number) - - t.deepEqual(output.a, [ - { topic: 'a', error: null, partition: output.a[0].partition, offset: output.a[0].offset }, - ]) - t.deepEqual(output.b, [ - { topic: 'b', error: null, partition: output.b[0].partition, offset: output.b[0].offset }, - ]) - t.deepEqual(output.c, [ - { topic: 'c', error: null, partition: output.c[0].partition, offset: output.c[0].offset }, - ]) - t.deepEqual(output.d, [ - { topic: 'd', error: null, partition: output.d[0].partition, offset: output.d[0].offset }, - ]) + t.is(output.a.partition.constructor, Number) + t.is(output.a.offset.constructor, Number) + t.is(output.d.partition.constructor, Number) + t.is(output.d.offset.constructor, Number) + t.is(output.e.partition.constructor, Number) + t.is(output.e.offset.constructor, Number) + + t.deepEqual(output.a, { + topic: 'a', + error: null, + partition: output.a.partition, + offset: output.a.offset, + }) + + t.deepEqual(output.d, { + topic: 'd', + error: null, + partition: output.d.partition, + offset: output.d.offset, + }) + + t.deepEqual(output.e, { + topic: 'e', + error: null, + partition: output.e.partition, + offset: output.e.offset, + }) }) test.after.always(() => { - noKafkaMock.stopAll() + NoKafkaMock.stopAll() }) diff --git a/packages/radiaction/src/helpers/actionName.js b/packages/radiaction/src/helpers/actionName.js new file mode 100644 index 0000000..9f24da9 --- /dev/null +++ b/packages/radiaction/src/helpers/actionName.js @@ -0,0 +1,3 @@ +const _ = require('lodash') + +module.exports = action => _.get(action, '__radiaction.topic', action.name) diff --git a/packages/radiaction/src/helpers/index.js b/packages/radiaction/src/helpers/index.js index c75247e..4a2c5f3 100644 --- a/packages/radiaction/src/helpers/index.js +++ b/packages/radiaction/src/helpers/index.js @@ -1 +1,2 @@ +export { default as actionName } from './actionName' export { default as keepName } from './keepName' diff --git a/packages/radiaction/src/helpers/keepName.test.js b/packages/radiaction/src/helpers/keepName.test.js new file mode 100644 index 0000000..4e7f527 --- /dev/null +++ b/packages/radiaction/src/helpers/keepName.test.js @@ -0,0 +1,23 @@ +// it is not running +// https://github.com/Quadric/radiaction/issues/5 + +import test from 'ava' +import keepName from './keepName' + +test('accept arrow functions', t => { + function myFunction() {} + const newFn = () => {} + + const processedFn = keepName(newFn, myFunction) + + t.is(processedFn.name, 'myFunction') +}) + +test('accept anonymous fns', async t => { + const myFunction = () => {} + const newFn = () => {} + + const processedFn = keepName(newFn, myFunction) + + t.is(processedFn.name, 'myFunction') +}) diff --git a/packages/radiaction/src/runner.js b/packages/radiaction/src/runner.js index c9c09a2..ec4eb26 100644 --- a/packages/radiaction/src/runner.js +++ b/packages/radiaction/src/runner.js @@ -1,52 +1,61 @@ +const _ = require('lodash') +const { keepName, actionName } = require('./helpers') const { Producer, SimpleConsumer, LATEST_OFFSET } = require('no-kafka') const { RESULT_SUFFIX, IDLE_TIMEOUT } = require('./config') -export default reactions => { +const stopAll = [] + +process.on('exit', () => { + stopAll.forEach(stop => stop()) +}) + +const setup = async (reaction, actionName) => { const producer = new Producer() - producer.init() + const consumer = new SimpleConsumer({ idleTimeout: IDLE_TIMEOUT }) - // close connection when process is killed. - process.on('exit', () => { - producer.close() - }) - - const consumers = Object.keys(reactions).map(key => { - const consumer = new SimpleConsumer({ idleTimeout: IDLE_TIMEOUT }) - - // close connection when process is killed. - process.on('exit', () => { - consumer.close() - }) - - return consumer.init().then(() => { - return consumer.subscribe(key, 0, { time: LATEST_OFFSET }, (messageSet, topic, partition) => { - messageSet.forEach(async m => { - const reaction = reactions[key] - - const result = await reaction( - m.message.value.toString('utf8'), - m.message.key && m.message.key.toString('utf8') - ) - - producer - .send({ - topic: `${key}${RESULT_SUFFIX}`, - partition, - message: { - value: result || '', - key: m.offset, - }, - }) - // .then(results => - // results.filter(result => result.error).map(result => throwError(result.error)) - // ) - .catch(e => { - throw new Error(e) - }) - }) + stopAll.push(producer.close) + stopAll.push(consumer.close) + + producer.init() + await consumer.init() + return await consumer.subscribe( + actionName, + 0, + { time: LATEST_OFFSET }, + (messageSet, topic, partition) => { + messageSet.forEach(async m => { + const { value, key } = m.message + const result = await reaction(value && value.toString('utf8'), key && key.toString('utf8')) + + producer + .send({ + topic: `${actionName}${RESULT_SUFFIX}`, + partition, + message: { + value: result || '', + key: m.offset, + }, + }) + // .then(results => + // results.filter(result => result.error).map(result => throwError(result.error)) + // ) + .catch(e => { + throw new Error(e) + }) }) - }) - }) + } + ) +} + +function wrap(reaction, key) { + const newReaction = () => setup(reaction, key) - return Promise.all(consumers) + newReaction.__radiaction = { + ...reaction.__radiaction, + run: true, + } + + return keepName(newReaction, reaction) } + +module.exports = descriptor => _.mapValues(descriptor, wrap) diff --git a/packages/radiaction/src/runner.test.js b/packages/radiaction/src/runner.test.js new file mode 100644 index 0000000..e02bef3 --- /dev/null +++ b/packages/radiaction/src/runner.test.js @@ -0,0 +1,188 @@ +import _ from 'lodash' +import test from 'ava' +import sinon from 'sinon' +import NoKafkaMock from '../mocks/no-kafka' +import { Producer, SimpleConsumer } from 'no-kafka' +import runner from './runner' + +const DeferredPromise = () => { + let resolver + + const q = new Promise(resolve => { + resolver = resolve + }) + + q.resolve = resolver + return q +} + +const fetchTopic = topic => + new Promise((resolve, reject) => { + const consumer = new SimpleConsumer() + + consumer.init().then(() => + consumer.subscribe(topic, 0, {}, (messageSet, topic, partition) => { + consumer.close() + + resolve( + messageSet.map(m => { + _.mapValues(m.message, x => x.toString('utf8')) + + return { + ...m.message, + offset: m.offset, + } + }) + ) + }) + ) + }) + +const emit = (topic, message = {}) => new Producer().send({ topic, message }) + +test('flag is ON', t => { + const processed = runner({ + adventurous: () => 'A', + beautiful: () => ({ b: 'B' }), + charming: () => ({ value: 'C' }), + delightful: () => ({ key: 'd', value: 'D' }), + }) + + t.is(processed.adventurous.__radiaction.run, true) + t.is(processed.beautiful.__radiaction.run, true) + t.is(processed.charming.__radiaction.run, true) + t.is(processed.delightful.__radiaction.run, true) +}) + +test('previous metadata is not lost', t => { + const reaction = () => 'A' + reaction.__radiaction = { + x: -1, + y: -2, + z: -3, + } + + const processed = runner({ + amazing: reaction, + }) + + t.deepEqual(processed.amazing.__radiaction, { run: true, z: -3, y: -2, x: -1 }) +}) + +test(`names don't change`, t => { + const processed = runner({ + amusing: () => 'A', + brave: () => ({ b: 'B' }), + calm: () => ({ value: 'C' }), + distinguished: () => ({ key: 'd', value: 'D' }), + }) + + t.is(processed.amusing.name, 'amusing') + t.is(processed.brave.name, 'brave') + t.is(processed.calm.name, 'calm') + t.is(processed.distinguished.name, 'distinguished') +}) + +test('arguments are NOT passed down', async t => { + const ACTION = `Let's pretend I am the output of an action ;)` + const q = DeferredPromise() + + const myUniqueReaction = (value, key) => q.resolve({ value, key }) + const runners = runner({ myUniqueReaction }) + await runners.myUniqueReaction('a0', 'a1', 'a3') + + await emit('myUniqueReaction', { key: 10, value: ACTION }) + t.deepEqual(await q, { value: ACTION, key: '10' }) +}) + +test('reactions are executed by the runner', async t => { + const promises = { + awesome: DeferredPromise(), + bravery: DeferredPromise(), + comfort: DeferredPromise(), + } + + const runners = runner({ + awesome: (value, key) => promises.awesome.resolve({ value, key }), + bravery: (value, key) => promises.bravery.resolve({ value, key }), + comfort: (value, key) => promises.comfort.resolve({ value, key }), + }) + + await runners.awesome() + await emit('awesome', { key: 'something', value: 'action of a' }) + t.deepEqual(await promises.awesome, { value: 'action of a', key: 'something' }) + + await runners.bravery() + await emit('bravery', { key: 1 }) + t.deepEqual(await promises.bravery, { key: '1', value: undefined }) + + await runners.comfort() + await emit('comfort', { value: 'action of c' }) + t.deepEqual(await promises.comfort, { value: 'action of c', key: undefined }) +}) + +test('reactions send output back to the message queue', async t => { + const GARBAGE = 'garbage - not generated by a reaction' + + const runners = runner({ + authentic: x => `reaction a: ${Math.pow(x * 10, 1)}`, + brilliant: x => `reaction b: ${Math.pow(x * 10, 2)}`, + }) + + for (const actionName of Object.keys(runners)) { + await runners[actionName]() + + await emit(`${actionName}.output`, { value: GARBAGE }) + await emit(actionName, { value: 3 }) + await emit(actionName, { value: 2 }) + await emit(actionName, { value: 1 }) + } + + t.deepEqual(await fetchTopic('authentic.output'), [ + { + offset: 0, + value: 'garbage - not generated by a reaction', + }, + { + offset: 1, + value: 'reaction a: 30', + key: '0', + }, + { + offset: 2, + value: 'reaction a: 20', + key: '1', + }, + { + offset: 3, + value: 'reaction a: 10', + key: '2', + }, + ]) + + t.deepEqual(await fetchTopic('brilliant.output'), [ + { + offset: 0, + value: 'garbage - not generated by a reaction', + }, + { + offset: 1, + value: 'reaction b: 900', + key: '0', + }, + { + offset: 2, + value: 'reaction b: 400', + key: '1', + }, + { + offset: 3, + value: 'reaction b: 100', + key: '2', + }, + ]) +}) + +test.after.always(() => { + NoKafkaMock.stopAll() +}) diff --git a/packages/radiaction/src/waiter.js b/packages/radiaction/src/waiter.js index c0fe25f..53ab2cd 100644 --- a/packages/radiaction/src/waiter.js +++ b/packages/radiaction/src/waiter.js @@ -1,25 +1,76 @@ const _ = require('lodash') const { SimpleConsumer, LATEST_OFFSET } = require('no-kafka') -const { keepName } = require('./helpers') +const { keepName, actionName } = require('./helpers') const { IDLE_TIMEOUT, RESULT_SUFFIX, WAITER_TIMEOUT } = require('./config') +const stopAll = [] const records = {} const listeners = {} let initiated = {} -async function setup(key) { +process.on('exit', () => { + stopAll.forEach(stop => stop()) +}) + +function validateInput(input) { + if (!_.isPlainObject(input)) { + throw new Error( + `waiters expect to receive an object containing 'topic', 'partition' and 'offset' fields` + ) + } + + if (input.error) { + throw new Error(input.error) + } + + if (!_.has(input, 'topic') || input.topic.constructor !== String) { + throw new Error(`waiters expect to receive an object containing a valid 'topic' field`) + } + + if (!_.has(input, 'partition') || input.partition.constructor !== Number) { + throw new Error(`waiters expect to receive an object containing a valid 'partition' field`) + } + + if (!_.has(input, 'offset') || input.offset.constructor !== Number) { + throw new Error(`waiters expect to receive an object containing a valid 'offset' field`) + } + + return input +} + +function validateOutput(output) { + const keys = new Set(Object.keys(output)) + + if (!keys.has('key')) { + throw new Error(`waiters require output objects to contain the 'key' field`) + } + + if (!keys.has('value')) { + throw new Error(`waiters require output objects to contain the 'value' field`) + } + + keys.delete('value') + keys.delete('key') + + if (keys.size > 0) { + throw new Error(`output objects returned to a waiter can only contain fields 'value' and 'key'`) + } + + return output +} + +async function setup(action) { + const key = actionName(action) + if (initiated[key]) { return true } initiated[key] = true const consumer = new SimpleConsumer({ idleTimeout: IDLE_TIMEOUT }) + stopAll.push(consumer.close) await consumer.init() - process.on('exit', () => { - consumer.close() - }) - await consumer .subscribe( `${key}${RESULT_SUFFIX}`, @@ -27,9 +78,7 @@ async function setup(key) { { time: LATEST_OFFSET }, (messageSet, topic, partition) => { messageSet.forEach(({ message }) => { - if (!message.key) { - throw new Error(`waiters can't handle falsy keys`) - } + validateOutput(message) const identifier = `${topic}:${partition}:${message.key.toString('utf8')}` const value = message.value && message.value.toString('utf8') @@ -51,7 +100,7 @@ async function setup(key) { function spread({ topic, partition, offset }) { const identifier = `${topic}${RESULT_SUFFIX}:${partition}:${offset}` - if (records[identifier]) { + if (_.has(records, identifier)) { const result = records[identifier] delete records[identifier] @@ -74,10 +123,11 @@ function spread({ topic, partition, offset }) { function wrap(action) { const newAction = async (...args) => { - await setup(action.name) - const output = await action.apply(action, args) + await setup(action) + const input = await action.apply(action, args) + validateInput(input) - return await spread(output[0]) + return await spread(input) } newAction.__radiaction = { diff --git a/packages/radiaction/src/waiter.test.js b/packages/radiaction/src/waiter.test.js new file mode 100644 index 0000000..00936b9 --- /dev/null +++ b/packages/radiaction/src/waiter.test.js @@ -0,0 +1,138 @@ +import test from 'ava' +import sinon from 'sinon' +import NoKafkaMock from '../mocks/no-kafka' +import { Producer } from 'no-kafka' +import waiter from './waiter' + +const emit = (topic, message = {}) => new Producer().send({ topic, message }) + +test('flag is ON', t => { + const processed = waiter({ + adventurous: () => 'A', + beautiful: () => ({ b: 'B' }), + charming: () => ({ value: 'C' }), + delightful: () => ({ key: 'd', value: 'D' }), + }) + + t.is(processed.adventurous.__radiaction.wait, true) + t.is(processed.beautiful.__radiaction.wait, true) + t.is(processed.charming.__radiaction.wait, true) + t.is(processed.delightful.__radiaction.wait, true) +}) + +test('previous metadata is not lost', t => { + const action = () => 'A' + action.__radiaction = { + x: -1, + y: -2, + z: -3, + } + + const processed = waiter({ + amazing: action, + }) + + t.deepEqual(processed.amazing.__radiaction, { wait: true, z: -3, y: -2, x: -1 }) +}) + +test(`names don't change`, t => { + const processed = waiter({ + amusing: () => 'A', + brave: () => ({ b: 'B' }), + calm: () => ({ value: 'C' }), + distinguished: () => ({ key: 'd', value: 'D' }), + }) + + t.is(processed.amusing.name, 'amusing') + t.is(processed.brave.name, 'brave') + t.is(processed.calm.name, 'calm') + t.is(processed.distinguished.name, 'distinguished') +}) + +test('arguments are passed down', async t => { + const REACTION = `Let's pretend I am the output of a reaction ;)` + + const myUniqueAction = sinon.stub().returns({ topic: 'myUniqueAction', partition: 0, offset: 0 }) + Object.defineProperty(myUniqueAction, 'name', { value: 'myUniqueAction' }) + + const processed = waiter({ myUniqueAction }) + const reaction = processed.myUniqueAction('a0', 'a1', 'a3') + await emit('myUniqueAction.output', { key: 0, value: REACTION }) + const result = await reaction + + t.true(myUniqueAction.calledOnceWithExactly('a0', 'a1', 'a3'), myUniqueAction.args.toString()) + t.is(result, REACTION) +}) + +test('forces actions to return compatible descriptor', async t => { + const processed = waiter({ + authentic: () => 'A', + brilliant: () => ({ error: null, partition: 1, offset: 10 }), + creative: () => ({ topic: 'creative', error: null, offset: 20 }), + delicious: () => ({ topic: 'delicious', error: null, partition: 3 }), + effective: () => ({ topic: 'effective', error: new Error('E'), partition: 4, offset: 40 }), + fantastic: () => ({ topic: 'fantastic', error: null, partition: 0, offset: 0 }), + }) + + await t.throws( + processed.authentic, + `waiters expect to receive an object containing 'topic', 'partition' and 'offset' fields` + ) + await t.throws( + processed.brilliant, + `waiters expect to receive an object containing a valid 'topic' field` + ) + await t.throws( + processed.creative, + `waiters expect to receive an object containing a valid 'partition' field` + ) + await t.throws( + processed.delicious, + `waiters expect to receive an object containing a valid 'offset' field` + ) + await t.throws(processed.effective, `Error: E`) + await emit('fantastic.output', { key: 0, value: '' }) // emits whatever just to avoid 'f' from timing out + await t.notThrows(processed.fantastic) +}) + +test('reactions are returned by the waiter', async t => { + const processed = waiter({ + awesome: () => ({ topic: 'awesome', partition: 0, offset: 0 }), + bravery: () => ({ topic: 'bravery', partition: 0, offset: 0 }), + comfort: () => ({ topic: 'comfort', partition: 0, offset: 0 }), + dignity: () => ({ topic: 'dignity', partition: 0, offset: 0 }), + empathy: () => ({ topic: 'empathy', partition: 0, offset: 0 }), + festive: () => ({ topic: 'festive', partition: 0, offset: 0 }), + }) + + const a = processed.awesome() + await emit('awesome.output', { key: 0, value: 'reaction of a' }) + t.is(await a, 'reaction of a') + + await t.throws(async () => { + await emit('bravery.output', { key: 0 }) + await processed.bravery() + }, `waiters require output objects to contain the 'value' field`) + + await t.throws(async () => { + await emit('comfort.output', { value: 'reaction of c' }) + await processed.comfort() + }, `waiters require output objects to contain the 'key' field`) + + await t.throws(async () => { + await emit('dignity.output', { key: 0, value: 'reaction of d', d: 'dignity' }) + await processed.dignity() + }, `output objects returned to a waiter can only contain fields 'value' and 'key'`) + + const e = processed.empathy() + await emit('empathy.output', { key: 0, value: 'reaction of e' }) + t.is(await e, 'reaction of e') + + const f = processed.festive() + await emit('festive.output', { key: 0, value: 'reaction of f' }) + t.is(await f, 'reaction of f') +}) + +test.after.always(() => { + NoKafkaMock.stopAll() +})