-
Notifications
You must be signed in to change notification settings - Fork 4
/
index.js
129 lines (116 loc) · 3.61 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
const EventEmitter = require('events');
const _getReadQuery = function(walOptions, slotName){
let changesSql = '';
Object.keys(walOptions).forEach(function(option){
const value = walOptions[option];
changesSql += `, '${option}', '${value}'`;
});
const sql = `SELECT * FROM pg_catalog.pg_logical_slot_get_changes('${slotName}', NULL, NULL${changesSql});`;
return {
text: sql,
rowMode: 'array'
};
};
const _init = async function(client, slotName, temporary){
const checkQuery = 'SELECT * FROM pg_replication_slots WHERE slot_name = $1;';
const results = await client.query(checkQuery, [slotName]);
if(!results.rows.length){
const startQuery = "SELECT pg_catalog.pg_create_logical_replication_slot($1, 'wal2json', $2);";
await client.query(startQuery, [slotName, temporary]);
}
else if(temporary){
throw new Error('A temporary replication slot with this name already exists.');
}
};
class Wal2JSONListener extends EventEmitter {
constructor(client, {slotName, timeout, temporary}, walOptions={}) {
super();
this.slotName = slotName || 'test_slot';
this.walOptions = walOptions;
this.temporary = temporary;
this.curTimeout = null;
this.client = client;
this.timeout = timeout;
this.running = false;
this.readQuery = _getReadQuery(this.walOptions, this.slotName);
this.client.connect();
}
_readChanges(){
const self = this;
self.client.query(self.readQuery, function(err, results){
if(err){
self.stop(err);
throw (err);
}
else{
self.waiting = false;
self.emit('changes', results.rows);
}
});
}
_error(err){
this._close();
this.emit('error', err);
}
_close(){
this.client.end();
this.client = null;
this.running = false;
this.emit('stop', true);
return true;
}
restart(client){
if(this.running){
this._close();
}
this.client = client;
this.client.connect();
this.start();
}
start(){
if(this.running){
this._error('This listener is already running. If you would like to restart it use the restart method.');
return;
}
this.waiting = false;
this.running = true;
const self = this;
_init(this.client, this.slotName, this.temporary).then(function(){
self.emit('start', true);
self.next();
}).catch(function(err){
self._error(err);
});
}
next(){
const self = this;
if(!this.running){
this._error('Please start the listener before requesting changes.');
}
else if(this.waiting){
this._error('You are trying to read new changes while the previous changes are still being processed.')
}
else if(!this.client){
this._error("This listener doesn't have a valid open client, to add one run restart(client).")
}
else if(this.timeout){
self.waiting = true;
this.curTimeout = setTimeout(function(){
self._readChanges();
}, this.timeout)
}
else{
self.waiting = true;
self._readChanges();
}
}
stop(err){
if(this.curTimeout){
clearTimeout(this.curTimeout);
}
if(this.running){
this._close(err)
}
}
}
module.exports = Wal2JSONListener;