From c79d23f82b479e6ce7cc1f91a18695da919dd5c5 Mon Sep 17 00:00:00 2001 From: shady Date: Mon, 27 Feb 2017 16:54:11 +0800 Subject: [PATCH] drainer: make sure to sync binlog in ascending commitTS order (#123) * drainer: make sure to sync binlog in ascending commitTS order --- drainer/executor.go | 5 +++++ drainer/pump.go | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/drainer/executor.go b/drainer/executor.go index feb37cd58..ae7c7688e 100644 --- a/drainer/executor.go +++ b/drainer/executor.go @@ -478,10 +478,15 @@ func (d *Executor) run(b *binlogItem) error { go d.sync(d.toDBs[i], d.jobCh[i]) } + maxCommitTS := d.initCommitTS for { binlog := b.binlog commitTS := binlog.GetCommitTs() jobID := binlog.GetDdlJobId() + if commitTS <= maxCommitTS { + continue + } + maxCommitTS = commitTS if jobID == 0 { preWriteValue := binlog.GetPrewriteValue() diff --git a/drainer/pump.go b/drainer/pump.go index 9f1ce7240..30eb416ea 100644 --- a/drainer/pump.go +++ b/drainer/pump.go @@ -182,7 +182,7 @@ func (p *Pump) mustFindCommitBinlog(t *tikv.LockResolver, startTS int64) { default: } - b, ok := p.getPrewriteBinlogEntity(startTS) + b, ok := p.getPrewriteBinlogEntity(startTS) if ok { time.Sleep(waitTime) // check again after sleep a moment