forked from jjeffery/stomp
-
Notifications
You must be signed in to change notification settings - Fork 97
/
Copy pathconn_options.go
351 lines (298 loc) · 12 KB
/
conn_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
package stomp
import (
"fmt"
"strings"
"time"
"github.com/go-stomp/stomp/v3/frame"
"github.com/go-stomp/stomp/v3/internal/log"
)
// connOptions is an opaque structure used to collect the options
// for connecting to the other server.
type connOptions struct {
	// FrameCommand is the command used for the connection frame.
	// newConnOptions defaults it to frame.CONNECT.
	FrameCommand string
	// Host is sent as the "host" header when it is not empty.
	Host string
	// ReadTimeout is advertised, in milliseconds, as the second value
	// of the heart-beat header. newConnOptions defaults it to one minute.
	ReadTimeout time.Duration
	// WriteTimeout is advertised, in milliseconds, as the first value
	// of the heart-beat header. newConnOptions defaults it to one minute.
	WriteTimeout time.Duration
	// HeartBeatError defaults to DefaultHeartBeatError.
	HeartBeatError time.Duration
	// MsgSendTimeout defaults to DefaultMsgSendTimeout.
	MsgSendTimeout time.Duration
	// RcvReceiptTimeout defaults to DefaultRcvReceiptTimeout.
	RcvReceiptTimeout time.Duration
	// DisconnectReceiptTimeout defaults to DefaultDisconnectReceiptTimeout.
	DisconnectReceiptTimeout time.Duration
	// UnsubscribeReceiptTimeout defaults to DefaultUnsubscribeReceiptTimeout.
	UnsubscribeReceiptTimeout time.Duration
	// HeartBeatGracePeriodMultiplier defaults to 1.0.
	HeartBeatGracePeriodMultiplier float64
	// Login is sent as the "login" header when Login or Passcode is set.
	Login string
	// Passcode is sent as the "passcode" header when Login or Passcode is set.
	Passcode string
	// AcceptVersions is joined with commas to form the accept-version
	// header. When empty, newConnOptions fills in V10, V11 and V12.
	AcceptVersions []string
	// Header holds custom header entries that NewFrame adds after the
	// standard ones. It stays nil until ConnOpt.Header is used.
	Header *frame.Header
	// ReadChannelCapacity is set by ConnOpt.ReadChannelCapacity;
	// newConnOptions leaves it at zero.
	ReadChannelCapacity int
	// WriteChannelCapacity is set by ConnOpt.WriteChannelCapacity;
	// newConnOptions leaves it at zero.
	WriteChannelCapacity int
	// ReadBufferSize is set by ConnOpt.ReadBufferSize;
	// newConnOptions leaves it at zero.
	ReadBufferSize int
	// WriteBufferSize is set by ConnOpt.WriteBufferSize;
	// newConnOptions leaves it at zero.
	WriteBufferSize int
	// ResponseHeadersCallback is the callback registered through
	// ConnOpt.ResponseHeaders; nil when none was registered.
	ResponseHeadersCallback func(*frame.Header)
	// Logger defaults to log.StdLogger{}.
	Logger Logger
}
// newConnOptions starts from the default connection options and applies
// each function in opts, in order, to conn. It returns the resulting
// options, or nil and the first error reported by an option function.
func newConnOptions(conn *Conn, opts []func(*Conn) error) (*connOptions, error) {
	options := &connOptions{
		FrameCommand:                   frame.CONNECT,
		ReadTimeout:                    time.Minute,
		WriteTimeout:                   time.Minute,
		HeartBeatGracePeriodMultiplier: 1.0,
		HeartBeatError:                 DefaultHeartBeatError,
		MsgSendTimeout:                 DefaultMsgSendTimeout,
		RcvReceiptTimeout:              DefaultRcvReceiptTimeout,
		DisconnectReceiptTimeout:       DefaultDisconnectReceiptTimeout,
		UnsubscribeReceiptTimeout:      DefaultUnsubscribeReceiptTimeout,
		Logger:                         log.StdLogger{},
	}

	// This is a sleight of hand: the options are attached to the Conn
	// only for as long as the option functions run and are detached
	// again on return. Doing it this way leaves room for future options
	// that modify the Conn object itself, should that become desirable.
	conn.options = options
	defer func() { conn.options = nil }()

	for _, apply := range opts {
		// Compatibility with previous versions: nil options are ignored.
		if apply == nil {
			continue
		}
		if err := apply(conn); err != nil {
			return nil, err
		}
	}

	// No version was requested explicitly, so offer V10, V11 and V12.
	if len(options.AcceptVersions) == 0 {
		options.AcceptVersions = []string{string(V10), string(V11), string(V12)}
	}
	return options, nil
}
// NewFrame builds the connection frame described by the options: the
// configured command plus the host, heart-beat, login/passcode and
// accept-version headers, followed by any custom header entries.
// The returned error is always nil.
func (co *connOptions) NewFrame() (*frame.Frame, error) {
	f := frame.New(co.FrameCommand)

	// host: only sent when one was configured
	if co.Host != "" {
		f.Header.Set(frame.Host, co.Host)
	}

	// heart-beat: "<send>,<recv>", both expressed in whole milliseconds
	const unit = time.Millisecond
	heartBeat := fmt.Sprintf("%d,%d", co.WriteTimeout/unit, co.ReadTimeout/unit)
	f.Header.Set(frame.HeartBeat, heartBeat)

	// login, passcode: both headers are sent as soon as either is set
	hasCredentials := co.Login != "" || co.Passcode != ""
	if hasCredentials {
		f.Header.Set(frame.Login, co.Login)
		f.Header.Set(frame.Passcode, co.Passcode)
	}

	// accept-version
	versions := strings.Join(co.AcceptVersions, ",")
	f.Header.Set(frame.AcceptVersion, versions)

	// custom header entries -- note that these do not override
	// header values already set as they are added to the end of
	// the header array
	f.Header.AddHeader(co.Header)

	return f, nil
}
// ConnOpt groups the options for connecting to the STOMP server. They are
// used with the stomp.Dial and stomp.Connect functions, both of which have
// examples. The fields are populated by this file's init function.
var ConnOpt struct {
	// Login is a connect option that allows the calling program to
	// specify the "login" and "passcode" values to send to the STOMP
	// server.
	Login func(login, passcode string) func(*Conn) error

	// Host is a connect option that allows the calling program to
	// specify the value of the "host" header.
	Host func(host string) func(*Conn) error

	// UseStomp is a connect option that specifies that the client
	// should use the "STOMP" command instead of the "CONNECT" command.
	// Note that using "STOMP" is only valid for STOMP version 1.1 and later.
	UseStomp func(*Conn) error

	// AcceptVersion is a connect option that allows the client to
	// specify one or more versions of the STOMP protocol that the
	// client program is prepared to accept. If this option is not
	// specified, the client program will accept any of STOMP versions
	// 1.0, 1.1 or 1.2.
	AcceptVersion func(versions ...Version) func(*Conn) error

	// HeartBeat is a connect option that allows the client to specify
	// the send and receive timeouts for the STOMP heartbeat negotiation mechanism.
	// The sendTimeout parameter specifies the maximum amount of time
	// between the client sending heartbeat notifications to the server.
	// The recvTimeout parameter specifies the minimum amount of time between
	// the client expecting to receive heartbeat notifications from the server.
	// If not specified, this option defaults to one minute for both send and receive
	// timeouts.
	HeartBeat func(sendTimeout, recvTimeout time.Duration) func(*Conn) error

	// HeartBeatError is a connect option that will normally only be specified during
	// testing. It specifies a short time duration that is larger than the amount of time
	// that it will take for a STOMP frame to be transmitted from one station to the other.
	// When not specified, this value defaults to 5 seconds. This value is set to a much
	// shorter time duration during unit testing.
	HeartBeatError func(errorTimeout time.Duration) func(*Conn) error

	// MsgSendTimeout is a connect option that allows the client to specify
	// the timeout for the Conn.Send function.
	// The msgSendTimeout parameter specifies the maximum blocking time for calling
	// the Conn.Send function.
	// If not specified, this option defaults to 10 seconds.
	// A value less than or equal to zero means infinite.
	MsgSendTimeout func(msgSendTimeout time.Duration) func(*Conn) error

	// RcvReceiptTimeout is a connect option that allows the client to specify
	// how long to wait for a receipt in the Conn.Send function. This helps
	// avoid deadlocks. If this is not specified, the default is 30 seconds.
	RcvReceiptTimeout func(rcvReceiptTimeout time.Duration) func(*Conn) error

	// DisconnectReceiptTimeout is a connect option that allows the client to specify
	// how long to wait for a receipt in the Conn.Disconnect function. This helps
	// avoid deadlocks. If this is not specified, the default is 30 seconds.
	DisconnectReceiptTimeout func(disconnectReceiptTimeout time.Duration) func(*Conn) error

	// UnsubscribeReceiptTimeout is a connect option that allows the client to specify
	// how long to wait for a receipt in the Conn.Unsubscribe function. This helps
	// avoid deadlocks. If this is not specified, the default is 30 seconds.
	UnsubscribeReceiptTimeout func(unsubscribeReceiptTimeout time.Duration) func(*Conn) error

	// HeartBeatGracePeriodMultiplier is used to calculate the effective read heart-beat timeout
	// the broker will enforce for each client's connection. The multiplier is applied to
	// the read-timeout interval the client specifies in its CONNECT frame.
	HeartBeatGracePeriodMultiplier func(multiplier float64) func(*Conn) error

	// Header is a connect option that allows the client to specify a custom
	// header entry in the STOMP frame. This connect option can be specified
	// multiple times for multiple custom headers.
	Header func(key, value string) func(*Conn) error

	// ReadChannelCapacity is the number of messages that can be on the read channel at the
	// same time. A high number may affect memory usage while a too low number may lock the
	// system up. Default is set to 20.
	ReadChannelCapacity func(capacity int) func(*Conn) error

	// WriteChannelCapacity is the number of messages that can be on the write channel at the
	// same time. A high number may affect memory usage while a too low number may lock the
	// system up. Default is set to 20.
	WriteChannelCapacity func(capacity int) func(*Conn) error

	// ReadBufferSize specifies the number of bytes that can be used to read the message.
	// A high number may affect memory usage while a too low number may lock the
	// system up. Default is set to 4096.
	ReadBufferSize func(size int) func(*Conn) error

	// WriteBufferSize specifies the number of bytes that can be used to write the message.
	// A high number may affect memory usage while a too low number may lock the
	// system up. Default is set to 4096.
	WriteBufferSize func(size int) func(*Conn) error

	// ResponseHeaders lets you provide a callback function to get the headers from the CONNECT response.
	ResponseHeaders func(func(*frame.Header)) func(*Conn) error

	// Logger lets you provide the logger used by a connection. A nil logger
	// is ignored and the previously configured logger is kept.
	Logger func(logger Logger) func(*Conn) error

	// WithStats enables statistical gathering for reading/writing messages from the server.
	WithStats func() func(*Conn) error
}
func init() {
ConnOpt.Login = func(login, passcode string) func(*Conn) error {
return func(c *Conn) error {
c.options.Login = login
c.options.Passcode = passcode
return nil
}
}
ConnOpt.Host = func(host string) func(*Conn) error {
return func(c *Conn) error {
c.options.Host = host
return nil
}
}
ConnOpt.UseStomp = func(c *Conn) error {
c.options.FrameCommand = frame.STOMP
return nil
}
ConnOpt.AcceptVersion = func(versions ...Version) func(*Conn) error {
return func(c *Conn) error {
for _, version := range versions {
if err := version.CheckSupported(); err != nil {
return err
}
c.options.AcceptVersions = append(c.options.AcceptVersions, string(version))
}
return nil
}
}
ConnOpt.HeartBeat = func(sendTimeout, recvTimeout time.Duration) func(*Conn) error {
return func(c *Conn) error {
c.options.WriteTimeout = sendTimeout
c.options.ReadTimeout = recvTimeout
return nil
}
}
ConnOpt.HeartBeatError = func(errorTimeout time.Duration) func(*Conn) error {
return func(c *Conn) error {
c.options.HeartBeatError = errorTimeout
return nil
}
}
ConnOpt.MsgSendTimeout = func(msgSendTimeout time.Duration) func(*Conn) error {
return func(c *Conn) error {
c.options.MsgSendTimeout = msgSendTimeout
return nil
}
}
ConnOpt.RcvReceiptTimeout = func(rcvReceiptTimeout time.Duration) func(*Conn) error {
return func(c *Conn) error {
c.options.RcvReceiptTimeout = rcvReceiptTimeout
return nil
}
}
ConnOpt.DisconnectReceiptTimeout = func(disconnectReceiptTimeout time.Duration) func(*Conn) error {
return func(c *Conn) error {
c.options.DisconnectReceiptTimeout = disconnectReceiptTimeout
return nil
}
}
ConnOpt.UnsubscribeReceiptTimeout = func(unsubscribeReceiptTimeout time.Duration) func(*Conn) error {
return func(c *Conn) error {
c.options.UnsubscribeReceiptTimeout = unsubscribeReceiptTimeout
return nil
}
}
ConnOpt.HeartBeatGracePeriodMultiplier = func(multiplier float64) func(*Conn) error {
return func(c *Conn) error {
c.options.HeartBeatGracePeriodMultiplier = multiplier
return nil
}
}
ConnOpt.Header = func(key, value string) func(*Conn) error {
return func(c *Conn) error {
if c.options.Header == nil {
c.options.Header = frame.NewHeader(key, value)
} else {
c.options.Header.Add(key, value)
}
return nil
}
}
ConnOpt.ReadChannelCapacity = func(capacity int) func(*Conn) error {
return func(c *Conn) error {
c.options.ReadChannelCapacity = capacity
return nil
}
}
ConnOpt.WriteChannelCapacity = func(capacity int) func(*Conn) error {
return func(c *Conn) error {
c.options.WriteChannelCapacity = capacity
return nil
}
}
ConnOpt.ReadBufferSize = func(size int) func(*Conn) error {
return func(c *Conn) error {
c.options.ReadBufferSize = size
return nil
}
}
ConnOpt.WriteBufferSize = func(size int) func(*Conn) error {
return func(c *Conn) error {
c.options.WriteBufferSize = size
return nil
}
}
ConnOpt.ResponseHeaders = func(callback func(*frame.Header)) func(*Conn) error {
return func(c *Conn) error {
c.options.ResponseHeadersCallback = callback
return nil
}
}
ConnOpt.Logger = func(log Logger) func(*Conn) error {
return func(c *Conn) error {
if log != nil {
c.options.Logger = log
}
return nil
}
}
ConnOpt.WithStats = func() func(*Conn) error {
return func(conn *Conn) error {
conn.statsEnabled = true
return nil
}
}
}