Skip to content

Commit ed5ab2c

Browse files
Fix test synchronization and remove time.Sleep dependencies
- Add testOnlyInitialReadyDone channel for proper test synchronization - Signal when monitoring goroutine processes initial READY state - Tests wait for this signal instead of using time.Sleep - All synchronization now uses channels/callbacks - no arbitrary sleeps - Tests pass consistently with race detector Addresses review feedback about removing time.Sleep for state transitions.
1 parent 943240d commit ed5ab2c

File tree

2 files changed

+38
-56
lines changed

2 files changed

+38
-56
lines changed

balancer/rls/control_channel.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,18 +63,22 @@ type controlChannel struct {
6363
connectivityStateCh *buffer.Unbounded
6464
unsubscribe func()
6565
monitorDoneCh chan struct{}
66+
// testOnlyInitialReadyDone is closed when the monitoring goroutine
67+
// processes the initial READY state. Only used in tests.
68+
testOnlyInitialReadyDone chan struct{}
6669
}
6770

6871
// newControlChannel creates a controlChannel to rlsServerName and uses
6972
// serviceConfig, if non-empty, as the default service config for the underlying
7073
// gRPC channel.
7174
func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) {
7275
ctrlCh := &controlChannel{
73-
rpcTimeout: rpcTimeout,
74-
backToReadyFunc: backToReadyFunc,
75-
throttler: newAdaptiveThrottler(),
76-
connectivityStateCh: buffer.NewUnbounded(),
77-
monitorDoneCh: make(chan struct{}),
76+
rpcTimeout: rpcTimeout,
77+
backToReadyFunc: backToReadyFunc,
78+
throttler: newAdaptiveThrottler(),
79+
connectivityStateCh: buffer.NewUnbounded(),
80+
monitorDoneCh: make(chan struct{}),
81+
testOnlyInitialReadyDone: make(chan struct{}),
7882
}
7983
ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh))
8084

@@ -187,6 +191,9 @@ func (cc *controlChannel) monitorConnectivityState() {
187191
cc.connectivityStateCh.Load()
188192
cc.logger.Infof("Connectivity state is READY")
189193

194+
// Signal tests that initial READY has been processed
195+
close(cc.testOnlyInitialReadyDone)
196+
190197
// Track whether we've seen TRANSIENT_FAILURE since the last READY state.
191198
// We only want to reset backoff when recovering from an actual failure,
192199
// not when transitioning through benign states like IDLE.

balancer/rls/control_channel_test.go

Lines changed: 26 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -520,17 +520,14 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) {
520520
// Setup callback to count invocations
521521
var mu sync.Mutex
522522
var callbackCount int
523-
// Buffered channel to collect callback invocations without blocking
524-
callbackInvoked := make(chan struct{}, tt.wantCallbackCount+5)
523+
// Buffered channel large enough to never block
524+
callbackInvoked := make(chan struct{}, 100)
525525
callback := func() {
526526
mu.Lock()
527527
callbackCount++
528528
mu.Unlock()
529-
// Non-blocking send - if channel is full, we still counted it
530-
select {
531-
case callbackInvoked <- struct{}{}:
532-
default:
533-
}
529+
// Send to channel - should never block with large buffer
530+
callbackInvoked <- struct{}{}
534531
}
535532

536533
// Create control channel
@@ -540,57 +537,35 @@ func (s) TestControlChannelConnectivityStateTransitions(t *testing.T) {
540537
}
541538
defer ctrlCh.close()
542539

543-
// Wait for initial READY state using state change notifications
544-
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
545-
defer cancel()
546-
547-
readyCh := make(chan struct{})
548-
go func() {
549-
for {
550-
state := ctrlCh.cc.GetState()
551-
if state == connectivity.Ready {
552-
close(readyCh)
553-
return
554-
}
555-
if !ctrlCh.cc.WaitForStateChange(ctx, state) {
556-
return
557-
}
558-
}
559-
}()
560-
540+
// Wait for the monitoring goroutine to process the initial READY state
541+
// before injecting test states. This ensures our injected states are
542+
// processed in the main monitoring loop, not consumed during initialization.
561543
select {
562-
case <-readyCh:
563-
// Initial READY state achieved
564-
case <-ctx.Done():
565-
t.Fatal("Timeout waiting for initial READY state")
544+
case <-ctrlCh.testOnlyInitialReadyDone:
545+
// Initial READY processed by monitoring goroutine
546+
case <-time.After(defaultTestTimeout):
547+
t.Fatal("Timeout waiting for monitoring goroutine to process initial READY state")
566548
}
567549

568-
// Process states sequentially, waiting for callbacks when expected
569-
seenTransientFailure := false
570-
expectedCallbacks := 0
571-
550+
// Inject all test states
572551
for _, state := range tt.states {
573-
// Inject the state
574552
ctrlCh.OnMessage(state)
553+
}
575554

576-
// Track if we're in a failure state
577-
if state == connectivity.TransientFailure {
578-
seenTransientFailure = true
579-
}
555+
// Wait for all expected callbacks with timeout
556+
callbackTimeout := time.NewTimer(defaultTestTimeout)
557+
defer callbackTimeout.Stop()
580558

581-
// If transitioning to READY after a failure, wait for callback
582-
if state == connectivity.Ready && seenTransientFailure {
583-
expectedCallbacks++
584-
select {
585-
case <-callbackInvoked:
586-
// Callback received as expected
587-
seenTransientFailure = false
588-
case <-time.After(defaultTestTimeout):
589-
mu.Lock()
590-
got := callbackCount
591-
mu.Unlock()
592-
t.Fatalf("Timeout waiting for callback %d/%d after TRANSIENT_FAILURE→READY (got %d callbacks so far)", expectedCallbacks, tt.wantCallbackCount, got)
593-
}
559+
receivedCallbacks := 0
560+
for receivedCallbacks < tt.wantCallbackCount {
561+
select {
562+
case <-callbackInvoked:
563+
receivedCallbacks++
564+
case <-callbackTimeout.C:
565+
mu.Lock()
566+
got := callbackCount
567+
mu.Unlock()
568+
t.Fatalf("Timeout waiting for callbacks: expected %d, received %d via channel, callback count is %d", tt.wantCallbackCount, receivedCallbacks, got)
594569
}
595570
}
596571

0 commit comments

Comments
 (0)