@@ -21,6 +21,7 @@ import (
21
21
"fmt"
22
22
"regexp"
23
23
"strconv"
24
+ "sync/atomic"
24
25
"testing"
25
26
"time"
26
27
@@ -579,7 +580,7 @@ func TestStreamRowsHeartbeat(t *testing.T) {
579
580
ctx , cancel := context .WithTimeout (context .Background (), 1 * time .Second )
580
581
defer cancel ()
581
582
582
- heartbeatCount := 0
583
+ var heartbeatCount int32
583
584
dataReceived := false
584
585
585
586
var options binlogdatapb.VStreamOptions
@@ -589,9 +590,9 @@ func TestStreamRowsHeartbeat(t *testing.T) {
589
590
590
591
err := engine .StreamRows (ctx , "select * from t1" , nil , func (rows * binlogdatapb.VStreamRowsResponse ) error {
591
592
if rows .Heartbeat {
592
- heartbeatCount ++
593
+ atomic . AddInt32 ( & heartbeatCount , 1 )
593
594
// After receiving at least 3 heartbeats, we can be confident the fix is working
594
- if heartbeatCount >= 3 {
595
+ if atomic . LoadInt32 ( & heartbeatCount ) >= 3 {
595
596
cancel ()
596
597
return nil
597
598
}
@@ -616,7 +617,7 @@ func TestStreamRowsHeartbeat(t *testing.T) {
616
617
// This is the critical test: we should receive multiple heartbeats
617
618
// Without the fix (missing for loop), we would only get 1 heartbeat
618
619
// With the fix, we should get at least 3 heartbeats
619
- if heartbeatCount < 3 {
620
+ if atomic . LoadInt32 ( & heartbeatCount ) < 3 {
620
621
t .Errorf ("expected at least 3 heartbeats, got %d. This indicates the heartbeat goroutine is not running continuously" , heartbeatCount )
621
622
}
622
623
}
0 commit comments