-
Notifications
You must be signed in to change notification settings - Fork 38
/
Copy pathsubscribe.go
188 lines (173 loc) · 4.63 KB
/
subscribe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package mqtt
import (
"errors"
"time"
paho "github.com/eclipse/paho.mqtt.golang"
"github.com/grafana/sobek"
"github.com/mstoykov/k6-taskqueue-lib/taskqueue"
"go.k6.io/k6/js/common"
"go.k6.io/k6/metrics"
)
// Subscribe to the given topic message will be received using addEventListener
func (c *client) Subscribe(
// Topic to consume messages from
topic string,
// The QoS of messages
qos,
// timeout ms
timeout uint,
) error {
rt := c.vu.Runtime()
if c.pahoClient == nil || !c.pahoClient.IsConnected() {
common.Throw(rt, ErrClient)
return ErrClient
}
// check timeout value
timeoutValue, err := safeUintToInt64(timeout)
if err != nil {
common.Throw(rt, ErrTimeoutToLong)
return ErrTimeoutToLong
}
c.messageChan = make(chan paho.Message)
messageCB := func(_ paho.Client, msg paho.Message) {
go func(msg paho.Message) {
c.messageChan <- msg
}(msg)
}
token := c.pahoClient.Subscribe(topic, byte(qos), messageCB)
if !token.WaitTimeout(time.Duration(timeoutValue) * time.Millisecond) {
common.Throw(rt, ErrTimeout)
return ErrTimeout
}
if err := token.Error(); err != nil {
common.Throw(rt, err)
return ErrTimeout
}
registerCallback := func() func(func() error) {
callback := c.vu.RegisterCallback()
return func(f func() error) {
callback(f)
}
}
c.tq = taskqueue.New(registerCallback)
go c.loop(c.messageChan, timeoutValue)
return nil
}
func (c *client) receiveMessageMetric(msgLen float64) error {
// publish metrics
now := time.Now()
state := c.vu.State()
if state == nil {
return ErrState
}
ctx := c.vu.Context()
if ctx == nil {
return ErrState
}
metrics.PushIfNotDone(ctx, state.Samples, metrics.Sample{
TimeSeries: metrics.TimeSeries{Metric: c.metrics.ReceivedMessages, Tags: c.metrics.TagsAndMeta.Tags},
Time: now,
Value: float64(1),
})
metrics.PushIfNotDone(ctx, state.Samples, metrics.Sample{
TimeSeries: metrics.TimeSeries{Metric: c.metrics.ReceivedBytes, Tags: c.metrics.TagsAndMeta.Tags},
Time: now,
Value: msgLen,
})
return nil
}
//nolint:gocognit // todo improve this
func (c *client) loop(messageChan <-chan paho.Message, timeout int64) {
ctx := c.vu.Context()
stop := make(chan struct{})
defer c.tq.Close()
for {
select {
case msg, ok := <-messageChan:
if !ok {
// wanted exit in case of chan close
return
}
c.tq.Queue(func() error {
payload := string(msg.Payload())
ev := c.newMessageEvent(msg.Topic(), payload)
// publish associated metric
err := c.receiveMessageMetric(float64(len(payload)))
if err != nil {
return err
}
// TODO authorize multiple listeners
if c.messageListener != nil {
if _, err := c.messageListener(ev); err != nil {
return err
}
}
// if the client is waiting for multiple messages
// TODO handle multiple // subscribe case
if c.subRefCount > 0 {
c.subRefCount--
} else {
// exit the handle from evloop async
stop <- struct{}{}
}
return nil
})
case <-stop:
return
// TODO handle the context better in case of interuption
case <-ctx.Done():
c.tq.Queue(func() error {
ev := c.newErrorEvent("message vu cancel occurred")
if c.errorListener != nil {
if _, err := c.errorListener(ev); err != nil {
// only seen in case of sigint
return err
}
}
return nil
})
return
case <-time.After(time.Millisecond * time.Duration(timeout)):
c.tq.Queue(func() error {
ev := c.newErrorEvent("message timeout")
if c.errorListener != nil {
if _, err := c.errorListener(ev); err != nil {
return err
}
}
return nil
})
return
}
}
}
// AddEventListener expose the js method to listen for events
func (c *client) AddEventListener(event string, listener func(sobek.Value) (sobek.Value, error)) {
switch event {
case "message":
c.messageListener = listener
case "error":
c.errorListener = listener
default:
rt := c.vu.Runtime()
common.Throw(rt, errors.New("event: "+event+" does not exists"))
}
}
// SubContinue to be call in message callback to wait for on more message
// be careful this must be called only in the event loop and it not thread safe
func (c *client) SubContinue() {
c.subRefCount++
}
//nolint:nosnakecase // their choice not mine
func (c *client) newMessageEvent(topic, msg string) *sobek.Object {
rt := c.vu.Runtime()
o := rt.NewObject()
must := func(err error) {
if err != nil {
common.Throw(rt, err)
}
}
must(o.DefineDataProperty("topic", rt.ToValue(topic), sobek.FLAG_FALSE, sobek.FLAG_FALSE, sobek.FLAG_TRUE))
must(o.DefineDataProperty("message", rt.ToValue(msg), sobek.FLAG_FALSE, sobek.FLAG_FALSE, sobek.FLAG_TRUE))
return o
}