Skip to content

Commit

Permalink
drainer: fix bug and tiny clean up (#124)
Browse files Browse the repository at this point in the history
* drainer: fix tiny bug and tiny clean
  • Loading branch information
IANTHEREAL authored Feb 28, 2017
1 parent c79d23f commit 92dd34f
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 15 deletions.
7 changes: 4 additions & 3 deletions drainer/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (c *Collector) publish(ctx context.Context, upper, lower int64) {

if lower > oldLower {
c.window.SaveLower(lower)
c.publishBinlogs(ctx, lower)
c.publishBinlogs(ctx, oldLower, lower)
windowGauge.WithLabelValues("lower").Set(float64(lower))
}
if upper > oldUpper {
Expand Down Expand Up @@ -252,15 +252,16 @@ func (c *Collector) LoadHistoryDDLJobs() ([]*model.Job, error) {
return jobs, nil
}

func (c *Collector) publishBinlogs(ctx context.Context, lower int64) {
// publishBinlogs collects binlogs whose commitTS are in (minTS, maxTS], then publish them in ascending commitTS order
func (c *Collector) publishBinlogs(ctx context.Context, minTS, maxTS int64) {
// multiple ways sort:
// 1. get multiple way sorted binlogs
// 2. use heap to merge sort
// todo: use multiple goroutines to collect sorted binlogs
bss := make(map[string]binlogItems)
binlogOffsets := make(map[string]int)
for id, p := range c.pumps {
bs := p.collectBinlogs(lower)
bs := p.collectBinlogs(minTS, maxTS)
if bs.Len() > 0 {
bss[id] = bs
binlogOffsets[id] = 1
Expand Down
12 changes: 2 additions & 10 deletions drainer/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (d *Executor) prepare(jobs []*model.Job) (*binlogItem, error) {
binlog := b.binlog
commitTS := binlog.GetCommitTs()
jobID := binlog.GetDdlJobId()
if commitTS < d.initCommitTS {
if commitTS <= d.initCommitTS {
if jobID > 0 {
latestSchemaVersion = b.job.BinlogInfo.SchemaVersion
}
Expand Down Expand Up @@ -478,15 +478,10 @@ func (d *Executor) run(b *binlogItem) error {
go d.sync(d.toDBs[i], d.jobCh[i])
}

maxCommitTS := d.initCommitTS
for {
binlog := b.binlog
commitTS := binlog.GetCommitTs()
jobID := binlog.GetDdlJobId()
if commitTS <= maxCommitTS {
continue
}
maxCommitTS = commitTS

if jobID == 0 {
preWriteValue := binlog.GetPrewriteValue()
Expand All @@ -507,10 +502,7 @@ func (d *Executor) run(b *binlogItem) error {

if d.skipDDL(schema, table) {
log.Debugf("[skip ddl]db:%s table:%s, sql:%s, commit ts %d, pos %v", schema, table, sql, commitTS, b.pos)
continue
}

if sql != "" {
} else if sql != "" {
sql, err = d.translator.GenDDLSQL(sql, schema)
if err != nil {
return errors.Trace(err)
Expand Down
7 changes: 5 additions & 2 deletions drainer/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,11 +338,14 @@ func (p *Pump) getDDLJob(id int64) (*model.Job, error) {
return job, nil
}

func (p *Pump) collectBinlogs(maxTS int64) binlogItems {
func (p *Pump) collectBinlogs(minTS, maxTS int64) binlogItems {
var bs binlogItems
item := p.bh.pop()
for item != nil && item.binlog.CommitTs <= maxTS {
bs = append(bs, item)
// make sure to discard old binlogs whose commitTS is earlier or equal minTS
if item.binlog.CommitTs > minTS {
bs = append(bs, item)
}
// update pump's current position
if ComparePos(p.current, item.pos) == -1 {
p.current = item.pos
Expand Down

0 comments on commit 92dd34f

Please sign in to comment.