diff --git a/pkg/syncer/server.go b/pkg/syncer/server.go index 549ef1da619..d7daefc6f5b 100644 --- a/pkg/syncer/server.go +++ b/pkg/syncer/server.go @@ -80,12 +80,11 @@ type RegionSyncer struct { clientCtx context.Context clientCancel context.CancelFunc } - broadcastDone chan struct{} - server Server - wg sync.WaitGroup - history *historyBuffer - limit *ratelimit.RateLimiter - tlsConfig *grpcutil.TLSConfig + server Server + wg sync.WaitGroup + history *historyBuffer + limit *ratelimit.RateLimiter + tlsConfig *grpcutil.TLSConfig // status when as client streamingRunning atomic.Bool } @@ -98,11 +97,10 @@ func NewRegionSyncer(s Server) *RegionSyncer { return nil } syncer := &RegionSyncer{ - server: s, - history: newHistoryBuffer(defaultHistoryBufferSize, regionStorage.(kv.Base)), - limit: ratelimit.NewRateLimiter(defaultBucketRate, defaultBucketCapacity), - tlsConfig: s.GetTLSConfig(), - broadcastDone: make(chan struct{}, 1), + server: s, + history: newHistoryBuffer(defaultHistoryBufferSize, regionStorage.(kv.Base)), + limit: ratelimit.NewRateLimiter(defaultBucketRate, defaultBucketCapacity), + tlsConfig: s.GetTLSConfig(), } syncer.mu.streams = make(map[string]ServerStream) return syncer @@ -348,6 +346,7 @@ func (s *RegionSyncer) bindStream(name string, stream ServerStream) { } func (s *RegionSyncer) broadcast(ctx context.Context, regions *pdpb.SyncRegionResponse) { + broadcastDone := make(chan struct{}, 1) go func() { defer logutil.LogPanic() var failed []string @@ -368,10 +367,10 @@ func (s *RegionSyncer) broadcast(ctx context.Context, regions *pdpb.SyncRegionRe } s.mu.Unlock() } - s.broadcastDone <- struct{}{} + close(broadcastDone) }() select { - case <-s.broadcastDone: + case <-broadcastDone: case <-ctx.Done(): } }