@@ -30,9 +30,11 @@ import (
3030var ErrInvalidFormat = errors .New ("format can only be 'json'" )
3131
3232const (
33- defaultFormat = "json"
34- mapOrCacheExists = "the %s %s already exists with different type parameters"
35- defaultSessionTimeout = "30000" // millis
33+ defaultFormat = "json"
34+ mapOrCacheExists = "the %s %s already exists with different type parameters"
35+ defaultRequestTimeout = "30000" // millis
36+ defaultDisconnectTimeout = "30000" // millis
37+ defaultReadyTimeout = "0" // millis
3638)
3739
3840// Session provides APIs to create NamedCaches. The [NewSession] method creates a
@@ -56,15 +58,17 @@ type Session struct {
5658
5759// SessionOptions holds the session attributes like host, port, tls attributes etc.
5860type SessionOptions struct {
59- Address string
60- TLSEnabled bool
61- Scope string
62- Format string
63- ClientCertPath string
64- ClientKeyPath string
65- CaCertPath string
66- PlainText bool
67- Timeout time.Duration
61+ Address string
62+ TLSEnabled bool
63+ Scope string
64+ Format string
65+ ClientCertPath string
66+ ClientKeyPath string
67+ CaCertPath string
68+ PlainText bool
69+ RequestTimeout time.Duration
70+ DisconnectTimeout time.Duration
71+ ReadyTimeout time.Duration
6872}
6973
7074// NewSession creates a new Session with the specified sessionOptions.
@@ -115,9 +119,11 @@ func NewSession(ctx context.Context, options ...func(session *SessionOptions)) (
115119 caches : make (map [string ]interface {}, 0 ),
116120 lifecycleListeners : []* SessionLifecycleListener {},
117121 sessOpts : & SessionOptions {
118- PlainText : false ,
119- Format : defaultFormat ,
120- Timeout : time .Duration (0 ) * time .Second },
122+ PlainText : false ,
123+ Format : defaultFormat ,
124+ RequestTimeout : time .Duration (0 ) * time .Second ,
125+ ReadyTimeout : time .Duration (0 ) * time .Second ,
126+ DisconnectTimeout : time .Duration (0 ) * time .Second },
121127 }
122128
123129 if getBoolValueFromEnvVarOrDefault (envSessionDebug , false ) {
@@ -141,20 +147,46 @@ func NewSession(ctx context.Context, options ...func(session *SessionOptions)) (
141147 session .sessOpts .Address = getStringValueFromEnvVarOrDefault (envHostName , "localhost:1408" )
142148 }
143149
144- // if no timeout then use the env or default
145- if session .sessOpts .Timeout == time .Duration (0 ) {
146- timeoutString := getStringValueFromEnvVarOrDefault (envSessionTimeout , defaultSessionTimeout )
147- timeout , err := strconv .ParseInt (timeoutString , 10 , 64 )
148- if err != nil || timeout <= 0 {
149- return nil , fmt .Errorf ("invalid value of %s for timeout" , timeoutString )
150+ // if no request timeout then use the env or default
151+ if session .sessOpts .RequestTimeout == time .Duration (0 ) {
152+ timeout , err := getTimeoutValue (envRequestTimeout , defaultRequestTimeout , "request timeout" )
153+ if err != nil {
154+ return nil , err
150155 }
151- session .sessOpts .Timeout = time .Duration (timeout ) * time .Millisecond
156+ session .sessOpts .RequestTimeout = timeout
157+ }
158+
159+ // if no disconnect timeout then use the env or default
160+ if session .sessOpts .DisconnectTimeout == time .Duration (0 ) {
161+ timeout , err := getTimeoutValue (envDisconnectTimeout , defaultDisconnectTimeout , "disconnect timeout" )
162+ if err != nil {
163+ return nil , err
164+ }
165+ session .sessOpts .DisconnectTimeout = timeout
166+ }
167+
168+ // if no ready timeout then use the env or default
169+ if session .sessOpts .ReadyTimeout == time .Duration (0 ) {
170+ timeout , err := getTimeoutValue (envReadyTimeout , defaultReadyTimeout , "ready timeout" )
171+ if err != nil {
172+ return nil , err
173+ }
174+ session .sessOpts .ReadyTimeout = timeout
152175 }
153176
154177 // ensure initial connection
155178 return session , session .ensureConnection ()
156179}
157180
181+ func getTimeoutValue (envVar , defaultValue , description string ) (time.Duration , error ) {
182+ timeoutString := getStringValueFromEnvVarOrDefault (envVar , defaultValue )
183+ timeout , err := strconv .ParseInt (timeoutString , 10 , 64 )
184+ if err != nil || timeout < 0 {
185+ return 0 , fmt .Errorf ("invalid value of %s for %s" , timeoutString , description )
186+ }
187+ return time .Duration (timeout ) * time .Millisecond , nil
188+ }
189+
158190// WithAddress returns a function to set the address for session.
159191func WithAddress (host string ) func (sessionOptions * SessionOptions ) {
160192 return func (s * SessionOptions ) {
@@ -184,10 +216,27 @@ func WithPlainText() func(sessionOptions *SessionOptions) {
184216 }
185217}
186218
187- // WithSessionTimeout returns a function to set the session timeout.
188- func WithSessionTimeout (timeout time.Duration ) func (sessionOptions * SessionOptions ) {
219+ // WithRequestTimeout returns a function to set the request timeout in millis.
220+ func WithRequestTimeout (timeout time.Duration ) func (sessionOptions * SessionOptions ) {
221+ return func (s * SessionOptions ) {
222+ s .RequestTimeout = timeout
223+ }
224+ }
225+
226+ // WithDisconnectTimeout returns a function to set the maximum amount of time, in millis, a [Session]
227+ // may remain in a disconnected state without successfully reconnecting.
228+ func WithDisconnectTimeout (timeout time.Duration ) func (sessionOptions * SessionOptions ) {
189229 return func (s * SessionOptions ) {
190- s .Timeout = timeout
230+ s .DisconnectTimeout = timeout
231+ }
232+ }
233+
234+ // WithReadyTimeout returns a function to set the maximum amount of time an [NamedMap] or [NamedCache]
235+ // operations may wait for the underlying gRPC channel to be ready. This is independent of the request
236+ // timeout which sets a deadline on how long the call may take after being dispatched.
237+ func WithReadyTimeout (timeout time.Duration ) func (sessionOptions * SessionOptions ) {
238+ return func (s * SessionOptions ) {
239+ s .ReadyTimeout = timeout
191240 }
192241}
193242
@@ -217,16 +266,30 @@ func (s *Session) String() string {
217266 len (s .caches ), len (s .maps ), s .sessOpts )
218267}
219268
220- // GetSessionTimeout returns the session timeout in seconds.
221- func (s * Session ) GetSessionTimeout () time.Duration {
222- return s .sessOpts .Timeout
269+ // GetRequestTimeout returns the session timeout in millis.
270+ func (s * Session ) GetRequestTimeout () time.Duration {
271+ return s .sessOpts .RequestTimeout
272+ }
273+
274+ // GetDisconnectTimeout returns the session disconnect timeout in millis.
275+ func (s * Session ) GetDisconnectTimeout () time.Duration {
276+ return s .sessOpts .DisconnectTimeout
277+ }
278+
279+ // GetReadyTimeout returns the session disconnect timeout in millis.
280+ func (s * Session ) GetReadyTimeout () time.Duration {
281+ return s .sessOpts .ReadyTimeout
223282}
224283
225284// ensureConnection ensures a session has a valid connection
226285func (s * Session ) ensureConnection () error {
227286 if s .firstConnectAttempted {
228287 // We have previously tried to connect so check that the connect state is connected
229288 if s .conn .GetState () != connectivity .Ready {
289+ // if the readyTime is set, and we are not connected then block and wait for connection
290+ if s .GetReadyTimeout () != 0 {
291+ return waitForReady (s )
292+ }
230293 s .debug (fmt .Sprintf ("session: %s attempting connection to address %s" , s .sessionID , s .sessOpts .Address ))
231294 s .conn .Connect ()
232295 return nil
@@ -285,11 +348,11 @@ func (s *Session) ensureConnection() error {
285348 // refer: https://grpc.github.io/grpc/core/md_doc_connectivity-semantics-and-api.html
286349 go func (session * Session ) {
287350 var (
288- firstConnect = true
289- connected = false
290- ctx = context .Background ()
291- lastState = session .conn .GetState ()
292- disconnectTime int64 = 0
351+ firstConnect = true
352+ connected = false
353+ ctx = context .Background ()
354+ lastState = session .conn .GetState ()
355+ disconnectTime int64
293356 )
294357
295358 for {
@@ -341,8 +404,9 @@ func (s *Session) ensureConnection() error {
341404 disconnectTime = time .Now ().UnixMilli ()
342405 } else {
343406 waited := time .Now ().UnixMilli () - disconnectTime
344- if waited >= session .GetSessionTimeout ().Milliseconds () {
345- log .Printf ("session: %s unable to reconnect within [%s]. Closing session." , session .sessionID , session .GetSessionTimeout ().String ())
407+ if waited >= session .GetDisconnectTimeout ().Milliseconds () {
408+ log .Printf ("session: %s unable to reconnect within disconnect timeout of [%s]. Closing session." ,
409+ session .sessionID , session .GetDisconnectTimeout ().String ())
346410 session .Close ()
347411 return
348412 }
@@ -361,6 +425,38 @@ func (s *Session) ensureConnection() error {
361425 return nil
362426}
363427
428+ // waitForReady waits until the connection is ready up to the ready session timeout and will
429+ // return nil if the session was connected, otherwise an error is returned.
430+ // We intentionally do no use the gRPC WaitForReady as this can cause a race condition in the session
431+ // events code.
432+ func waitForReady (s * Session ) error {
433+ var (
434+ readyTimeout = s .GetReadyTimeout ()
435+ messageLogged = false
436+ )
437+
438+ // try to connect up until timeout, then throw err if not available
439+ timeout := time .Now ().Add (readyTimeout )
440+ for {
441+ if time .Now ().After (timeout ) {
442+ return fmt .Errorf ("unable to connect to %s after ready timeout of %v" , s .sessOpts .Address , readyTimeout )
443+ }
444+
445+ s .conn .Connect ()
446+
447+ time .Sleep (time .Duration (250 ) * time .Millisecond )
448+ state := s .conn .GetState ()
449+
450+ if state == connectivity .Ready {
451+ return nil
452+ }
453+ if ! messageLogged {
454+ log .Printf ("State is %v, waiting until ready timeout of %v for valid connection" , state , readyTimeout )
455+ messageLogged = true
456+ }
457+ }
458+ }
459+
364460// GetOptions returns the options that were passed during this session creation.
365461func (s * Session ) GetOptions () * SessionOptions {
366462 return s .sessOpts
@@ -530,8 +626,8 @@ func validateFilePath(file string) error {
530626// String returns a string representation of SessionOptions.
531627func (s * SessionOptions ) String () string {
532628 var sb = strings.Builder {}
533- sb .WriteString (fmt .Sprintf ("SessionOptions{address=%v, tLSEnabled=%v, scope=%v, format=%v, timeout=%v" ,
534- s .Address , s .TLSEnabled , s .Scope , s .Format , s .Timeout ))
629+ sb .WriteString (fmt .Sprintf ("SessionOptions{address=%v, tLSEnabled=%v, scope=%v, format=%v, request timeout=%v, disconnect timeout=%v, ready timeout=%v" ,
630+ s .Address , s .TLSEnabled , s .Scope , s .Format , s .RequestTimeout , s . DisconnectTimeout , s . ReadyTimeout ))
535631
536632 if s .TLSEnabled {
537633 sb .WriteString (fmt .Sprintf (" clientCertPath=%v, clientKeyPath=%v, caCertPath=%v," ,
@@ -556,8 +652,8 @@ func (s *Session) dispatch(eventType SessionLifecycleEventType,
556652// [SessionOptions].
557653func (s * Session ) ensureContext (ctx context.Context ) (context.Context , context.CancelFunc ) {
558654 if _ , ok := ctx .Deadline (); ! ok {
559- // no deadline set, so wrap the context in a Timeout
560- return context .WithTimeout (ctx , s .sessOpts .Timeout )
655+ // no deadline set, so wrap the context in a RequestTimeout
656+ return context .WithTimeout (ctx , s .sessOpts .RequestTimeout )
561657 }
562658 return ctx , nil
563659}
0 commit comments