Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 58 additions & 7 deletions packages/radiaction/mocks/no-kafka.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,24 @@
const _ = require('lodash')
const sinon = require('sinon')
const mock = require('mock-require')
const NoKafka = require('no-kafka')

const STORE = {}
const SUBSCRIBERS = {}

function writeMessage({ topic, partition, message }) {
const identifier = `${topic}:${partition}`
const store = (STORE[identifier] = STORE[identifier] || [])
const offset = store.length

store[offset] = { message: _.mapValues(message, String), offset }

for (const subscriber of SUBSCRIBERS[identifier] || []) {
subscriber([store[offset]], topic, partition)
}

return offset
}

class Producer {
close() {}
Expand All @@ -16,14 +34,47 @@ Producer.prototype.send = sinon
.named('send')
.callsFake(messages =>
Promise.resolve(
(Array.isArray(messages) ? messages : [messages]).map(x => ({
topic: x.topic,
error: null,
partition: parseInt(_.uniqueId()),
offset: parseInt(_.uniqueId()),
}))
(Array.isArray(messages) ? messages : [messages]).map(x => {
const { topic, message } = x
const partition = 0
const offset = writeMessage({
topic,
message,
partition,
})

return {
error: null,
topic,
partition,
offset,
}
})
)
)

mock('no-kafka', { Producer })
class SimpleConsumer {
close() {}
}

SimpleConsumer.prototype.init = sinon
.stub()
.named('init')
.resolves('hello!')

SimpleConsumer.prototype.subscribe = sinon
.stub()
.named('subscribe')
.callsFake((topic, partition, options, dataHandler) =>
Promise.resolve().then(() => {
const identifier = `${topic}:${partition}`
const messages = STORE[identifier] || []

messages.length && dataHandler(messages, topic, partition)
SUBSCRIBERS[identifier] = SUBSCRIBERS[identifier] || []
SUBSCRIBERS[identifier].push(dataHandler)
})
)

mock('no-kafka', { ...NoKafka, Producer, SimpleConsumer })
module.exports = mock
4 changes: 2 additions & 2 deletions packages/radiaction/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"change-case": "~3.0.2",
"kafka-node": "~2.4.1",
"lodash": "~4.17.5",
"no-kafka": "~3.2.8"
"no-kafka": "~3.2.9"
},
"devDependencies": {
"@babel/cli": "7.0.0-beta.43",
Expand All @@ -43,7 +43,7 @@
"@babel/preset-env": "7.0.0-beta.43",
"@babel/register": "7.0.0-beta.43",
"ava": "~1.0.0-beta.3",
"mock-require": "~3.0.1",
"mock-require": "~3.0.2",
"sinon": "~4.4.9"
},
"ava": {
Expand Down
9 changes: 6 additions & 3 deletions packages/radiaction/src/createTopics.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
const _ = require('lodash')
const chalk = require('chalk')
const { Client, Producer } = require('kafka-node')
const actionName = require('./helpers/actionName')
const { RESULT_SUFFIX } = require('./config')

export default actions =>
new Promise((resolve, reject) => {
let topics = Object.values(actions).map(action => action.toString())
let topics = Object.values(actions).map(actionName)
if (!topics || !topics.length) {
throw new Error('no topics found to be created')
}

const waiters = Object.values(actions).filter(action => action.__radiaction.wait)
const waiters = Object.values(actions).filter(
action => action.__radiaction && action.__radiaction.wait
)
if (waiters.length) {
topics = topics.concat(waiters.map(action => `${action.name}${RESULT_SUFFIX}`))
topics = topics.concat(waiters.map(action => `${actionName(action)}${RESULT_SUFFIX}`))
}

console.log(`Creating topics with the following names:`)
Expand Down
39 changes: 34 additions & 5 deletions packages/radiaction/src/emitter.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,52 @@
const _ = require('lodash')
const { Producer } = require('no-kafka')
const { keepName } = require('./helpers')
const { actionName, keepName } = require('./helpers')

const stopAll = []

process.on('exit', () => {
stopAll.forEach(stop => stop())
})

function forceStructure(output) {
if (!_.isPlainObject(output)) {
return { value: output }
}

const keys = new Set(Object.keys(output))

if (!keys.has('value')) {
throw new Error(`plain objects sent to a emitter need to contain the 'value' field`)
}

keys.delete('value')
keys.delete('key')

if (keys.size > 0) {
throw new Error(`plain objects sent to a emitter can only contain fields 'value' and 'key'`)
}

return output
}

function wrap(action) {
const producer = new Producer()
stopAll.push(producer.close)
producer.init()

const newAction = (...args) =>
producer.send({
topic: action.name,
message: action.apply(action, args),
})
producer
.send({
topic: actionName(action),
message: forceStructure(action.apply(action, args)),
})
.then(results => {
if (!results || results.length !== 1) {
throw new Error(`Unexpected data received from message broker: ${results}`)
}

return results[0]
})

newAction.__radiaction = {
...action.__radiaction,
Expand Down
72 changes: 42 additions & 30 deletions packages/radiaction/src/emitter.test.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import test from 'ava'
import sinon from 'sinon'
import noKafkaMock from '../mocks/no-kafka'
import NoKafkaMock from '../mocks/no-kafka'
import emitter from './emitter'

test('flag is ON', t => {
Expand Down Expand Up @@ -66,44 +66,56 @@ test('arguments are passed down', async t => {
t.true(actions.d.calledOnceWithExactly('d0', 'd1', 'd3'), actions.d.args.toString())
})

test('message broker is called', async t => {
test('message broker is called with compatible descriptor', async t => {
const processed = emitter({
a: () => 1,
b: () => 2,
c: () => 3,
d: () => 4,
a: () => 'A',
b: () => ({ b: 'B' }),
c: () => ({ foo: 'bar', value: 'C' }),
d: () => ({ value: 'D' }),
e: () => ({ key: 'e', value: 'E' }),
})

await t.throws(processed.b, `plain objects sent to a emitter need to contain the 'value' field`)
await t.throws(
processed.c,
`plain objects sent to a emitter can only contain fields 'value' and 'key'`
)

const output = {
a: await processed.a(),
b: await processed.b(),
c: await processed.c(),
d: await processed.d(),
e: await processed.e(),
}

t.is(output.a[0].partition.constructor, Number)
t.is(output.a[0].offset.constructor, Number)
t.is(output.b[0].partition.constructor, Number)
t.is(output.b[0].offset.constructor, Number)
t.is(output.c[0].partition.constructor, Number)
t.is(output.c[0].offset.constructor, Number)
t.is(output.d[0].partition.constructor, Number)
t.is(output.d[0].offset.constructor, Number)

t.deepEqual(output.a, [
{ topic: 'a', error: null, partition: output.a[0].partition, offset: output.a[0].offset },
])
t.deepEqual(output.b, [
{ topic: 'b', error: null, partition: output.b[0].partition, offset: output.b[0].offset },
])
t.deepEqual(output.c, [
{ topic: 'c', error: null, partition: output.c[0].partition, offset: output.c[0].offset },
])
t.deepEqual(output.d, [
{ topic: 'd', error: null, partition: output.d[0].partition, offset: output.d[0].offset },
])
t.is(output.a.partition.constructor, Number)
t.is(output.a.offset.constructor, Number)
t.is(output.d.partition.constructor, Number)
t.is(output.d.offset.constructor, Number)
t.is(output.e.partition.constructor, Number)
t.is(output.e.offset.constructor, Number)

t.deepEqual(output.a, {
topic: 'a',
error: null,
partition: output.a.partition,
offset: output.a.offset,
})

t.deepEqual(output.d, {
topic: 'd',
error: null,
partition: output.d.partition,
offset: output.d.offset,
})

t.deepEqual(output.e, {
topic: 'e',
error: null,
partition: output.e.partition,
offset: output.e.offset,
})
})

test.after.always(() => {
noKafkaMock.stopAll()
NoKafkaMock.stopAll()
})
3 changes: 3 additions & 0 deletions packages/radiaction/src/helpers/actionName.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
const _ = require('lodash')

module.exports = action => _.get(action, '__radiaction.topic', action.name)
1 change: 1 addition & 0 deletions packages/radiaction/src/helpers/index.js
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export { default as actionName } from './actionName'
export { default as keepName } from './keepName'
23 changes: 23 additions & 0 deletions packages/radiaction/src/helpers/keepName.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// it is not running
// https://github.com/Quadric/radiaction/issues/5

import test from 'ava'
import keepName from './keepName'

test('accept arrow functions', t => {
function myFunction() {}
const newFn = () => {}

const processedFn = keepName(newFn, myFunction)

t.is(processedFn.name, 'myFunction')
})

test('accept anonymous fns', async t => {
const myFunction = () => {}
const newFn = () => {}

const processedFn = keepName(newFn, myFunction)

t.is(processedFn.name, 'myFunction')
})
Loading