Skip to content

Commit

Permalink
test(cluster): write tests around cluster messaging
Browse files Browse the repository at this point in the history
  • Loading branch information
thetutlage committed Mar 14, 2018
1 parent 3ec21a9 commit 205773c
Show file tree
Hide file tree
Showing 9 changed files with 240 additions and 52 deletions.
10 changes: 10 additions & 0 deletions japaFile.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
'use strict'

/**
* Enable it, since some tests rely on debug
* statements
*
* @type {String}
*/
process.env.DEBUG = 'adonis:websocket'
process.env.DEBUG_COLORS = false
process.env.DEBUG_HIDE_DATE = true

const cli = require('japa/cli')
cli.run('test/**/*.spec.js')
6 changes: 6 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
"japa": "^1.0.6",
"japa-cli": "^1.0.1",
"nyc": "^11.6.0",
"standard": "^11.0.0"
"standard": "^11.0.0",
"test-console": "^1.1.0"
},
"dependencies": {
"@adonisjs/generic-exceptions": "^2.0.0",
Expand All @@ -43,6 +44,7 @@
"exclude": [
"src/Context/index.js",
"src/ClusterHop/index.js",
"src/ClusterHop/sender.js",
"test"
]
},
Expand Down
2 changes: 1 addition & 1 deletion src/Channel/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ class Channel {
* @return {void}
*/
broadcast (topic, payload, filterSockets = []) {
this.subscriptions.get(topic).forEach((socket) => {
this.getTopicSubscriptions(topic).forEach((socket) => {
if (filterSockets.indexOf(socket.id) === -1) {
socket.connection.write(payload)
}
Expand Down
57 changes: 8 additions & 49 deletions src/ClusterHop/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,52 +10,9 @@
*/

const cluster = require('cluster')
const debug = require('debug')('adonis:websocket:cluster')
const ChannelsManager = require('../Channel/Manager')

/**
* Delivers the message from process to the channel
*
* @method deliverMessage
*
* @param {String} handle
* @param {String} topic
* @param {String} payload
*
* @return {void}
*/
function deliverMessage (handle, topic, payload) {
if (handle === 'broadcast') {
const channel = ChannelsManager.resolve(topic)
if (!channel) {
return debug('broadcast topic %s cannot be handled by any channel', topic)
}
channel.clusterMessage(topic, payload)
}

debug('dropping packet, since %s handle is not allowed', handle)
}

/**
* Handles the messages received on a given process
*
* @method handleProcessMessage
*
* @param {String} message
*
* @return {void}
*/
function handleProcessMessage (message) {
try {
const { topic, handle, payload } = JSON.parse(message)
if (!handle) {
debug('dropping packet, since handle is missing')
}
deliverMessage(handle, topic, payload)
} catch (error) {
debug('dropping packet, since not valid json')
}
}
const debug = require('debug')('adonis:websocket')
const receiver = require('./receiver')
const sender = require('./sender')

module.exports = {
/**
Expand All @@ -67,7 +24,8 @@ module.exports = {
*/
init () {
if (cluster.isWorker) {
process.on('message', handleProcessMessage)
debug('adding listener from worker to receive node message')
process.on('message', receiver)
}
},

Expand All @@ -85,7 +43,7 @@ module.exports = {
*/
send (handle, topic, payload) {
if (cluster.isWorker) {
process.send(JSON.stringify({ handle, topic, payload }))
sender(handle, topic, payload)
}
},

Expand All @@ -97,6 +55,7 @@ module.exports = {
* @return {void}
*/
destroy () {
process.removeListener('message', handleProcessMessage)
debug('cleaning up cluster listeners')
process.removeListener('message', receiver)
}
}
79 changes: 79 additions & 0 deletions src/ClusterHop/receiver.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
'use strict'

/**
* adonis-websocket
*
* (c) Harminder Virk <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

const ChannelsManager = require('../Channel/Manager')
const debug = require('debug')('adonis:websocket')

/**
* Delivers the message from process to the channel
*
* @method deliverMessage
*
* @param {String} handle
* @param {String} topic
* @param {String} payload
*
* @return {void}
*/
function deliverMessage (handle, topic, payload) {
if (handle === 'broadcast') {
const channel = ChannelsManager.resolve(topic)

if (!channel) {
return debug('broadcast topic %s cannot be handled by any channel', topic)
}

channel.clusterBroadcast(topic, payload)
return
}

debug('dropping packet, since %s handle is not allowed', handle)
}

/**
* Handles the messages received on a given process
*
* @method handleProcessMessage
*
* @param {String} message
*
* @return {void}
*/
module.exports = function handleProcessMessage (message) {
let decoded = null

/**
* Decoding the JSON message
*/
try {
decoded = JSON.parse(message)
} catch (error) {
debug('dropping packet, since not valid json')
return
}

/**
* Ignoring packet when there is no handle
*/
if (!decoded.handle) {
debug('dropping packet, since handle is missing')
return
}

/**
* Safely trying to deliver cluster messages
*/
try {
deliverMessage(decoded.handle, decoded.topic, decoded.payload)
} catch (error) {
debug('unable to process cluster message with error %o', error)
}
}
19 changes: 19 additions & 0 deletions src/ClusterHop/sender.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
'use strict'

/**
* adonis-websocket
*
* (c) Harminder Virk <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
const debug = require('debug')('adonis:websocket')

module.exports = function (handle, topic, payload) {
try {
process.send(JSON.stringify({ handle, topic, payload }))
} catch (error) {
debug('cluster.send error %o', error)
}
}
44 changes: 43 additions & 1 deletion test/unit/channel.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,52 @@ test.group('Channel', () => {
assert.equal(message, 'Cannot join topic')
}
})

test('write payload to socket connection when broadcast is invoked', (assert, done) => {
assert.plan(1)

const connection = new FakeConnection()
connection.write = function (payload) {
assert.equal(payload, 'hello')
done()
}

const ctx = {
socket: new Socket('foo', connection)
}

const channel = new Channel('foo', function () {})
channel
.joinTopic(ctx)
.then(() => {
channel.broadcast('foo', 'hello')
})
})

test('ignore when broadcast is called for a topic, which has zero subscriptions', (assert, done) => {
const connection = new FakeConnection()
connection.write = function (payload) {
assert.throw('Never expected to be called')
}

const ctx = {
socket: new Socket('foo', connection)
}

const channel = new Channel('foo', function () {})
channel
.joinTopic(ctx)
.then(() => {
channel.broadcast('bar', 'hello')
setTimeout(() => {
done()
}, 200)
})
})
})

test.group('Channel Manager', (group) => {
group.beforeEach(() => {
group.afterEach(() => {
Manager.clear()
})

Expand Down
71 changes: 71 additions & 0 deletions test/unit/cluster-receiver.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
'use strict'

/**
* adonis-websocket
*
* (c) Harminder Virk <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

const test = require('japa')
const clusterReceiver = require('../../src/ClusterHop/receiver')
const ChannelManager = require('../../src/Channel/Manager')
const stderr = require('test-console').stderr

test.group('Cluster Receiver', (group) => {
test('ignore message when it\'s not json', (assert) => {
assert.plan(1)
const inspect = stderr.inspect()

clusterReceiver({ name: 'virk' })

inspect.restore()
assert.equal(inspect.output[0].trim(), 'adonis:websocket dropping packet, since not valid json')
})

test('ignore message when handle is missing', (assert) => {
assert.plan(1)
const inspect = stderr.inspect()

clusterReceiver(JSON.stringify({ topic: 'chat' }))

inspect.restore()
assert.equal(inspect.output[0].trim(), 'adonis:websocket dropping packet, since handle is missing')
})

test('ignore message when handle is not one of the allowed handles', (assert) => {
assert.plan(1)
const inspect = stderr.inspect()

clusterReceiver(JSON.stringify({ topic: 'chat', handle: 'foo' }))

inspect.restore()
assert.equal(inspect.output[0].trim(), 'adonis:websocket dropping packet, since foo handle is not allowed')
})

test('ignore message when topic cannot be handled by any channel', (assert) => {
assert.plan(1)
const inspect = stderr.inspect()

clusterReceiver(JSON.stringify({ topic: 'chat', handle: 'broadcast' }))

inspect.restore()
assert.equal(inspect.output[0].trim(), 'adonis:websocket broadcast topic chat cannot be handled by any channel')
})

test('send message to channel responsible for handling the topic', (assert, done) => {
assert.plan(2)
const channel = ChannelManager.add('chat', () => {})

channel.clusterBroadcast = function (topic, payload) {
assert.equal(topic, 'chat')
assert.equal(payload, 'hello')
done()
}

clusterReceiver(JSON.stringify({ topic: 'chat', handle: 'broadcast', payload: 'hello' }))
ChannelManager.clear()
})
})

0 comments on commit 205773c

Please sign in to comment.