Skip to content

Commit 5be9454

Browse files
committed
first merge
1 parent c6c0963 commit 5be9454

File tree

3 files changed

+53
-26
lines changed

3 files changed

+53
-26
lines changed

lib/queue/job.js

+28-24
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,12 @@ function get( fn, order, jobType) {
8888
, jobs = {};
8989
if( !pending ) return fn(null, ids);
9090
ids.forEach(function( id ) {
91+
id = redis.client().stripFIFO(id); // turn zid back to regular job id
9192
exports.get(id, jobType, function( err, job ) {
9293
if( err ) {
9394
console.error(err);
9495
} else {
95-
jobs[ job.id ] = job;
96+
jobs[ redis.client().createFIFO(job.id) ] = job;
9697
}
9798
--pending || fn(null, 'desc' == order
9899
? map(jobs, ids).reverse()
@@ -169,6 +170,7 @@ exports.get = function( id, jobType, fn ) {
169170
, job = new Job;
170171

171172
job.id = id;
173+
job.zid = client.createFIFO(id);
172174
client.hgetall(client.getKey('job:' + job.id), function( err, hash ) {
173175
if( err ) return fn(err);
174176
if( !hash ) {
@@ -224,20 +226,21 @@ exports.get = function( id, jobType, fn ) {
224226

225227
exports.removeBadJob = function( id, jobType) {
226228
var client = redis.client();
229+
var zid = client.createFIFO(id);
227230
client.multi()
228231
.del(client.getKey('job:' + id + ':log'))
229232
.del(client.getKey('job:' + id))
230-
.zrem(client.getKey('jobs:inactive'), id)
231-
.zrem(client.getKey('jobs:active'), id)
232-
.zrem(client.getKey('jobs:complete'), id)
233-
.zrem(client.getKey('jobs:failed'), id)
234-
.zrem(client.getKey('jobs:delayed'), id)
235-
.zrem(client.getKey('jobs'), id)
236-
.zrem(client.getKey('jobs:' + jobType + ':inactive'), id)
237-
.zrem(client.getKey('jobs:' + jobType+ ':active'), id)
238-
.zrem(client.getKey('jobs:' + jobType + ':complete'), id)
239-
.zrem(client.getKey('jobs:' + jobType + ':failed'), id)
240-
.zrem(client.getKey('jobs:' + jobType + ':delayed'), id)
233+
.zrem(client.getKey('jobs:inactive'), zid)
234+
.zrem(client.getKey('jobs:active'), zid)
235+
.zrem(client.getKey('jobs:complete'), zid)
236+
.zrem(client.getKey('jobs:failed'), zid)
237+
.zrem(client.getKey('jobs:delayed'), zid)
238+
.zrem(client.getKey('jobs'), zid)
239+
.zrem(client.getKey('jobs:' + jobType + ':inactive'), zid)
240+
.zrem(client.getKey('jobs:' + jobType+ ':active'), zid)
241+
.zrem(client.getKey('jobs:' + jobType + ':complete'), zid)
242+
.zrem(client.getKey('jobs:' + jobType + ':failed'), zid)
243+
.zrem(client.getKey('jobs:' + jobType + ':delayed'), zid)
241244
.exec();
242245
if( !exports.disableSearch ) {
243246
getSearch().remove(id);
@@ -337,7 +340,7 @@ Job.prototype.toJSON = function() {
337340

338341

339342
Job.prototype.refreshTtl = function() {
340-
('active' === this.state() && this._ttl > 0) ? this.client.zadd(this.client.getKey('jobs:' + this.state()), Date.now() + parseInt(this._ttl), this.id) : noop();
343+
('active' === this.state() && this._ttl > 0) ? this.client.zadd(this.client.getKey('jobs:' + this.state()), Date.now() + parseInt(this._ttl), this.zid) : noop();
341344
};
342345

343346

@@ -625,9 +628,9 @@ Job.prototype.searchKeys = function( keys ) {
625628
Job.prototype.remove = function( fn ) {
626629
var client = this.client;
627630
client.multi()
628-
.zrem(client.getKey('jobs:' + this.state()), this.id)
629-
.zrem(client.getKey('jobs:' + this.type + ':' + this.state()), this.id)
630-
.zrem(client.getKey('jobs'), this.id)
631+
.zrem(client.getKey('jobs:' + this.state()), this.zid)
632+
.zrem(client.getKey('jobs:' + this.type + ':' + this.state()), this.zid)
633+
.zrem(client.getKey('jobs'), this.zid)
631634
.del(client.getKey('job:' + this.id + ':log'))
632635
.del(client.getKey('job:' + this.id))
633636
.exec(function( err ) {
@@ -658,18 +661,18 @@ Job.prototype.state = function( state, fn ) {
658661
var multi = client.multi();
659662
if( oldState && oldState != '' && oldState != state ) {
660663
multi
661-
.zrem(client.getKey('jobs:' + oldState), this.id)
662-
.zrem(client.getKey('jobs:' + this.type + ':' + oldState), this.id);
664+
.zrem(client.getKey('jobs:' + oldState), this.zid)
665+
.zrem(client.getKey('jobs:' + this.type + ':' + oldState), this.zid);
663666
}
664667
multi
665668
.hset(client.getKey('job:' + this.id), 'state', state)
666-
.zadd(client.getKey('jobs:' + state), this._priority, this.id)
667-
.zadd(client.getKey('jobs:' + this.type + ':' + state), this._priority, this.id);
669+
.zadd(client.getKey('jobs:' + state), this._priority, this.zid)
670+
.zadd(client.getKey('jobs:' + this.type + ':' + state), this._priority, this.zid);
668671

669672
// use promote_at as score when job moves to delayed
670-
('delayed' === state) ? multi.zadd(client.getKey('jobs:' + state), parseInt(this.promote_at), this.id) : noop();
671-
('active' === state && this._ttl > 0) ? multi.zadd(client.getKey('jobs:' + state), Date.now() + parseInt(this._ttl), this.id) : noop();
672-
('active' === state && !this._ttl) ? multi.zadd(client.getKey('jobs:' + state), this._priority<0?this._priority:-this._priority, this.id) : noop();
673+
('delayed' === state) ? multi.zadd(client.getKey('jobs:' + state), parseInt(this.promote_at), this.zid) : noop();
674+
('active' === state && this._ttl > 0) ? multi.zadd(client.getKey('jobs:' + state), Date.now() + parseInt(this._ttl), this.zid) : noop();
675+
('active' === state && !this._ttl) ? multi.zadd(client.getKey('jobs:' + state), this._priority<0?this._priority:-this._priority, this.zid) : noop();
673676
('inactive' === state) ? multi.lpush(client.getKey(this.type + ':jobs'), 1) : noop();
674677

675678
this.set('updated_at', Date.now());
@@ -776,6 +779,7 @@ Job.prototype.save = function( fn ) {
776779
// add the job for event mapping
777780
var key = client.getKey('job:' + id);
778781
self.id = id;
782+
self.zid = client.createFIFO(id);
779783
self.subscribe(function() {
780784
self._state = self._state || (this._delay ? 'delayed' : 'inactive');
781785
if( max ) client.hset(key, 'max_attempts', max);
@@ -836,7 +840,7 @@ Job.prototype.update = function( fn ) {
836840
// priority
837841
this.set('priority', this._priority);
838842

839-
this.client.zadd(this.client.getKey('jobs'), this._priority, this.id);
843+
this.client.zadd(this.client.getKey('jobs'), this._priority, this.zid);
840844

841845
// data
842846
this.set('data', json, function() {

lib/queue/worker.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,8 @@ Worker.prototype.zpop = function( key, fn ) {
247247
.exec(function( err, res ) {
248248
if( err ) return fn(err);
249249
var id = res[ 0 ][ 0 ] || res[ 0 ][ 1 ][ 0 ];
250-
fn(null, id);
251-
});
250+
fn(null, this.client.stripFIFO(id));
251+
}.bind(this));
252252
};
253253

254254
/**

lib/redis.js

+23
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,13 @@ exports.configureFactory = function( options, queue ) {
6060
exports.createClient = function() {
6161
var clientFactoryMethod = options.redis.createClientFactory || exports.createClientFactory;
6262
var client = clientFactoryMethod(options);
63+
6364
client.on('error', function( err ) {
6465
queue.emit('error', err);
6566
});
67+
6668
client.prefix = options.prefix;
69+
6770
// redefine getKey to use the configured prefix
6871
client.getKey = function( key ) {
6972
if( client.constructor.name == 'Redis' ) {
@@ -73,6 +76,26 @@ exports.configureFactory = function( options, queue ) {
7376
}
7477
return this.prefix + ':' + key;
7578
};
79+
80+
client.createFIFO = function( id ) {
81+
//Create an id for the zset to preserve FIFO order
82+
var idLen = '' + id.toString().length;
83+
var len = 2 - idLen.length;
84+
while (len--) idLen = '0' + idLen;
85+
return idLen + '|' + id;
86+
};
87+
88+
// Parse out original ID from zid
89+
client.stripFIFO = function( zid ) {
90+
if ( typeof zid === 'string' ) {
91+
return zid.substr(zid.indexOf('|')+1);
92+
} else {
93+
// Sometimes this gets called with an undefined
94+
// it seems to be OK to have that not resolve to an id
95+
return zid;
96+
}
97+
};
98+
7699
return client;
77100
};
78101
};

0 commit comments

Comments
 (0)