-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.js
65 lines (59 loc) · 1.94 KB
/
worker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
'use strict';
var config = require('./config').config;
var schema = require('raintank-core/schema');
var util = require('util');
var serviceTypes = require('raintank-core/serviceTypes');
var continousQueryTypes = require('raintank-core/queryTypes');
var queue = require('raintank-queue');
// Queue consumer used by this worker; its management URL is read from
// config.queue.mgmtUrl.
var consumerOptions = { mgmtUrl: config.queue.mgmtUrl };
var consumer = new queue.Consumer(consumerOptions);

// Module-level state. Neither binding is read or written by the code in
// this file as shown.
var running = false;
var client;
/**
 * Registers the queue consumer's event handlers.
 *
 * On 'connect' it calls `consumer.join('tasks', 'taskWorker')`.
 * On 'message' it validates the incoming task message, loads the parent
 * document identified by `parent.class` / `parent.id` with its type field
 * populated and, when that document is enabled, builds a task through the
 * matching type's `factory` and calls its `run` with the message timestamp.
 *
 * Messages that are malformed, older than 60 seconds, or that reference an
 * unknown class or type are logged and dropped instead of throwing inside
 * the consumer callback.
 */
function init() {
    consumer.on('connect', function() {
        consumer.join('tasks', 'taskWorker');
    });

    consumer.on('message', function (topic, partition, message) {
        var ts = message;
        console.log(ts);

        // Both an id and a parent reference are required to route the task;
        // checking them up front avoids a TypeError on `ts.parent.class`.
        if (!ts || !ts._id || !ts.parent) {
            console.log("invalid msg content.");
            return;
        }
        if (ts.timestamp < (new Date().getTime() - 60000)) {
            console.log('task is too old. skipping.');
            return;
        }

        var populate = null;
        var cls = null;
        if (ts.parent.class === 'services') {
            populate = 'serviceType';
            cls = serviceTypes;
        } else if (ts.parent.class === 'continuousQuery') {
            populate = 'continuousQueryType';
            // Must match the module-level binding name exactly; referencing
            // an undeclared identifier here throws a ReferenceError.
            cls = continousQueryTypes;
        } else {
            // No type registry exists for any other class, so there is
            // nothing that could build the task.
            console.log('unknown task class: ' + ts.parent.class);
            return;
        }

        var query = schema[ts.parent.class].model.findOne({_id: ts.parent.id});
        query.populate(populate).exec(function (err, task) {
            if (err) {
                // Stop here: reporting "not found" after a failed lookup
                // would be misleading.
                console.log(err);
                return;
            }
            if (!task) {
                console.log('task not found in DB.');
                return;
            }
            if (!task.enabled) {
                console.log(ts.parent.class + " task " + ts.parent.id + " is paused.");
                return;
            }

            // The populated type reference may be missing, or may name a
            // type that has no registered implementation.
            var type = task[populate];
            var impl = type ? cls[type._id] : undefined;
            if (!impl) {
                console.log(util.format('no %s registered for task: %s', populate, task._id));
                return;
            }

            var Task = impl.factory(task);
            console.log(util.format('disptching %s task: %s', ts.parent.class, task._id));
            Task.run(ts.timestamp);
        });
    });
}
/**
 * SIGINT handler: logs a closing notice and terminates the process.
 */
function handleSigint() {
    console.log('CLOSING [SIGINT]');
    process.exit();
}

process.on('SIGINT', handleSigint);

// Register the queue consumer handlers once the module has loaded.
init();