Skip to content

Commit

Permalink
shutdown timeout problems and races, fixes #406
Browse files Browse the repository at this point in the history
  • Loading branch information
behrad committed Sep 12, 2014
1 parent 70d551f commit 601d2c4
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 19 deletions.
5 changes: 4 additions & 1 deletion lib/kue.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ exports.workers = [];
function Queue( options ) {
options = options || {};
redis.configureFactory( options, this );
// console.log( "******************** creating Kue client... " );
this.client = Worker.client = Job.client = redis.createClient();
this.promoter = null;
this.workers = exports.workers;
Expand Down Expand Up @@ -247,7 +248,9 @@ Queue.prototype.shutdown = function (fn, timeout, type) {

// Wrap `fn` to only call after all workers finished
fn = function (err) {
if (err) return origFn(err);
if (err) {
return origFn(err);
}
if (!--n) {
cleanup();
origFn.apply(null, arguments);
Expand Down
11 changes: 7 additions & 4 deletions lib/queue/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,16 @@ function get(fn, order) {
if (!pending) return fn(null, ids);
ids.forEach(function (id) {
exports.get(id, function (err, job) {
if (err)
/*fn*/console.log(err);
else
if (err) {
// events.emit(id, 'error', err);
console.error(err);
} else {
jobs[job.id] = job;
}
--pending || fn(null, 'desc' == order
? map(jobs, ids).reverse()
: map(jobs, ids));

});
});
}
Expand Down Expand Up @@ -346,7 +349,7 @@ Job.prototype.set = function (key, val, fn) {
* Get job `key`
*
* @param {String} key
* @param {String} fn
* @param {Function} fn
* @return {Job} for chaining
* @api public
*/
Expand Down
37 changes: 23 additions & 14 deletions lib/queue/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var clients = {};
function Worker(queue, type) {
this.queue = queue;
this.type = type;
// console.log( "******************** creating Worker client... " );
this.client = Worker.client || (Worker.client = redis.createClient());
this.running = true;
this.job = null;
Expand Down Expand Up @@ -107,11 +108,12 @@ Worker.prototype.failed = function (job, err, fn) {
var self = this;
job.error(err).failed( function(){
job.attempt(function (error, remaining, attempts, max) {
if (error) return self.error(error, job);
if (error) {
return self.error(error, job);
}
if (remaining) {
var emit = function () {
self.emit('job failed attempt', job);
events.emit(job.id, 'failed attempt', attempts);
self.emitJobEvent('failed attempt', job, attempts );
};

if ( job.backoff() ) {
Expand All @@ -128,8 +130,7 @@ Worker.prototype.failed = function (job, err, fn) {
job.inactive(emit);
}
} else {
self.emit('job failed', job);
events.emit(job.id, 'failed');
self.emitJobEvent('failed', job);
}
self.start(fn);
}.bind(this));
Expand All @@ -155,6 +156,7 @@ Worker.prototype.process = function (job, fn) {
fn(
job,
function (err, result) {
if( self.drop_user_callbacks ) return;
if (err) {
return self.failed(job, err, fn);
}
Expand All @@ -169,11 +171,10 @@ Worker.prototype.process = function (job, fn) {
}
job.complete( function(){
job.attempt( function(){
self.emit('job complete', job);
events.emit(job.id, 'complete', result);
if( job.removeOnComplete() ) {
job.remove();
}
self.emitJobEvent('complete', job, result);
});
}.bind(this));
self.start(fn);
Expand Down Expand Up @@ -230,14 +231,12 @@ Worker.prototype.zpop = function (key, fn) {

Worker.prototype.getJob = function (fn) {
var self = this;

// alloc a client for this job type
var client = clients[self.type]
|| (clients[self.type] = redis.createClient());

if (!self.running) {
return fn("Already Shutdown");
}
// alloc a client for this job type
// if(!clients[self.type]) console.log( "******************** creating BLPOP client... " );
var client = clients[self.type] || (clients[self.type] = redis.createClient());
// BLPOP indicates we have a new inactive job to process
client.blpop(client.getKey(self.type + ':jobs'), 0, function (err) {
if (err || !self.running) {
Expand Down Expand Up @@ -280,6 +279,7 @@ Worker.prototype.shutdown = function (fn, timeout) {
//Safeyly kill any blpop's that are waiting.
(self.type in clients) && clients[self.type].quit();
delete clients[self.type];
self.cleaned_up = true;
fn();
};
if (!this.running) return _fn();
Expand All @@ -297,8 +297,11 @@ Worker.prototype.shutdown = function (fn, timeout) {
shutdownTimer = setTimeout(function () {
if (self.job) {
if( self.job !== true ) {
// a job is running - fail it and call _fn when complete
self.failed( self.job, { error: true, message: "Shutdown" }, _fn);
self.drop_user_callbacks = true;
self.removeAllListeners();
// a job is running - fail it and call _fn when failed
self.once('job failed', _fn);
self.failed( self.job, { error: true, message: "Shutdown" });
}
} else {
// no job running - just finish immediately
Expand All @@ -308,6 +311,12 @@ Worker.prototype.shutdown = function (fn, timeout) {
}
};

Worker.prototype.emitJobEvent = function( event, job, args ) {
if( this.cleaned_up ) return;
events.emit(job.id, event, args);
this.emit('job ' + event, job);
};

Worker.prototype.resume = function () {
if (this.running) return false;
this.running = true;
Expand Down
2 changes: 2 additions & 0 deletions lib/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ exports.createClientFactory = function( options ) {
*/

exports.client = function () {
// if (!exports._client) console.log( "******************** creating _client client... " );
return exports._client || (exports._client = exports.createClient());
};

Expand All @@ -88,6 +89,7 @@ exports.client = function () {
*/

exports.pubsubClient = function () {
// if (!exports._pubsub) console.log( "******************** creating _pubsub client... " );
return exports._pubsub || (exports._pubsub = exports.createClient());
};

Expand Down

0 comments on commit 601d2c4

Please sign in to comment.