// Source: app.go, from a fork of anycable/anycable-go.

package main
import (
"encoding/json"
pb "github.com/anycable/anycable-go/protos"
)
// App groups the collaborators used by the connection lifecycle handlers
// defined in this file.
type App struct {
// Pinger has Increment called in Connected and Decrement called in Disconnected.
Pinger *Pinger
// Subscriber is not referenced by any code in this file.
Subscriber *Subscriber
// Disconnector has Notify called with the connection in Disconnected.
Disconnector *DisconnectNotifier
}
const (
// PING holds the literal "ping". It is not referenced in this file;
// presumably it is the Reply type used for ping frames — confirm against its users.
PING = "ping"
)
// Message is the command envelope, with JSON field tags, that Subscribe,
// Unsubscribe and Perform operate on.
type Message struct {
// Command maps to the "command" JSON key; this file never inspects it.
Command string `json:"command"`
// Identifier is the key used to look up the subscription in conn.subscriptions.
Identifier string `json:"identifier"`
// Data is passed unchanged to rpc.Perform.
Data string `json:"data"`
}
// Reply is the JSON envelope serialized by toJSON.
type Reply struct {
	// Type is emitted under the "type" key.
	Type string `json:"type"`
	// Identifier is emitted under the "identifier" key.
	Identifier string `json:"identifier"`
	// Message is an arbitrary payload emitted under the "message" key.
	Message interface{} `json:"message"`
}

// toJSON encodes the reply as JSON and returns the encoded bytes.
//
// It panics when the reply cannot be marshalled (for example, when Message
// holds a channel or a function). The panic text keeps the historical
// "Failed to build JSON" prefix and appends the underlying encoding error so
// the failure can be diagnosed.
func (r *Reply) toJSON() []byte {
	// r is already a pointer; marshal it directly instead of a pointer to it.
	encoded, err := json.Marshal(r)
	if err != nil {
		panic("Failed to build JSON: " + err.Error())
	}
	return encoded
}
// app is the package-level App instance. It is created as a zero-value App, so
// its Pinger, Subscriber and Disconnector fields are nil at this point.
var app = &App{}
// Connected handles a newly established connection: it calls Pinger.Increment,
// hands conn to the hub's register channel, and then passes the initial
// transmissions to Transmit for delivery on the connection.
func (a *App) Connected(conn *Conn, transmissions []string) {
	a.Pinger.Increment()
	hub.register <- conn
	Transmit(conn, transmissions)
}
// Subscribe processes a subscribe command for msg.Identifier on conn.
//
// A request for an identifier already present in conn.subscriptions is logged
// as a warning and ignored. Otherwise the request is forwarded to
// rpc.Subscribe; the identifier is recorded on the connection only when the
// response status equals 1, and the response is always handed to HandleReply.
func (a *App) Subscribe(conn *Conn, msg *Message) {
	// Status value this handler treats as success.
	const accepted = 1

	id := msg.Identifier
	if _, exists := conn.subscriptions[id]; exists {
		log.Warningf("Already Subscribed to %s", id)
		return
	}

	resp := rpc.Subscribe(conn.identifiers, id)
	if resp.Status == accepted {
		conn.subscriptions[id] = true
	}
	log.Debugf("Subscribe %s", resp)

	HandleReply(conn, msg, resp)
}
// Unsubscribe processes an unsubscribe command for msg.Identifier on conn.
//
// A request for an identifier that is absent from conn.subscriptions is logged
// as a warning and ignored. Otherwise the request is forwarded to
// rpc.Unsubscribe; the identifier is removed from the connection only when the
// response status equals 1, and the response is always handed to HandleReply.
func (a *App) Unsubscribe(conn *Conn, msg *Message) {
	// Status value this handler treats as success.
	const accepted = 1

	id := msg.Identifier
	if _, known := conn.subscriptions[id]; !known {
		log.Warningf("Unknown subscription %s", id)
		return
	}

	resp := rpc.Unsubscribe(conn.identifiers, id)
	if resp.Status == accepted {
		delete(conn.subscriptions, id)
	}

	HandleReply(conn, msg, resp)
}
// Perform forwards msg.Data to rpc.Perform for the subscription named by
// msg.Identifier.
//
// A request for an identifier that is absent from conn.subscriptions is logged
// as a warning and ignored. Otherwise the RPC response is logged at debug
// level and handed to HandleReply.
func (a *App) Perform(conn *Conn, msg *Message) {
	id := msg.Identifier
	if _, known := conn.subscriptions[id]; !known {
		log.Warningf("Unknown subscription %s", id)
		return
	}

	resp := rpc.Perform(conn.identifiers, id, msg.Data)
	log.Debugf("Perform %s", resp)

	HandleReply(conn, msg, resp)
}
// Disconnected handles a closed connection: it calls Pinger.Decrement, hands
// conn to the hub's unregister channel, and finally passes conn to
// Disconnector.Notify.
func (a *App) Disconnected(conn *Conn) {
	a.Pinger.Decrement()
	hub.unregister <- conn
	a.Disconnector.Notify(conn)
}
// BroadcastAll sends message on the hub's broadcast channel. The send blocks
// until the channel accepts the value.
func (a *App) BroadcastAll(message []byte) {
	hub.broadcast <- message
}
// Transmit sends every entry of transmissions, in order, on conn.send after
// converting it to a byte slice. Each send blocks until the channel accepts
// it; with no transmissions the connection is not touched.
func Transmit(conn *Conn, transmissions []string) {
	for i := range transmissions {
		conn.send <- []byte(transmissions[i])
	}
}
// HandleReply applies an RPC command response to conn, in this order:
//
//  1. If reply.Disconnect is set, closing conn.ws is deferred until the
//     function returns.
//  2. If reply.StopStreams is set, an unsubscribe request for msg.Identifier
//     is sent to the hub.
//  3. One subscribe request per entry of reply.Streams is sent to the hub.
//  4. reply.Transmissions are passed to Transmit.
//
// msg is only dereferenced in steps 2 and 3.
func HandleReply(conn *Conn, msg *Message, reply *pb.CommandResponse) {
	if reply.Disconnect {
		// Deferred so the close happens only after the steps below have run.
		defer conn.ws.Close()
	}

	if reply.StopStreams {
		hub.unsubscribe <- &SubscriptionInfo{
			conn:       conn,
			identifier: msg.Identifier,
		}
	}

	streams := reply.Streams
	for i := range streams {
		hub.subscribe <- &SubscriptionInfo{
			conn:       conn,
			stream:     streams[i],
			identifier: msg.Identifier,
		}
	}

	Transmit(conn, reply.Transmissions)
}