Skip to content

Commit 40b6494

Browse files
author
Tim Middleton
authored
Add session timeout (#34)
* Add session timeout
1 parent 811fd8f commit 40b6494

File tree

8 files changed

+307
-61
lines changed

8 files changed

+307
-61
lines changed

coherence/common.go

Lines changed: 164 additions & 39 deletions
Large diffs are not rendered by default.

coherence/doc.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,18 @@ Refer to the section on [NewSession] for more information on setting up a SSL co
6161
6262
See [SessionOptions] which lists all the options supported by the [Session] API.
6363
64+
# Controlling connection timeouts
65+
66+
Most operations you call require you to supply a [context.Context]. If your context does not contain a deadline,
67+
the operation will wrap your context in a new [context.WithTimeout] using either the default timeout of 30,000 millis or
68+
the value you set using option [coherence.WithSessionTimeout] when you called [NewSession].
69+
70+
For example, to override the default timeout of 30,000 millis with one of 5 seconds for a [Session] you can do the following:
71+
72+
session, err = coherence.NewSession(ctx, coherence.WithSessionTimeout(time.Duration(5) * time.Second))
73+
74+
You can also override the default timeout using the environment variable COHERENCE_SESSION_TIMEOUT.
75+
6476
# Obtaining a NamedMap or NamedCache
6577
6678
Once a session has been created, the [GetNamedMap](session, name, ...options) or [GetNamedCache](session, name, ...options)

coherence/iterator.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,14 @@ func (it *streamedKeyIterator[K, V]) getNextPage() error {
101101
return err
102102
}
103103

104+
newCtx, cancel := it.bc.session.ensureContext(it.ctx)
105+
if cancel != nil {
106+
defer cancel()
107+
}
108+
104109
request := &pb.PageRequest{Scope: it.bc.sessionOpts.Scope, Cache: it.bc.name, Format: it.bc.format, Cookie: it.cookie}
105110

106-
if client, err = it.bc.client.NextKeySetPage(it.ctx, request); err != nil {
111+
if client, err = it.bc.client.NextKeySetPage(newCtx, request); err != nil {
107112
return err
108113
}
109114

@@ -221,9 +226,14 @@ func (it *streamedEntryIterator[K, V]) getNextPage() error {
221226
return err
222227
}
223228

229+
newCtx, cancel := it.bc.session.ensureContext(it.ctx)
230+
if cancel != nil {
231+
defer cancel()
232+
}
233+
224234
request := &pb.PageRequest{Scope: it.bc.sessionOpts.Scope, Cache: it.bc.name, Format: it.bc.format, Cookie: it.cookie}
225235

226-
if client, err = it.bc.client.NextEntrySetPage(it.ctx, request); err != nil {
236+
if client, err = it.bc.client.NextEntrySetPage(newCtx, request); err != nil {
227237
return err
228238
}
229239

coherence/named_cache_client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,7 @@ func newNamedCache[K comparable, V any](session *Session, name string, sOpts *Se
575575
unlocked = true
576576
session.mutex.Unlock()
577577

578-
listener := newNamedCacheReconnectListener[K, V](session, *newCache)
578+
listener := newNamedCacheReconnectListener[K, V](*newCache)
579579
newCache.namedCacheReconnectListener = *listener
580580

581581
// unlock before adding reconnect listener
@@ -591,15 +591,15 @@ type namedCacheReconnectListener[K comparable, V any] struct {
591591
}
592592

593593
// newReconnectSessionListener creates a new namedCacheReconnectListener.
594-
func newNamedCacheReconnectListener[K comparable, V any](session *Session, nc NamedCacheClient[K, V]) *namedCacheReconnectListener[K, V] {
594+
func newNamedCacheReconnectListener[K comparable, V any](nc NamedCacheClient[K, V]) *namedCacheReconnectListener[K, V] {
595595
listener := namedCacheReconnectListener[K, V]{
596596
listener: NewSessionLifecycleListener(),
597597
}
598598

599599
listener.listener.OnReconnected(func(e SessionLifecycleEvent) {
600600
// re-register listeners for the NamedCache
601601
namedMap := convertNamedCacheClient[K, V](&nc)
602-
if err := reRegisterListeners[K, V](session.sessionConnectCtx, &namedMap, &nc.baseClient); err != nil {
602+
if err := reRegisterListeners[K, V](context.Background(), &namedMap, &nc.baseClient); err != nil {
603603
log.Println(err)
604604
}
605605
})

coherence/named_map_client.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -547,8 +547,6 @@ func (nm *NamedMapClient[K, V]) KeySetFilter(ctx context.Context, fltr filters.F
547547
// log.Fatal(err)
548548
// }
549549
//
550-
// iter := namedMap.KeySet(ctx)
551-
//
552550
// ch := namedMap.KeySet(ctx)
553551
// for result := range ch {
554552
// if result.Err != nil {
@@ -792,7 +790,7 @@ func newNamedMap[K comparable, V any](session *Session, name string, sOpts *Sess
792790
unlocked = true
793791
session.mutex.Unlock()
794792

795-
listener := newNamedMapReconnectListener[K, V](session, *newMap)
793+
listener := newNamedMapReconnectListener[K, V](*newMap)
796794
newMap.namedMapReconnectListener = *listener
797795

798796
// unlock before adding reconnect listener
@@ -808,15 +806,15 @@ type namedMapReconnectListener[K comparable, V any] struct {
808806
}
809807

810808
// newReconnectSessionListener creates new namedMapReconnectListener.
811-
func newNamedMapReconnectListener[K comparable, V any](session *Session, nm NamedMapClient[K, V]) *namedMapReconnectListener[K, V] {
809+
func newNamedMapReconnectListener[K comparable, V any](nm NamedMapClient[K, V]) *namedMapReconnectListener[K, V] {
812810
listener := namedMapReconnectListener[K, V]{
813811
listener: NewSessionLifecycleListener(),
814812
}
815813

816814
listener.listener.OnReconnected(func(e SessionLifecycleEvent) {
817815
// re-register listeners for the NamedMap
818816
namedMap := convertNamedMapClient[K, V](&nm)
819-
if err := reRegisterListeners[K, V](session.sessionConnectCtx, &namedMap, &nm.baseClient); err != nil {
817+
if err := reRegisterListeners[K, V](context.Background(), &namedMap, &nm.baseClient); err != nil {
820818
log.Println(err)
821819
}
822820
})

coherence/session.go

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,23 @@ import (
1919
"google.golang.org/grpc/credentials/insecure"
2020
"log"
2121
"os"
22+
"strconv"
2223
"strings"
2324
"sync"
25+
"time"
2426
)
2527

2628
// ErrInvalidFormat indicates that the serialization format can only be JSON.
2729
var ErrInvalidFormat = errors.New("format can only be 'json'")
2830

2931
const (
30-
defaultFormat = "json"
31-
mapOrCacheExists = "the %s %s already exists with different type parameters"
32+
defaultFormat = "json"
33+
mapOrCacheExists = "the %s %s already exists with different type parameters"
34+
defaultSessionTimeout = "30000" // millis
3235
)
3336

34-
// Session provides APIs to create NamedCaches. The NewSession() method creates a
35-
// new instance of a Session. This method also takes a variable number of arguments, called options,
37+
// Session provides APIs to create NamedCaches. The [NewSession] method creates a
38+
// new instance of a [Session]. This method also takes a variable number of arguments, called options,
3639
// that can be passed to configure the Session.
3740
type Session struct {
3841
sessionID uuid.UUID
@@ -60,6 +63,7 @@ type SessionOptions struct {
6063
ClientKeyPath string
6164
CaCertPath string
6265
PlainText bool
66+
Timeout time.Duration
6367
}
6468

6569
// NewSession creates a new Session with the specified sessionOptions.
@@ -94,8 +98,8 @@ type SessionOptions struct {
9498
// export COHERENCE_TLS_CERTS_PATH=/path/to/cert/to/be/added/for/trust
9599
// export COHERENCE_IGNORE_INVALID_CERTS=true // option to ignore self-signed certificates - for testing only. Not to be used in production
96100
//
97-
// Finally, the Close() method can be used to close the Session. Once a Session is closed, no APIs
98-
// on the NamedMap instances should be invoked. If invoked they all will return an error.
101+
// Finally, the Close() method can be used to close the [Session]. Once a [Session] is closed, no APIs
102+
// on the [NamedMap] instances should be invoked. If invoked they will return an error.
99103
// [gRPC Naming]: https://github.com/grpc/grpc/blob/master/doc/naming.md
100104
// [gRPC Proxy Server]: https://docs.oracle.com/en/middleware/standalone/coherence/14.1.1.2206/develop-remote-clients/using-coherence-grpc-server.html
101105
func NewSession(ctx context.Context, options ...func(session *SessionOptions)) (*Session, error) {
@@ -111,7 +115,8 @@ func NewSession(ctx context.Context, options ...func(session *SessionOptions)) (
111115
lifecycleListeners: []*SessionLifecycleListener{},
112116
sessOpts: &SessionOptions{
113117
PlainText: false,
114-
Format: defaultFormat},
118+
Format: defaultFormat,
119+
Timeout: time.Duration(0) * time.Second},
115120
}
116121

117122
if getBoolValueFromEnvVarOrDefault(envSessionDebug, false) {
@@ -135,9 +140,18 @@ func NewSession(ctx context.Context, options ...func(session *SessionOptions)) (
135140
session.sessOpts.Address = getStringValueFromEnvVarOrDefault(envHostName, "localhost:1408")
136141
}
137142

143+
// if no timeout then use the env or default
144+
if session.sessOpts.Timeout == time.Duration(0) {
145+
timeoutString := getStringValueFromEnvVarOrDefault(envSessionTimeout, defaultSessionTimeout)
146+
timeout, err := strconv.ParseInt(timeoutString, 10, 64)
147+
if err != nil || timeout <= 0 {
148+
return nil, fmt.Errorf("invalid value of %s for timeout", timeoutString)
149+
}
150+
session.sessOpts.Timeout = time.Duration(timeout) * time.Millisecond
151+
}
152+
138153
// ensure initial connection
139-
err := session.ensureConnection()
140-
return session, err
154+
return session, session.ensureConnection()
141155
}
142156

143157
// WithAddress returns a function to set the address for session.
@@ -169,6 +183,13 @@ func WithPlainText() func(sessionOptions *SessionOptions) {
169183
}
170184
}
171185

186+
// WithSessionTimeout returns a function to set the session timeout.
187+
func WithSessionTimeout(timeout time.Duration) func(sessionOptions *SessionOptions) {
188+
return func(s *SessionOptions) {
189+
s.Timeout = timeout
190+
}
191+
}
192+
172193
// ID returns the identifier of a session.
173194
func (s *Session) ID() string {
174195
return s.sessionID.String()
@@ -190,6 +211,11 @@ func (s *Session) String() string {
190211
len(s.caches), len(s.maps), s.sessOpts)
191212
}
192213

214+
// GetSessionTimeout returns the session timeout in seconds.
215+
func (s *Session) GetSessionTimeout() time.Duration {
216+
return s.sessOpts.Timeout
217+
}
218+
193219
// ensureConnection ensures a session has a valid connection
194220
func (s *Session) ensureConnection() error {
195221
if s.firstConnectAttempted {
@@ -222,7 +248,12 @@ func (s *Session) ensureConnection() error {
222248
s.mutex.Lock()
223249
locked = true
224250

225-
conn, err := grpc.DialContext(s.sessionConnectCtx, s.sessOpts.Address, s.dialOptions...)
251+
newCtx, cancel := s.ensureContext(s.sessionConnectCtx)
252+
if cancel != nil {
253+
defer cancel()
254+
}
255+
256+
conn, err := grpc.DialContext(newCtx, s.sessOpts.Address, s.dialOptions...)
226257

227258
if err != nil {
228259
log.Printf("could not connect. Reason: %v", err)
@@ -469,8 +500,8 @@ func validateFilePath(file string) error {
469500
// String returns a string representation of SessionOptions.
470501
func (s *SessionOptions) String() string {
471502
var sb = strings.Builder{}
472-
sb.WriteString(fmt.Sprintf("SessionOptions{address=%v, tLSEnabled=%v, scope=%v, format=%v,",
473-
s.Address, s.TLSEnabled, s.Scope, s.Format))
503+
sb.WriteString(fmt.Sprintf("SessionOptions{address=%v, tLSEnabled=%v, scope=%v, format=%v, timeout=%v",
504+
s.Address, s.TLSEnabled, s.Scope, s.Format, s.Timeout))
474505

475506
if s.TLSEnabled {
476507
sb.WriteString(fmt.Sprintf(" clientCertPath=%v, clientKeyPath=%v, caCertPath=%v,",
@@ -490,3 +521,13 @@ func (s *Session) dispatch(eventType SessionLifecycleEventType,
490521
}
491522
}
492523
}
524+
525+
// ensureContext will ensure that the context has deadline and if not will apply the timeout from the
526+
// [SessionOptions].
527+
func (s *Session) ensureContext(ctx context.Context) (context.Context, context.CancelFunc) {
528+
if _, ok := ctx.Deadline(); !ok {
529+
// no deadline set, so wrap the context in a Timeout
530+
return context.WithTimeout(ctx, s.sessOpts.Timeout)
531+
}
532+
return ctx, nil
533+
}

coherence/session_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ package coherence
99
import (
1010
"context"
1111
"github.com/onsi/gomega"
12+
"strconv"
1213
"testing"
14+
"time"
1315
)
1416

1517
func TestSessionValidation(t *testing.T) {
@@ -21,4 +23,15 @@ func TestSessionValidation(t *testing.T) {
2123

2224
_, err = NewSession(ctx, WithFormat("not-json"))
2325
g.Expect(err).To(gomega.Equal(ErrInvalidFormat))
26+
27+
// test default timeout
28+
timeout, _ := strconv.ParseInt(defaultSessionTimeout, 10, 64)
29+
s, err := NewSession(ctx)
30+
g.Expect(err).To(gomega.Not(gomega.HaveOccurred()))
31+
g.Expect(s.sessOpts.Timeout).To(gomega.Equal(time.Duration(timeout) * time.Millisecond))
32+
33+
// test setting a timeout
34+
s, err = NewSession(ctx, WithSessionTimeout(time.Duration(33)*time.Millisecond))
35+
g.Expect(err).To(gomega.Not(gomega.HaveOccurred()))
36+
g.Expect(s.sessOpts.Timeout).To(gomega.Equal(time.Duration(33) * time.Millisecond))
2437
}

test/e2e/standalone/named_map_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,53 @@ func TestBasicCrudOperationsVariousTypes(t *testing.T) {
9999
map[int]string{1: "one", 2: "two", 3: "three"})
100100
}
101101

102+
func TestSessionWithSpecifiedTimeout(t *testing.T) {
103+
var (
104+
g = gomega.NewWithT(t)
105+
err error
106+
session *coherence.Session
107+
)
108+
109+
session, err = GetSession(coherence.WithSessionTimeout(time.Duration(10) * time.Second))
110+
g.Expect(err).ShouldNot(gomega.HaveOccurred())
111+
defer session.Close()
112+
113+
runTimeoutTest(g, session)
114+
}
115+
116+
func TestSessionWithEnvTimeout(t *testing.T) {
117+
var (
118+
g = gomega.NewWithT(t)
119+
err error
120+
session *coherence.Session
121+
)
122+
123+
t.Setenv("COHERENCE_SESSION_TIMEOUT", "10000")
124+
125+
session, err = GetSession()
126+
g.Expect(err).ShouldNot(gomega.HaveOccurred())
127+
defer session.Close()
128+
129+
runTimeoutTest(g, session)
130+
}
131+
132+
func runTimeoutTest(g *gomega.WithT, session *coherence.Session) {
133+
// we should get an error as we should be > default timeout
134+
namedMap := getNewNamedMap[int, string](g, session, "timeout-map")
135+
err := namedMap.Clear(ctx)
136+
g.Expect(err).ShouldNot(gomega.HaveOccurred())
137+
138+
namedCache := getNewNamedCache[int, string](g, session, "timeout-cache")
139+
err = namedCache.Clear(ctx)
140+
g.Expect(err).ShouldNot(gomega.HaveOccurred())
141+
142+
// create a new context with an existing deadline, it should be honored
143+
ctxNew, cancel := context.WithTimeout(ctx, time.Duration(1)*time.Nanosecond)
144+
defer cancel()
145+
err = namedCache.Clear(ctxNew)
146+
g.Expect(err).Should(gomega.HaveOccurred())
147+
}
148+
102149
// TestBasicCrudOperationsVariousTypesWithStructKey tests operations against caches that have keys and values as structs.
103150
func TestBasicCrudOperationsVariousTypesWithStructKey(t *testing.T) {
104151
var (

0 commit comments

Comments
 (0)