@@ -14,6 +14,7 @@ import (
1414 "fmt"
1515 "github.com/google/uuid"
1616 "google.golang.org/grpc"
17+ "google.golang.org/grpc/backoff"
1718 "google.golang.org/grpc/connectivity"
1819 "google.golang.org/grpc/credentials"
1920 "google.golang.org/grpc/credentials/insecure"
@@ -197,12 +198,17 @@ func (s *Session) ID() string {
197198
198199// Close closes a connection.
199200func (s * Session ) Close () {
200- s .maps = make (map [string ]interface {}, 0 )
201- s .caches = make (map [string ]interface {}, 0 )
202- err := s .conn .Close ()
203- s .closed = true
204- if err != nil {
205- log .Printf ("Unable to close session %s %v" , s .sessionID , err )
201+ if ! s .closed {
202+ s .maps = make (map [string ]interface {}, 0 )
203+ s .caches = make (map [string ]interface {}, 0 )
204+ err := s .conn .Close ()
205+ s .closed = true
206+ s .dispatch (Closed , func () SessionLifecycleEvent {
207+ return newSessionLifecycleEvent (s , Closed )
208+ })
209+ if err != nil {
210+ log .Printf ("Unable to close session %s %v" , s .sessionID , err )
211+ }
206212 }
207213}
208214
@@ -245,6 +251,17 @@ func (s *Session) ensureConnection() error {
245251 }
246252 s .dialOptions = append (s .dialOptions , tlsOpt )
247253
254+ connOpt := grpc .WithConnectParams (grpc.ConnectParams {
255+ Backoff : backoff.Config {
256+ BaseDelay : 1.0 * time .Second ,
257+ Multiplier : 1.1 ,
258+ Jitter : 0.0 ,
259+ MaxDelay : 3.0 * time .Second ,
260+ },
261+ MinConnectTimeout : 10 * time .Second ,
262+ })
263+ s .dialOptions = append (s .dialOptions , connOpt )
264+
248265 s .mutex .Lock ()
249266 locked = true
250267
@@ -268,10 +285,11 @@ func (s *Session) ensureConnection() error {
268285 // refer: https://grpc.github.io/grpc/core/md_doc_connectivity-semantics-and-api.html
269286 go func (session * Session ) {
270287 var (
271- firstConnect = true
272- connected = false
273- ctx = context .Background ()
274- lastState = session .conn .GetState ()
288+ firstConnect = true
289+ connected = false
290+ ctx = context .Background ()
291+ lastState = session .conn .GetState ()
292+ disconnectTime int64 = 0
275293 )
276294
277295 for {
@@ -285,40 +303,52 @@ func (s *Session) ensureConnection() error {
285303 session .debug ("connection:" , lastState , "=>" , newState )
286304
287305 if newState == connectivity .Shutdown {
288- log .Printf ("closed session %v" , s .sessionID )
289- s .dispatch (Closed , func () SessionLifecycleEvent {
290- return newSessionLifecycleEvent (session , Closed )
291- })
292- session .closed = true
306+ log .Printf ("closed session %v" , session .sessionID )
307+ session .Close ()
293308 return
294309 }
295310
296311 if newState == connectivity .Ready {
297312 if ! firstConnect && ! connected {
313+ disconnectTime = 0
298314 log .Printf ("session: %s re-connected to address %s" , session .sessionID , session .sessOpts .Address )
299- s .dispatch (Reconnected , func () SessionLifecycleEvent {
315+ session .dispatch (Reconnected , func () SessionLifecycleEvent {
300316 return newSessionLifecycleEvent (session , Reconnected )
301317 })
302318 session .closed = false
303319 connected = true
304320 } else if firstConnect && ! connected {
305321 firstConnect = false
306322 connected = true
307- s .hasConnected = true
323+ session .hasConnected = true
308324 log .Printf ("session: %s connected to address %s" , session .sessionID , session .sessOpts .Address )
309- s .dispatch (Connected , func () SessionLifecycleEvent {
325+ session .dispatch (Connected , func () SessionLifecycleEvent {
310326 return newSessionLifecycleEvent (session , Connected )
311327 })
312328 }
313329 } else {
314330 if connected {
331+ disconnectTime = - 1
315332 log .Printf ("session: %s disconnected from address %s" , session .sessionID , session .sessOpts .Address )
316- s .dispatch (Disconnected , func () SessionLifecycleEvent {
333+ session .dispatch (Disconnected , func () SessionLifecycleEvent {
317334 return newSessionLifecycleEvent (session , Disconnected )
318335 })
319336 connected = false
320337 }
321338
339+ if disconnectTime != 0 {
340+ if disconnectTime == - 1 {
341+ disconnectTime = time .Now ().UnixMilli ()
342+ } else {
343+ waited := time .Now ().UnixMilli () - disconnectTime
344+ if waited >= session .GetSessionTimeout ().Milliseconds () {
345+ log .Printf ("session: %s unable to reconnect within [%s]. Closing session." , session .sessionID , session .GetSessionTimeout ().String ())
346+ session .Close ()
347+ return
348+ }
349+ }
350+ }
351+
322352 // trigger a reconnection on state change
323353 if newState != connectivity .Connecting {
324354 conn .Connect ()
0 commit comments