diff --git a/drainer/collector.go b/drainer/collector.go index 3487d0968..9cdd2ca71 100644 --- a/drainer/collector.go +++ b/drainer/collector.go @@ -209,7 +209,7 @@ func (c *Collector) publish(ctx context.Context, upper, lower int64) { if lower > oldLower { c.window.SaveLower(lower) - c.publishBinlogs(ctx, lower) + c.publishBinlogs(ctx, oldLower, lower) windowGauge.WithLabelValues("lower").Set(float64(lower)) } if upper > oldUpper { @@ -252,7 +252,8 @@ func (c *Collector) LoadHistoryDDLJobs() ([]*model.Job, error) { return jobs, nil } -func (c *Collector) publishBinlogs(ctx context.Context, lower int64) { +// publishBinlogs collects binlogs whose commitTS are in (minTS, maxTS], then publish them in ascending commitTS order +func (c *Collector) publishBinlogs(ctx context.Context, minTS, maxTS int64) { // multiple ways sort: // 1. get multiple way sorted binlogs // 2. use heap to merge sort @@ -260,7 +261,7 @@ func (c *Collector) publishBinlogs(ctx context.Context, lower int64) { bss := make(map[string]binlogItems) binlogOffsets := make(map[string]int) for id, p := range c.pumps { - bs := p.collectBinlogs(lower) + bs := p.collectBinlogs(minTS, maxTS) if bs.Len() > 0 { bss[id] = bs binlogOffsets[id] = 1 diff --git a/drainer/executor.go b/drainer/executor.go index ae7c7688e..6b21b12d6 100644 --- a/drainer/executor.go +++ b/drainer/executor.go @@ -108,7 +108,7 @@ func (d *Executor) prepare(jobs []*model.Job) (*binlogItem, error) { binlog := b.binlog commitTS := binlog.GetCommitTs() jobID := binlog.GetDdlJobId() - if commitTS < d.initCommitTS { + if commitTS <= d.initCommitTS { if jobID > 0 { latestSchemaVersion = b.job.BinlogInfo.SchemaVersion } @@ -478,15 +478,10 @@ func (d *Executor) run(b *binlogItem) error { go d.sync(d.toDBs[i], d.jobCh[i]) } - maxCommitTS := d.initCommitTS for { binlog := b.binlog commitTS := binlog.GetCommitTs() jobID := binlog.GetDdlJobId() - if commitTS <= maxCommitTS { - continue - } - maxCommitTS = commitTS if jobID == 0 { preWriteValue := binlog.GetPrewriteValue() @@ -507,10 +502,7 @@ func (d *Executor) run(b *binlogItem) error { if d.skipDDL(schema, table) { log.Debugf("[skip ddl]db:%s table:%s, sql:%s, commit ts %d, pos %v", schema, table, sql, commitTS, b.pos) - continue - } - - if sql != "" { + } else if sql != "" { sql, err = d.translator.GenDDLSQL(sql, schema) if err != nil { return errors.Trace(err) diff --git a/drainer/pump.go b/drainer/pump.go index 30eb416ea..e6fdd50d5 100644 --- a/drainer/pump.go +++ b/drainer/pump.go @@ -338,11 +338,14 @@ func (p *Pump) getDDLJob(id int64) (*model.Job, error) { return job, nil } -func (p *Pump) collectBinlogs(maxTS int64) binlogItems { +func (p *Pump) collectBinlogs(minTS, maxTS int64) binlogItems { var bs binlogItems item := p.bh.pop() for item != nil && item.binlog.CommitTs <= maxTS { - bs = append(bs, item) + // make sure to discard old binlogs whose commitTS is earlier or equal minTS + if item.binlog.CommitTs > minTS { + bs = append(bs, item) + } // update pump's current position if ComparePos(p.current, item.pos) == -1 { p.current = item.pos