-
Notifications
You must be signed in to change notification settings - Fork 2
/
index.js
104 lines (83 loc) · 2.96 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
'use strict'
exports = module.exports = SSEBroadcasterRedisAdapter
var assert = require('assert'),
inherits = require('util').inherits,
EventEmitter = require('events'),
Broadcaster = require('sse-broadcast'),
redis = require('redis'),
id = require('mdbid')
function SSEBroadcasterRedisAdapter(broadcaster, optionsOrClient) {
if (!(this instanceof SSEBroadcasterRedisAdapter))
return new SSEBroadcasterRedisAdapter(broadcaster, optionsOrClient)
assert(broadcaster, 'a broadcaster instance is required')
assert(
broadcaster instanceof Broadcaster,
'broadcaster must be an instance of SSEBroadcaster'
)
this.id = id()
this.broadcaster = broadcaster
if (optionsOrClient instanceof redis.RedisClient) {
this.pub = optionsOrClient
this.sub = optionsOrClient.duplicate()
}
else {
this.pub = redis.createClient(optionsOrClient)
this.sub = redis.createClient(optionsOrClient)
}
broadcaster.on('publish', this.onpublish.bind(this))
broadcaster.on('subscribe', this.onsubscribe.bind(this))
broadcaster.on('unsubscribe', this.onunsubscribe.bind(this))
this.pub.on('error', this.onerror.bind(this))
this.sub.on('error', this.onerror.bind(this))
this.sub.on('pmessage', this.onpmessage.bind(this))
}
inherits(SSEBroadcasterRedisAdapter, EventEmitter)
// static properties
Object.defineProperties(exports, {
Adapter: {
enumerable: true,
value: SSEBroadcasterRedisAdapter
},
version: {
enumerable: true,
get: function () {
return require('./package.json').version
}
}
})
SSEBroadcasterRedisAdapter.prototype.onerror = function onerror(err) {
this.emit('error', err)
}
SSEBroadcasterRedisAdapter.prototype.onpmessage = function onpmessage(pattern, channel, message) {
var id = channel.substring(0, 24)
// we've got back our own message
if (this.id === id)
return
message = JSON.parse(message)
// do not re-emit this publish
// (and start an infinite ping-pong match)
message.emit = false
this.broadcaster.publish(channel.substring(30), message)
}
SSEBroadcasterRedisAdapter.prototype.onpublish = function onpublish(name, message) {
this.pub.publish(this.id + ':sseb:' + name, JSON.stringify(message))
}
SSEBroadcasterRedisAdapter.prototype.onsubscribe = function onsubscribe(name) {
this.sub.psubscribe('*:sseb:' + name)
}
SSEBroadcasterRedisAdapter.prototype.onunsubscribe = function onunsubscribe(name) {
if (!this.broadcaster.subscriberCount(name))
this.sub.punsubscribe('*:sseb:' + name)
}
SSEBroadcasterRedisAdapter.prototype.quit = function quit() {
this.pub.quit()
this.sub.quit()
}
SSEBroadcasterRedisAdapter.prototype.unref = function unref() {
this.pub.unref()
this.sub.unref()
}
SSEBroadcasterRedisAdapter.prototype.end = function end(flush) {
this.pub.end(flush)
this.sub.end(flush)
}