Skip to content

Commit

Permalink
mateodelnorte#77 - handling bad JSON
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Kazantsev committed May 11, 2017
1 parent d9d929a commit c148566
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 73 deletions.
23 changes: 15 additions & 8 deletions bus/formatters/json.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,30 @@
module.exports.deserialize = function deserialize (content) {

module.exports.deserialize = function deserialize(content, cb) {

if (content === null || content === undefined) return cb(null, content);

/**
* Return input string
*/
try {
content = JSON.parse(content);
} catch (err) {
throw err;
console.log('Content - is not valid JSON: ', err);
return cb(err);
}

return content;
cb(null, content);

};

module.exports.serialize = function serialize (content) {
module.exports.serialize = function serialize(content, cb) {

try {
content = JSON.stringify(content);
} catch (err) {
throw err;
console.log('Content can\'t serialized to JSON format: ', err);
return cb(err);
}

return content;
cb(null, content);

};
64 changes: 35 additions & 29 deletions bus/rabbitmq/pubsubqueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ var extend = require('extend');
var newId = require('node-uuid').v4;
var util = require('util');

function PubSubQueue (options) {
function PubSubQueue(options) {
options = options || {};
var exchangeOptions = options.exchangeOptions || {};
var queueOptions = options.queueOptions || {};

extend(queueOptions, {
autoDelete: ! (options.ack || options.acknowledge),
autoDelete: !(options.ack || options.acknowledge),
contentType: options.contentType || 'application/json',
durable: Boolean(options.ack || options.acknowledge),
exclusive: options.exclusive || false,
Expand Down Expand Up @@ -43,24 +43,25 @@ function PubSubQueue (options) {
this.sendChannel.assertExchange(this.exchangeName, this.exchangeOptions.type || 'topic', this.exchangeOptions);
}

PubSubQueue.prototype.publish = function publish (event, options) {
PubSubQueue.prototype.publish = function publish(event, options) {
options = options || {};
var self = this;

options.contentType = options.contentType || this.contentType;

self.sendChannel.publish(self.exchangeName, self.routingKey || self.queueName, new Buffer(options.formatter.serialize(event)), options);

options.formatter.serialize(event, function (err, content) {
if (err) return null;
self.sendChannel.publish(self.exchangeName, self.routingKey || self.queueName, new Buffer(content), options);
});
};

PubSubQueue.prototype.subscribe = function subscribe (options, callback) {
PubSubQueue.prototype.subscribe = function subscribe(options, callback) {
var self = this;

function _unsubscribe (cb) {
function _unsubscribe(cb) {
self.listenChannel.cancel(self.subscription.consumerTag, cb);
}

function _subscribe (uniqueName) {
function _subscribe(uniqueName) {
self.listenChannel.consume(uniqueName, function (message) {
/*
Note from http://www.squaremobius.net/amqp.node/doc/channel_api.html
Expand All @@ -69,27 +70,32 @@ PubSubQueue.prototype.subscribe = function subscribe (options, callback) {
If the consumer is cancelled by RabbitMQ, the message callback will be invoked with null.
*/
if (message === null) {
return;
return;
}
// todo: map contentType to default formatters
message.content = options.formatter.deserialize(message.content);
options.queueType = 'pubsubqueue';
self.bus.handleIncoming(self.listenChannel, message, options, function (channel, message, options) {
// amqplib intercepts errors and closes connections before bubbling up
// to domain error handlers when they occur non-asynchronously within
// callback. Therefore, if there is a process domain, we try-catch to
// redirect the error, assuming the domain creator's intentions.
try {
callback(message.content, message);
} catch (err) {
if (process.domain && process.domain.listeners('error')) {
process.domain.emit('error', err);
} else {
self.emit('error', err);
options.formatter.deserialize(message.content, function (err, content) {
if (err) return callback(err);

message.content = content;
options.queueType = 'pubsubqueue';

self.bus.handleIncoming(self.listenChannel, message, options, function (channel, message, options) {
// amqplib intercepts errors and closes connections before bubbling up
// to domain error handlers when they occur non-asynchronously within
// callback. Therefore, if there is a process domain, we try-catch to
// redirect the error, assuming the domain creator's intentions.
try {
callback(message.content, message);
} catch (err) {
if (process.domain && process.domain.listeners('error')) {
process.domain.emit('error', err);
} else {
self.emit('error', err);
}
}
}
});
});
}, { noAck: ! self.ack })
}, { noAck: !self.ack })
.then(function (ok) {
self.subscription = { consumerTag: ok.consumerTag };
});
Expand All @@ -104,9 +110,9 @@ PubSubQueue.prototype.subscribe = function subscribe (options, callback) {
if (self.ack) {
self.log('asserting error queue ' + self.errorQueueName);
self.listenChannel.assertQueue(self.errorQueueName, self.queueOptions)
.then(function (_qok) {
_subscribe(uniqueName);
});
.then(function (_qok) {
_subscribe(uniqueName);
});
} else {
_subscribe(uniqueName);
}
Expand Down
81 changes: 45 additions & 36 deletions bus/rabbitmq/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ var EventEmitter = require('events').EventEmitter;
var extend = require('extend');
var util = require('util');

function Queue (options) {
function Queue(options) {
options = options || {};
var queueOptions = options.queueOptions || {};

extend(queueOptions, {
autoDelete: ! (options.ack || options.acknowledge),
autoDelete: !(options.ack || options.acknowledge),
contentType: options.contentType || 'application/json',
durable: Boolean(options.ack || options.acknowledge),
exclusive: options.exclusive || false,
Expand Down Expand Up @@ -38,10 +38,10 @@ function Queue (options) {
if (self.ack) {
self.log('asserting error queue %s', self.errorQueueName);
self.listenChannel.assertQueue(self.errorQueueName, self.queueOptions)
.then(function (_qok) {
self.initialized = true;
self.emit('ready');
});
.then(function (_qok) {
self.initialized = true;
self.emit('ready');
});
} else {
self.initialized = true;
self.emit('ready');
Expand All @@ -55,15 +55,15 @@ function Queue (options) {

util.inherits(Queue, EventEmitter);

Queue.prototype.listen = function listen (callback, options) {
Queue.prototype.listen = function listen(callback, options) {
options = options || {};
queueOptions = options.queueOptions || {};

var self = this;

this.log('listening to queue %s', this.queueName);

if ( ! this.initialized) {
if (!this.initialized) {
return this.on('ready', listen.bind(this, callback, options));
}

Expand All @@ -75,33 +75,38 @@ Queue.prototype.listen = function listen (callback, options) {
If the consumer is cancelled by RabbitMQ, the message callback will be invoked with null.
*/
if (message === null) {
return;
return;
}
message.content = options.formatter.deserialize(message.content);
options.queueType = 'queue';
self.bus.handleIncoming(self.listenChannel, message, options, function (channel, message, options) {
// amqplib intercepts errors and closes connections before bubbling up
// to domain error handlers when they occur non-asynchronously within
// callback. Therefore, if there is a process domain, we try-catch to
// redirect the error, assuming the domain creator's intentions.
try {
callback(message.content, message);
} catch (err) {
if (process.domain && process.domain.listeners('error')) {
process.domain.emit('error', err);
} else {
self.emit('error', err);
options.formatter.deserialize(message.content, function (err, content) {
if (err) return callback(err);

message.content = content;
options.queueType = 'queue';

self.bus.handleIncoming(self.listenChannel, message, options, function (channel, message, options) {
// amqplib intercepts errors and closes connections before bubbling up
// to domain error handlers when they occur non-asynchronously within
// callback. Therefore, if there is a process domain, we try-catch to
// redirect the error, assuming the domain creator's intentions.
try {
callback(message.content, message);
} catch (err) {
if (process.domain && process.domain.listeners('error')) {
process.domain.emit('error', err);
} else {
self.emit('error', err);
}
}
}
});
});
}, { noAck: ! self.ack })
}, { noAck: !self.ack })
.then(function (ok) {
self.subscription = { consumerTag: ok.consumerTag };
});

};

Queue.prototype.destroy = function destroy (options) {
Queue.prototype.destroy = function destroy(options) {
options = options || {};
var em = new EventEmitter();
this.log('deleting queue %s', this.queueName);
Expand All @@ -115,36 +120,40 @@ Queue.prototype.destroy = function destroy (options) {
return em;
};

Queue.prototype.unlisten = function unlisten () {
Queue.prototype.unlisten = function unlisten() {
var em = new EventEmitter();
var self = this;

if (this.subscription) {
this.listenChannel.cancel(this.subscription.consumerTag)
.then(function (err, ok) {
delete self.subscription;
em.emit('success');
});
delete self.subscription;
em.emit('success');
});
} else {
throw new Error('Attempted to unlisten a queue that is not yet listening.');
}

return em;
};

Queue.prototype.send = function send (event, options) {
Queue.prototype.send = function send(event, options) {
options = options || {};
var self = this;

if ( ! this.initialized) {
if (!this.initialized) {
return this.on('ready', send.bind(this, event, options));
}

options.contentType = options.contentType || this.contentType;
options.persistent = Boolean(options.ack || options.acknowledge || options.persistent || self.ack);

this.sendChannel.sendToQueue(this.routingKey || this.queueName, new Buffer(options.formatter.serialize(event)), options);

options.formatter.serialize(event, function (err, content) {
if (err) return null;

this.sendChannel.sendToQueue(this.routingKey || this.queueName, new Buffer(content), options);
});

};

module.exports = Queue;

0 comments on commit c148566

Please sign in to comment.