-
Notifications
You must be signed in to change notification settings - Fork 5
/
sender.go
73 lines (63 loc) · 1.74 KB
/
sender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package mqtt // import "gosrc.io/mqtt"
import (
"io"
"net"
)
// sender is the handle on the goroutine that writes outgoing data to the
// connection. The sender design needs the following pieces:
// - a net.Conn to send TCP packets
// - an error channel to trigger teardown on send error
// - a send channel receiving []byte
// - a keepalive control channel to reset the keepalive packet timer after a send
// - a way to stop the sender when the client wants to stop / disconnect
type sender struct {
// done is the receive-only view of the teardown channel that initSender
// creates and hands to senderLoop; it is intended to report send errors.
done <-chan struct{}
// out is the send-only view of the channel from which senderLoop takes
// byte buffers and writes them to the connection.
out chan<- []byte
// quit is the send-only view of the channel that tells senderLoop to
// terminate.
quit chan<- struct{}
}
// initSender creates the channels backing a sender, optionally starts the
// keepalive timer, launches the senderLoop goroutine on conn and returns the
// sender handle wired to that goroutine.
//
// When keepalive is greater than zero, startKeepalive is called with a
// callback that writes a marshalled PingReqPacket to conn; otherwise the
// keepalive control channel stays nil, which disables keepalive signalling.
func initSender(conn net.Conn, keepalive int) sender {
	var (
		doneCh = make(chan struct{})
		outCh  = make(chan []byte)
		quitCh = make(chan struct{})
		ctl    chan int // nil unless keepalive is enabled
	)

	if keepalive > 0 {
		// Callback run by the keepalive timer: emit a PINGREQ on the wire.
		sendPing := func() {
			ping := PingReqPacket{}
			// Best effort: the result of the write is not inspected here.
			_, _ = conn.Write(ping.Marshall())
		}
		ctl = startKeepalive(keepalive, sendPing)
	}

	go senderLoop(conn, ctl, outCh, quitCh, doneCh)
	return sender{done: doneCh, out: outCh, quit: quitCh}
}
// senderLoop is the body of the sender goroutine. It writes every buffer
// received on out to conn and services quit until it is told to stop.
//
// Parameters:
//   - conn: destination of the outgoing data; closed when quit is received.
//   - keepaliveCtl: keepalive control channel, nil when keepalive is disabled.
//   - out: buffers to write to conn.
//   - quit: receiving a value terminates the loop after clean-up.
//   - tearDown: closed (at most once) on the first write error, so that a
//     receiver on the other end of the channel is told the connection is
//     no longer usable.
//
// After a write error the loop deliberately keeps running: it still drains
// out and still honours quit, so callers blocked on either channel are never
// left hanging. Clean-up (stopping keepalive, closing conn) happens only on
// quit, as before.
func senderLoop(conn io.WriteCloser, keepaliveCtl chan int, out <-chan []byte, quit <-chan struct{}, tearDown chan<- struct{}) {
	// tornDown records that tearDown was already closed; closing a channel
	// twice panics, and write errors are likely to repeat.
	tornDown := false

	for {
		select {
		case buf := <-out:
			if _, err := conn.Write(buf); err != nil {
				if !tornDown {
					close(tearDown)
					tornDown = true
				}
				// Nothing was sent successfully, so the keepalive timer
				// must not be reset.
				continue
			}
			keepaliveSignal(keepaliveCtl, keepaliveReset)
		case <-quit:
			// Client wants this sender to terminate.
			terminateSender(conn, keepaliveCtl)
			return
		}
	}
}
// send hands buf to the sender goroutine through the out channel. The channel
// built by initSender is unbuffered, so the call blocks until the buffer has
// been received on the other side.
func (s sender) send(buf []byte) {
	outbound := s.out
	outbound <- buf
}
// clean-up:
func terminateSender(conn io.Closer, keepaliveCtl chan int) {
keepaliveSignal(keepaliveCtl, keepaliveStop)
_ = conn.Close()
}
// keepaliveSignal forwards a keepalive command to the keepalive control
// channel. A nil channel means keepalive is disabled, in which case the call
// is a no-op instead of blocking forever on a nil-channel send.
func keepaliveSignal(keepaliveCtl chan<- int, signal int) {
	if keepaliveCtl != nil {
		keepaliveCtl <- signal
	}
}