-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpubsub.go
121 lines (109 loc) · 2.57 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package gredis
import (
"regexp"
"sync"
"github.com/leslie-fei/gredis/resp"
"github.com/panjf2000/gnet/v2"
)
// subChannel is the subscription record pubSub keeps for a connection.
type subChannel struct {
// conn is the subscribed connection.
conn gnet.Conn
// channels lists the channel names, or the patterns when pattern is true,
// recorded for conn.
channels []string
// pattern reports whether channels holds patterns rather than plain
// channel names.
pattern bool
}
// newPubSub builds an empty pubSub with all three lookup maps allocated, so
// the result can be written to right away.
func newPubSub() *pubSub {
	ps := new(pubSub)
	ps.conns = map[gnet.Conn]*subChannel{}
	ps.psubs = map[string][]gnet.Conn{}
	ps.subs = map[string][]gnet.Conn{}
	return ps
}
// pubSub tracks which connections are subscribed to which channels and
// patterns.
type pubSub struct {
// rw guards the maps below: Subscribe and OnClose take the write lock,
// Publish takes the read lock.
rw sync.RWMutex
// conns maps a connection to its subscription record.
conns map[gnet.Conn]*subChannel
// psubs maps a pattern to the connections subscribed to it.
psubs map[string][]gnet.Conn
// subs maps a channel name to the connections subscribed to it.
subs map[string][]gnet.Conn
}
// Subscribe registers conn as a subscriber of every entry in channels and
// writes one confirmation reply per entry back to conn.
//
// When pattern is true the entries are registered as patterns and confirmed
// with "psubscribe"; otherwise they are registered as plain channel names and
// confirmed with "subscribe". Each reply is a three-element array: the
// confirmation kind, the entry, and the number of entries of this kind now
// recorded for conn.
//
// Calls of the same kind accumulate on the connection's record instead of
// replacing it, and an entry conn already holds is confirmed again without
// being registered twice, so a published message reaches conn only once.
// An empty channels slice is a no-op.
func (p *pubSub) Subscribe(conn gnet.Conn, pattern bool, channels []string) {
	if len(channels) == 0 {
		// Nothing to register or confirm; leave any existing record untouched.
		return
	}

	p.rw.Lock()
	defer p.rw.Unlock()

	table, kind := p.subs, "subscribe"
	if pattern {
		table, kind = p.psubs, "psubscribe"
	}

	sc, ok := p.conns[conn]
	if !ok || sc.pattern != pattern {
		// NOTE(review): subChannel carries a single pattern flag, so one record
		// cannot describe both kinds. A connection that switches kind gets a
		// fresh record here; tracking both would need a change to the struct.
		sc = &subChannel{conn: conn, pattern: pattern}
		p.conns[conn] = sc
	}

	// registered reports whether conn is already in a subscriber list.
	registered := func(list []gnet.Conn) bool {
		for _, c := range list {
			if c == conn {
				return true
			}
		}
		return false
	}
	// recorded reports whether the record already lists the given entry.
	recorded := func(name string) bool {
		for _, have := range sc.channels {
			if have == name {
				return true
			}
		}
		return false
	}

	outs := make([][]byte, 0, len(channels))
	for _, channel := range channels {
		if !registered(table[channel]) {
			table[channel] = append(table[channel], conn)
		}
		if !recorded(channel) {
			// Appending copies the name into the record's own slice, so the
			// caller's slice is never retained.
			sc.channels = append(sc.channels, channel)
		}

		// send a message to the client
		var out []byte
		out = resp.AppendArray(out, 3)
		out = resp.AppendBulkString(out, kind)
		out = resp.AppendBulkString(out, channel)
		out = resp.AppendInt(out, int64(len(sc.channels)))
		outs = append(outs, out)
	}

	// Best effort: this method has no error return through which a failed
	// write could be reported to the caller.
	_, _ = conn.Writev(outs)
}
// Publish delivers message to every connection subscribed to channel, either
// directly or through a matching pattern, and returns the number of writes
// that succeeded.
//
// Direct subscribers receive a "message" reply. Pattern subscribers receive a
// "pmessage" reply that also names the pattern that matched. Patterns are
// compiled as Go regular expressions and tested with an unanchored match; a
// pattern that fails to compile is skipped.
//
// A connection whose write returns an error is not counted, so the result
// reflects deliveries rather than the size of the subscriber lists.
func (p *pubSub) Publish(channel, message string) int {
	p.rw.RLock()
	defer p.rw.RUnlock()

	var sent int
	// deliver writes one pre-encoded payload to each connection and counts the
	// writes that succeed. The payload is only read, so sharing it is safe.
	//
	// NOTE(review): subscribers may belong to a different event loop than the
	// caller; confirm conn.Write is safe to call from here.
	deliver := func(conns []gnet.Conn, payload []byte) {
		for _, conn := range conns {
			if _, err := conn.Write(payload); err != nil {
				continue
			}
			sent++
		}
	}

	if conns := p.subs[channel]; len(conns) > 0 {
		// Every direct subscriber gets the same bytes; encode them once.
		deliver(conns, p.writeMessage(false, "", channel, message))
	}

	for pchan, conns := range p.psubs {
		// NOTE(review): the pattern is recompiled on every publish and is
		// matched as a regular expression; confirm whether glob-style matching
		// was intended.
		re, err := regexp.Compile(pchan)
		if err != nil {
			continue
		}
		if !re.MatchString(channel) || len(conns) == 0 {
			continue
		}
		// The payload depends on the pattern but not on the subscriber.
		deliver(conns, p.writeMessage(true, pchan, channel, message))
	}
	return sent
}
// writeMessage encodes a published message as a reply array.
//
// With pat false the array has three elements: "message", channel, msg.
// With pat true it has four: "pmessage", pchan, channel, msg. pchan is not
// used when pat is false.
func (p *pubSub) writeMessage(pat bool, pchan, channel, msg string) []byte {
	var buf []byte
	if !pat {
		buf = resp.AppendArray(buf, 3)
		buf = resp.AppendBulkString(buf, "message")
	} else {
		buf = resp.AppendArray(buf, 4)
		buf = resp.AppendBulkString(buf, "pmessage")
		buf = resp.AppendBulkString(buf, pchan)
	}
	// Both forms end with the channel name followed by the message body.
	buf = resp.AppendBulkString(buf, channel)
	return resp.AppendBulkString(buf, msg)
}
// OnClose removes every subscription held by conn and forgets its record.
//
// Only conn is taken out of each subscriber list; other connections
// subscribed to the same channel or pattern stay registered. A channel or
// pattern is dropped from its table once no subscriber is left.
//
// Both tables are swept in full rather than driven by the stored record,
// because a record describes a single kind (channels or patterns) while conn
// may be registered in both tables.
func (p *pubSub) OnClose(conn gnet.Conn) {
	p.rw.Lock()
	defer p.rw.Unlock()

	if _, ok := p.conns[conn]; !ok {
		// No record means conn never subscribed; skip the sweep.
		return
	}

	// prune filters conn out of every subscriber list in table. Filtering in
	// place is safe because the write lock excludes concurrent readers.
	prune := func(table map[string][]gnet.Conn) {
		for name, list := range table {
			kept := list[:0]
			for _, c := range list {
				if c != conn {
					kept = append(kept, c)
				}
			}
			if len(kept) == 0 {
				delete(table, name)
				continue
			}
			// Clear the vacated tail so the backing array stops referencing
			// the closed connection.
			for i := len(kept); i < len(list); i++ {
				list[i] = nil
			}
			table[name] = kept
		}
	}
	prune(p.subs)
	prune(p.psubs)

	delete(p.conns, conn)
}