Skip to content

Commit d776c7e

Browse files
author
Tim Middleton
authored
Fix race conditions (#51)
* Fix race conditions * updates
1 parent e4287ca commit d776c7e

File tree

5 files changed

+15
-11
lines changed

5 files changed

+15
-11
lines changed

coherence/event.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func newEventEmitter[L comparable, E any]() *eventEmitter[L, E] {
100100
}
101101
}
102102

103-
// on register a callback to be notified when an event associated with
103+
// on registers a callback to be notified when an event associated with
104104
// the specified label is raised.
105105
func (ee *eventEmitter[L, E]) on(label L, callback func(E)) {
106106
cbs, present := ee.callbacks[label]

coherence/session.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,17 +285,22 @@ func (s *Session) ID() string {
285285

286286
// Close closes a connection.
287287
func (s *Session) Close() {
288+
s.mutex.Lock()
288289
if !s.closed {
289290
s.maps = make(map[string]interface{}, 0)
290291
s.caches = make(map[string]interface{}, 0)
291292
err := s.conn.Close()
292293
s.closed = true
294+
295+
s.mutex.Unlock()
293296
s.dispatch(Closed, func() SessionLifecycleEvent {
294297
return newSessionLifecycleEvent(s, Closed)
295298
})
296299
if err != nil {
297-
log.Printf("Unable to close session %s %v", s.sessionID, err)
300+
log.Printf("unable to close session %s %v", s.sessionID, err)
298301
}
302+
} else {
303+
defer s.mutex.Unlock()
299304
}
300305
}
301306

@@ -697,8 +702,7 @@ func (s *SessionOptions) String() string {
697702
return sb.String()
698703
}
699704

700-
func (s *Session) dispatch(eventType SessionLifecycleEventType,
701-
creator func() SessionLifecycleEvent) {
705+
func (s *Session) dispatch(eventType SessionLifecycleEventType, creator func() SessionLifecycleEvent) {
702706
if len(s.lifecycleListeners) > 0 {
703707
event := creator()
704708
for _, l := range s.lifecycleListeners {

examples/basic/expiry/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,5 @@ func main() {
6060
if value, err = namedCache.Get(ctx, 1); err != nil {
6161
panic(err)
6262
}
63-
fmt.Printf("Value for key 1 is %v, should be nil pointer as entry longer exists\n", value)
63+
fmt.Printf("Value for key 1 is %v, should be nil pointer as entry no longer exists\n", value)
6464
}

examples/basic/expiry_cache/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func main() {
5353
if value, err = namedCache.Get(ctx, 1); err != nil {
5454
panic(err)
5555
}
56-
fmt.Printf("Value for key 1 is %v, should be nil pointer as entry longer exists\n", value)
56+
fmt.Printf("Value for key 1 is %v, should be nil pointer as entry no longer exists\n", value)
5757

5858
// If we do call PutWithExpiry, this expiry value will override the default
5959
fmt.Println("Issue PutWithExpiry key 1, value \"one\" with expiry 10 seconds")

test/e2e/standalone/event_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,13 @@ func TestMapAndLifecycleEventsAll(t *testing.T) {
104104
runBasicTests(g, namedMap, namedMap.Name(), &expected, -1)
105105
}
106106

107+
// TestEventDisconnect tests to ensure that if we get a disconnect, then we can
107108
func TestEventDisconnect(t *testing.T) {
108109
t.Setenv("COHERENCE_SESSION_DEBUG", "true")
109110
//g, session := initTest(t)
110-
g, session := initTest(t, coherence.WithDisconnectTimeout(time.Duration(130000)*time.Millisecond))
111+
g, session := initTest(t,
112+
coherence.WithDisconnectTimeout(time.Duration(130000)*time.Millisecond),
113+
coherence.WithReadyTimeout(time.Duration(70)*time.Second))
111114
defer session.Close()
112115

113116
namedCache := GetNamedCache[string, string](g, session, "test-reconnect-cache")
@@ -119,7 +122,7 @@ func TestEventDisconnect(t *testing.T) {
119122
}
120123

121124
// TestEventDisconnectWithReadyTimeoutDelay tests that the ready timeout is honoured,
122-
// as we have stopped the gRPC proxy.
125+
// as we have stopped the gRPC proxy before the test runs.
123126
func TestEventDisconnectWithReadyTimeoutDelay(t *testing.T) {
124127
t.Setenv("COHERENCE_SESSION_DEBUG", "true")
125128

@@ -275,9 +278,6 @@ func RunTestReconnect(g *gomega.WithT, namedMap coherence.NamedMap[string, strin
275278
fmt.Println("Issue stop of $GRPC:GrpcProxy")
276279
_, err = IssuePostRequest("http://127.0.0.1:30000/management/coherence/cluster/services/$GRPC:GrpcProxy/members/1/stop")
277280
g.Expect(err).ShouldNot(gomega.HaveOccurred())
278-
279-
// sleep for 70 seconds to give the shutdown time to take effect
280-
Sleep(70)
281281
}
282282

283283
// add another 'additional' mutations

0 commit comments

Comments
 (0)