diff --git a/drainer/executor.go b/drainer/executor.go index feb37cd58..ae7c7688e 100644 --- a/drainer/executor.go +++ b/drainer/executor.go @@ -478,10 +478,15 @@ func (d *Executor) run(b *binlogItem) error { go d.sync(d.toDBs[i], d.jobCh[i]) } + maxCommitTS := d.initCommitTS for { binlog := b.binlog commitTS := binlog.GetCommitTs() jobID := binlog.GetDdlJobId() + if commitTS <= maxCommitTS { + continue + } + maxCommitTS = commitTS if jobID == 0 { preWriteValue := binlog.GetPrewriteValue() diff --git a/drainer/pump.go b/drainer/pump.go index 9f1ce7240..30eb416ea 100644 --- a/drainer/pump.go +++ b/drainer/pump.go @@ -182,7 +182,7 @@ func (p *Pump) mustFindCommitBinlog(t *tikv.LockResolver, startTS int64) { default: } - b, ok := p.getPrewriteBinlogEntity(startTS) + b, ok := p.getPrewriteBinlogEntity(startTS) if ok { time.Sleep(waitTime) // check again after sleep a moment