This repository was archived by the owner on Jan 24, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstatemachine.js
More file actions
186 lines (153 loc) · 5.43 KB
/
statemachine.js
File metadata and controls
186 lines (153 loc) · 5.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
var async = require('async');
var kue = require('kue');
var reds = require('reds');
var helpers = require('./lib/helpers');
var Procedure = require('./lib/procedure');
function StateMachine(opts) {
if (!(this instanceof StateMachine)) return new StateMachine(opts);
//configure which redis connection should be used.
if (opts.createClient)
kue.redis.createClient = opts.createClient
// use the same createClient function as kue
reds.createClient = kue.redis.createClient;
this.queue = kue.createQueue();
// q:search is the reds-namespace created by kue, so the queue are
// aumatically indexed
this.search = reds.createSearch('q:search');
}
StateMachine.prototype.createProcedure = function(defaults, steps, id) {
return new Procedure({
defaults: defaults,
queue: this.queue,
steps: steps,
// random and unique id for this procedure - used to resume already
// started jobs, among other things
id: id || helpers.randomStr()
});
}
StateMachine.prototype.process = function(event, concurrency, callback) {
var self = this;
if (!callback) {
callback = concurrency;
concurrency = 1;
}
this.queue.process('statemachine:' + event, concurrency, function(job, done) {
self._formatJobData(job);
callback(job, function(err) {
if (err) return done(err);
self.completeJob(job, done);
});
});
}
// TODO: Abstract away this into a lib/job.js-file
StateMachine.prototype._formatJobData = function(job) {
job.rawData = job.data;
job.data = helpers.mergeObjs(
{
stepName: job.data.stepName
},
job.data.defaults,
job.data.processData
);
}
//
// Query the none-completed jobs with a json-object, such that the specified
// json object is subset of the found job-data
//
StateMachine.prototype.query = function(queryObj, callback) {
var self = this;
// flatten the queryObj and use that as the search string
// reds does a AND search as standard between the different values
var searchString = helpers.flatten(queryObj).join(' ');
this.search
.query(searchString)
.end(function(err, ids) {
async.map(
ids,
function(id, done) {
kue.Job.get(id, done);
},
function(err, jobs) {
process.nextTick(function() {
if (err) return callback(err);
jobs = jobs
.map(function(job) {
self._formatJobData(job);
return job;
})
.filter(function(job) {
return helpers.subset(job.data, queryObj);
});
callback(null, jobs);
});
}
);
});
}
//
// reset the active jobs - this should typically be called on initialization.
// The job will first be set to failed, but if there's attempts left then we
// set the state to inactive, so that it can be processed later
//
StateMachine.prototype.resetActiveJobs = function(events, callback) {
if (!Array.isArray(events)) {
throw new Error('events is not an array');
}
var self = this;
function resetJob(job, done) {
job.failed();
// TODO: Write tests for this part
job.attempt(function(err, remaining, attempts, max){
if (err) return done(err);
if (remaining) {
job.inactive();
}
job.update(done);
});
}
async.forEach(events, function(event, done) {
var type = 'statemachine:' + event;
// 0 is first element, -1 is last element
kue.Job.rangeByType(type, 'active', 0, -1, 'asc', function(err, jobs) {
if (err) return done(err);
async.forEach(jobs, resetJob, done);
});
}, callback);
}
// TODO: Abstract this to a lib/job.js-file
StateMachine.prototype.completeJob = function(job, callback) {
callback = callback || function(err) {
if (err) throw err;
}
var data = job.rawData
job.data = job.rawData; //restore original data object
if (data.steps.length === 0) return callback(null);
// TODO: setting createdNextProcedure as a variable in the job is sort of
// stupid - but it works and, more importanly, we get a callback we can rely
// on - that is not the case when you call job.complete.
if (job.get('createdNextProcedure') === true) return callback(null);
this.createProcedure(
data.defaults, data.steps, data.id
).execute(function(err, done) {
if (err) return callback(err);
job.complete();
job.set('createdNextProcedure', true);
job.update(callback);
});
}
// Complete a job based on its ID
StateMachine.prototype.completeJobById = function(jobId, callback) {
var self = this;
callback = callback || function(err) {
if(err) throw err;
}
kue.Job.get(jobId, function(err, job){
if(err) callback(err);
// Make sure we dont complete a job twice
if(job._state === 'complete') return callback(null);
// Comply with what is expected by completeJob
self._formatJobData(job);
self.completeJob(job, callback);
});
}
module.exports = StateMachine;