forked from medic/cht-couch2pg
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
133 lines (114 loc) · 3.25 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
var log = require('loglevel-message-prefix')(require('loglevel'), {
prefixes: ['timestamp', 'level']
});
var Promise = require('rsvp').Promise,
env = require('./env')(),
xmlformsMigrator = require('./libs/xmlforms/migrator');
var couchdb = require('pouchdb')(env.couchdbUrl),
db = require('pg-promise')({ 'promiseLib': Promise })(env.postgresqlUrl);
var couch2pg = require('couch2pg'),
migrator = couch2pg.migrator,
importer = couch2pg.importer(
db, couchdb,
env.couch2pgDocLimit,
env.couch2pgChangesLimit),
xmlforms = require('./libs/xmlforms/updater')(db);
var firstRun = false;
var errorCount = 0;
var sleepMs = function(errored) {
if (errored) {
errorCount++;
if (errorCount === env.couch2pgRetryCount) {
throw new Error('Too many consecutive errors');
}
var backoffMs = errorCount * 1000 * 60;
return Math.min(backoffMs, env.sleepMs);
} else {
errorCount = 0;
return env.sleepMs;
}
};
var migrateCouch2pg = function() {
return migrator(env.postgresqlUrl)();
};
var migrateXmlforms = function() {
return xmlformsMigrator(env.postgresqlUrl)();
};
var delayLoop = function(errored) {
return new Promise(function(resolve) {
var ms = sleepMs(errored);
log.info('Run '+(errored ? 'errored' : 'complete') + '. Next run at ' + new Date(new Date().getTime() + ms));
if (ms === 0) {
resolve();
} else {
setTimeout(resolve, ms);
}
});
};
var run = function() {
log.info('Beginning couch2pg and xmlforms run at ' + new Date());
var runErrored = false;
return importer.importAll()
.catch(function(err) {
log.error('Couch2PG import failed');
log.error(err);
if (err && err.status === 401){
process.exit(1);
}
runErrored = true;
})
.then(function(results) {
// Run secondary tasks if we reasonably think there might be new data
// runErrored || errorCount <- something went wrong, but maybe there is still new data
// firstRun <- this is first run, we don't know what the DB state is in
// results.deleted.length || results.edited.length <- data has changed
if (runErrored || errorCount || firstRun || results.deleted.length || results.edited.length) {
return xmlforms.update();
}
})
.catch(function(err) {
log.error('XMLForms support failed');
log.error(err);
runErrored = true;
})
.then(function() {
if (!runErrored) {
// We have completed a successful run
firstRun = false;
}
return delayLoop(runErrored);
})
.then(run);
};
var legacyRun = function() {
log.info('Beginning couch2pg run at ' + new Date());
return importer.importAll()
.then(
function() {
return delayLoop();
},
function(err) {
log.error('Couch2PG import failed');
log.error(err);
return delayLoop(true);
})
.then(legacyRun);
};
var doRun = function() {
if (env.v04Mode) {
log.info('Adapter is running in 0.4 mode');
return migrateCouch2pg()
.then(legacyRun);
} else {
log.info('Adapter is running in NORMAL mode');
return migrateCouch2pg()
.then(migrateXmlforms)
.then(run);
}
};
doRun().catch(function(err) {
log.error('An unrecoverable error occurred');
log.error(err);
log.error('exiting');
process.exit(1);
});