@@ -22,16 +22,21 @@ import (
22
22
)
23
23
24
24
import (
25
+ "github.com/AlexStocks/goext/net"
25
26
"github.com/AlexStocks/goext/sync"
26
27
log "github.com/AlexStocks/log4go"
27
28
"github.com/gorilla/websocket"
29
+ jerrors "github.com/juju/errors"
28
30
)
29
31
30
32
const (
31
33
connInterval = 3e9 // 3s
32
34
connectTimeout = 5e9
33
35
maxTimes = 10
34
- pingPacket = "ping"
36
+ )
37
+
38
+ var (
39
+ connectPingPackage = []byte ("connect-ping" )
35
40
)
36
41
37
42
/////////////////////////////////////////
@@ -126,16 +131,17 @@ func (c *client) dialTCP() Session {
126
131
return nil
127
132
}
128
133
conn , err = net .DialTimeout ("tcp" , c .addr , connectTimeout )
129
- if err == nil && conn . LocalAddr (). String () == conn .RemoteAddr (). String ( ) {
134
+ if err == nil && gxnet . IsSameAddr ( conn . RemoteAddr (), conn .LocalAddr () ) {
130
135
conn .Close ()
131
136
err = errSelfConnect
132
137
}
133
138
if err == nil {
134
139
return newTCPSession (conn , c )
135
140
}
136
141
137
- log .Info ("net.DialTimeout(addr:%s, timeout:%v) = error{%s}" , c .addr , err )
138
- time .Sleep (connInterval )
142
+ log .Info ("net.DialTimeout(addr:%s, timeout:%v) = error{%s}" , c .addr , jerrors .ErrorStack (err ))
143
+ // time.Sleep(connInterval)
144
+ <- wheel .After (connInterval )
139
145
}
140
146
}
141
147
@@ -157,34 +163,36 @@ func (c *client) dialUDP() Session {
157
163
return nil
158
164
}
159
165
conn , err = net .DialUDP ("udp" , localAddr , peerAddr )
160
- if err == nil && conn . LocalAddr (). String () == conn .RemoteAddr (). String ( ) {
166
+ if err == nil && gxnet . IsSameAddr ( conn . RemoteAddr (), conn .LocalAddr () ) {
161
167
conn .Close ()
162
168
err = errSelfConnect
163
169
}
164
170
if err != nil {
165
- log .Warn ("net.DialTimeout(addr:%s, timeout:%v) = error{%s}" , c .addr , err )
166
- time .Sleep (connInterval )
171
+ log .Warn ("net.DialTimeout(addr:%s, timeout:%v) = error{%s}" , c .addr , jerrors .ErrorStack (err ))
172
+ // time.Sleep(connInterval)
173
+ <- wheel .After (connInterval )
167
174
continue
168
175
}
169
176
170
177
// check connection alive by write/read action
171
- copy (buf , []byte (pingPacket ))
172
178
conn .SetWriteDeadline (wheel .Now ().Add (1e9 ))
173
- if length , err = conn .Write (buf [: len ( pingPacket ) ]); err != nil {
179
+ if length , err = conn .Write (connectPingPackage [: ]); err != nil {
174
180
conn .Close ()
175
- log .Warn ("conn.Write(%s) = {length:%d, err:%s}" , pingPacket , length , err )
176
- time .Sleep (connInterval )
181
+ log .Warn ("conn.Write(%s) = {length:%d, err:%s}" , string (connectPingPackage ), length , jerrors .ErrorStack (err ))
182
+ // time.Sleep(connInterval)
183
+ <- wheel .After (connInterval )
177
184
continue
178
185
}
179
186
conn .SetReadDeadline (wheel .Now ().Add (1e9 ))
180
187
length , err = conn .Read (buf )
181
- if netErr , ok := err .(net.Error ); ok && netErr .Timeout () {
188
+ if netErr , ok := jerrors . Cause ( err ) .(net.Error ); ok && netErr .Timeout () {
182
189
err = nil
183
190
}
184
191
if err != nil {
185
- log .Info ("conn{%#v}.Read() = {length:%d, err:%s}" , conn , length , err )
192
+ log .Info ("conn{%#v}.Read() = {length:%d, err:%s}" , conn , length , jerrors . ErrorStack ( err ) )
186
193
conn .Close ()
187
- time .Sleep (connInterval )
194
+ // time.Sleep(connInterval)
195
+ <- wheel .After (connInterval )
188
196
continue
189
197
}
190
198
//if err == nil {
@@ -207,8 +215,8 @@ func (c *client) dialWS() Session {
207
215
return nil
208
216
}
209
217
conn , _ , err = dialer .Dial (c .addr , nil )
210
- log .Info ("websocket.dialer.Dial(addr:%s) = error:%s" , c .addr , err )
211
- if err == nil && conn . LocalAddr (). String () == conn .RemoteAddr (). String ( ) {
218
+ log .Info ("websocket.dialer.Dial(addr:%s) = error:%s" , c .addr , jerrors . ErrorStack ( err ) )
219
+ if err == nil && gxnet . IsSameAddr ( conn . RemoteAddr (), conn .LocalAddr () ) {
212
220
conn .Close ()
213
221
err = errSelfConnect
214
222
}
@@ -221,8 +229,9 @@ func (c *client) dialWS() Session {
221
229
return ss
222
230
}
223
231
224
- log .Info ("websocket.dialer.Dial(addr:%s) = error:%s" , c .addr , err )
225
- time .Sleep (connInterval )
232
+ log .Info ("websocket.dialer.Dial(addr:%s) = error:%s" , c .addr , jerrors .ErrorStack (err ))
233
+ // time.Sleep(connInterval)
234
+ <- wheel .After (connInterval )
226
235
}
227
236
}
228
237
@@ -247,7 +256,7 @@ func (c *client) dialWSS() Session {
247
256
if c .cert != "" {
248
257
certPEMBlock , err := ioutil .ReadFile (c .cert )
249
258
if err != nil {
250
- panic (fmt .Sprintf ("ioutil.ReadFile(cert:%s) = error{%s}" , c .cert , err ))
259
+ panic (fmt .Sprintf ("ioutil.ReadFile(cert:%s) = error{%s}" , c .cert , jerrors . ErrorStack ( err ) ))
251
260
}
252
261
253
262
var cert tls.Certificate
@@ -269,7 +278,7 @@ func (c *client) dialWSS() Session {
269
278
for _ , c := range config .Certificates {
270
279
roots , err = x509 .ParseCertificates (c .Certificate [len (c .Certificate )- 1 ])
271
280
if err != nil {
272
- panic (fmt .Sprintf ("error parsing server's root cert: %s\n " , err ))
281
+ panic (fmt .Sprintf ("error parsing server's root cert: %s\n " , jerrors . ErrorStack ( err ) ))
273
282
}
274
283
for _ , root = range roots {
275
284
certPool .AddCert (root )
@@ -285,7 +294,7 @@ func (c *client) dialWSS() Session {
285
294
return nil
286
295
}
287
296
conn , _ , err = dialer .Dial (c .addr , nil )
288
- if err == nil && conn . LocalAddr (). String () == conn .RemoteAddr (). String ( ) {
297
+ if err == nil && gxnet . IsSameAddr ( conn . RemoteAddr (), conn .LocalAddr () ) {
289
298
conn .Close ()
290
299
err = errSelfConnect
291
300
}
@@ -299,8 +308,9 @@ func (c *client) dialWSS() Session {
299
308
return ss
300
309
}
301
310
302
- log .Info ("websocket.dialer.Dial(addr:%s) = error{%s}" , c .addr , err )
303
- time .Sleep (connInterval )
311
+ log .Info ("websocket.dialer.Dial(addr:%s) = error{%s}" , c .addr , jerrors .ErrorStack (err ))
312
+ // time.Sleep(connInterval)
313
+ <- wheel .After (connInterval )
304
314
}
305
315
}
306
316
@@ -361,11 +371,19 @@ func (c *client) connect() {
361
371
}
362
372
}
363
373
374
+ // there are two methods to keep connection pool. the first approch is like
375
+ // redigo's lazy connection pool(https://github.com/gomodule/redigo/blob/master/redis/pool.go:),
376
+ // in which you should apply testOnBorrow to check alive of the connection.
377
+ // the second way is as follows. @RunEventLoop detects the aliveness of the connection
378
+ // in regular time interval.
379
+ // the active method maybe overburden the cpu slightly.
380
+ // however, you can get a active tcp connection very quickly.
364
381
func (c * client ) RunEventLoop (newSession NewSessionCallback ) {
365
382
c .Lock ()
366
383
c .newSession = newSession
367
384
c .Unlock ()
368
385
386
+ log .Info ("run" )
369
387
c .wg .Add (1 )
370
388
// a for-loop goroutine to make sure the connection is valid
371
389
go func () {
@@ -389,7 +407,8 @@ func (c *client) RunEventLoop(newSession NewSessionCallback) {
389
407
if maxTimes < times {
390
408
times = maxTimes
391
409
}
392
- time .Sleep (time .Duration (int64 (times ) * connInterval ))
410
+ // time.Sleep(time.Duration(int64(times) * connInterval))
411
+ <- wheel .After (time .Duration (int64 (times ) * connInterval ))
393
412
continue
394
413
}
395
414
times = 0
0 commit comments