drainer: Fix #724, Enable drainer to purge old incremental backup data on disk (#885)
suzaku authored Feb 7, 2020
1 parent d42bd56 commit ff59e72
Showing 8 changed files with 130 additions and 47 deletions.
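In short: the commit adds a retention-time option (in days) to the [syncer.to] section of the drainer config for the file downstream, splits the old Binlogger.GC(days, pos) method into two focused methods, GCByPos and GCByTime, and has the PB syncer start an hourly GC goroutine that purges binlog files older than the retention period, stopped via context cancellation in Close.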
3 changes: 2 additions & 1 deletion cmd/drainer/drainer.toml
@@ -129,6 +129,8 @@ port = 3306
 #[syncer.to]
 # directory to save binlog file, default same as data-dir (which saves the checkpoint file) if this is not configured.
 # dir = "data.drainer"
+#
+# retention-time = 7
 
 
 # when db-type is kafka, you can uncomment this to configure the downstream kafka; it will be the global kafka config by default
@@ -139,7 +141,6 @@ port = 3306
 # kafka-version = "0.8.2.0"
 # kafka-max-messages = 1024
 #
-#
 # the topic name drainer will push msg, the default name is <cluster-id>_obinlog
 # be careful not to use the same name when running multiple drainer instances
 # topic-name = ""
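For reference, a minimal sketch of a drainer.toml fragment that enables the purge (the values are hypothetical; per createDSyncer in drainer/syncer.go below, retention-time is only honored when db-type is "file", and pb.go only starts the GC loop when the value is greater than 0):

```toml
[syncer]
db-type = "file"

[syncer.to]
# directory to save binlog files
dir = "data.drainer"
# purge binlog files kept on disk for more than 7 days;
# unset or <= 0 disables the hourly GC loop
retention-time = 7
```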
2 changes: 1 addition & 1 deletion drainer/relay/relayer.go
@@ -79,7 +79,7 @@ func (r *relayer) WriteBinlog(schema string, table string, tiBinlog *tb.Binlog,
 func (r *relayer) GCBinlog(pos tb.Pos) {
     // If the file suffix increases, it means previous files are useless.
     if pos.Suffix > r.nextGCFileSuffix {
-        r.binlogger.GC(0, pos)
+        r.binlogger.GCByPos(pos)
         r.nextGCFileSuffix = pos.Suffix
     }
 }
35 changes: 32 additions & 3 deletions drainer/sync/pb.go
@@ -14,7 +14,11 @@
 package sync
 
 import (
+    "context"
+    "time"
+
     "github.com/pingcap/errors"
+    "github.com/pingcap/log"
     "github.com/pingcap/tidb-binlog/drainer/translator"
     "github.com/pingcap/tidb-binlog/pkg/binlogfile"
     pb "github.com/pingcap/tidb-binlog/proto/binlog"
Expand All @@ -24,21 +28,44 @@ import (
var _ Syncer = &pbSyncer{}

type pbSyncer struct {
binlogger binlogfile.Binlogger

*baseSyncer

binlogger binlogfile.Binlogger
cancel func()
}

// NewPBSyncer sync binlog to files
func NewPBSyncer(dir string, tableInfoGetter translator.TableInfoGetter) (*pbSyncer, error) {
func NewPBSyncer(dir string, retentionDays int, tableInfoGetter translator.TableInfoGetter) (*pbSyncer, error) {
binlogger, err := binlogfile.OpenBinlogger(dir, binlogfile.SegmentSizeBytes)
if err != nil {
return nil, errors.Trace(err)
}

ctx, cancel := context.WithCancel(context.TODO())

s := &pbSyncer{
binlogger: binlogger,
baseSyncer: newBaseSyncer(tableInfoGetter),
cancel: cancel,
}

if retentionDays > 0 {
// TODO: Add support for human readable format input of times like "7d", "12h"
retentionTime := time.Duration(retentionDays) * 24 * time.Hour
ticker := time.NewTicker(time.Hour)
go func() {
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info("Binlog GC loop stopped")
return
case <-ticker.C:
log.Info("Trying to GC binlog files")
binlogger.GCByTime(retentionTime)
}
}
}()
}

return s, nil
@@ -71,6 +98,8 @@ func (p *pbSyncer) saveBinlog(binlog *pb.Binlog) error {
 }
 
 func (p *pbSyncer) Close() error {
+    p.cancel()
+
     err := p.binlogger.Close()
     p.setErr(err)
     close(p.success)
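The goroutine added in NewPBSyncer is the standard stoppable-ticker pattern: an hourly time.Ticker drives the GC, and cancelling the context (done in Close above) ends the loop and releases the ticker. A self-contained sketch of the same pattern, with a hypothetical interval and callback for illustration:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// startGCLoop invokes fn once per interval until the returned stop
// function is called. It mirrors the loop NewPBSyncer starts:
// ticker-driven work, shut down by cancelling a context.
func startGCLoop(interval time.Duration, fn func()) func() {
	ctx, cancel := context.WithCancel(context.Background())
	ticker := time.NewTicker(interval)
	go func() {
		defer ticker.Stop() // release ticker resources when the loop exits
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				fn()
			}
		}
	}()
	return cancel
}

func main() {
	stop := startGCLoop(10*time.Millisecond, func() { fmt.Println("gc tick") })
	time.Sleep(35 * time.Millisecond)
	stop() // the goroutine observes ctx.Done() and exits
}
```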
13 changes: 7 additions & 6 deletions drainer/sync/syncer_test.go
@@ -42,15 +42,16 @@ type syncerSuite struct {
 func (s *syncerSuite) SetUpTest(c *check.C) {
     var infoGetter translator.TableInfoGetter
     cfg := &DBConfig{
-        Host:         "localhost",
-        User:         "root",
-        Password:     "",
-        Port:         3306,
-        KafkaVersion: "0.8.2.0",
+        Host:          "localhost",
+        User:          "root",
+        Password:      "",
+        Port:          3306,
+        KafkaVersion:  "0.8.2.0",
+        BinlogFileDir: c.MkDir(),
     }
 
     // create pb syncer
-    pb, err := NewPBSyncer(c.MkDir(), infoGetter)
+    pb, err := NewPBSyncer(cfg.BinlogFileDir, cfg.BinlogFileRetentionTime, infoGetter)
     c.Assert(err, check.IsNil)
 
     s.syncers = append(s.syncers, pb)
11 changes: 6 additions & 5 deletions drainer/sync/util.go
@@ -24,11 +24,12 @@ type DBConfig struct {
     User     string `toml:"user" json:"user"`
     Password string `toml:"password" json:"password"`
     // if EncryptedPassword is not empty, Password will be ignored.
-    EncryptedPassword string           `toml:"encrypted_password" json:"encrypted_password"`
-    SyncMode          int              `toml:"sync-mode" json:"sync-mode"`
-    Port              int              `toml:"port" json:"port"`
-    Checkpoint        CheckpointConfig `toml:"checkpoint" json:"checkpoint"`
-    BinlogFileDir     string           `toml:"dir" json:"dir"`
+    EncryptedPassword       string           `toml:"encrypted_password" json:"encrypted_password"`
+    SyncMode                int              `toml:"sync-mode" json:"sync-mode"`
+    Port                    int              `toml:"port" json:"port"`
+    Checkpoint              CheckpointConfig `toml:"checkpoint" json:"checkpoint"`
+    BinlogFileDir           string           `toml:"dir" json:"dir"`
+    BinlogFileRetentionTime int              `toml:"retention-time" json:"retention-time"`
 
     ZKAddrs    string `toml:"zookeeper-addrs" json:"zookeeper-addrs"`
     KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"`
2 changes: 1 addition & 1 deletion drainer/syncer.go
@@ -102,7 +102,7 @@ func createDSyncer(cfg *SyncerConfig, schema *Schema, info *loopbacksync.LoopBack
             return nil, errors.Annotate(err, "fail to create kafka dsyncer")
         }
     case "file":
-        dsyncer, err = dsync.NewPBSyncer(cfg.To.BinlogFileDir, schema)
+        dsyncer, err = dsync.NewPBSyncer(cfg.To.BinlogFileDir, cfg.To.BinlogFileRetentionTime, schema)
         if err != nil {
             return nil, errors.Annotate(err, "fail to create pb dsyncer")
         }
56 changes: 39 additions & 17 deletions pkg/binlogfile/binlogger.go
@@ -63,8 +63,11 @@ type Binlogger interface {
     // close the binlogger
     Close() error
 
-    // GC recycles the old binlog file
-    GC(days time.Duration, pos binlog.Pos)
+    // GCByTime deletes all files that are older than the specified duration; the latest file is always kept
+    GCByTime(retentionTime time.Duration)
+
+    // GCByPos deletes all files before the specified position; the latest file is always kept
+    GCByPos(pos binlog.Pos)
 }
 
 // binlogger is a logical representation of the log storage
@@ -348,8 +351,37 @@ func (b *binlogger) Walk(ctx context.Context, from binlog.Pos, sendBinlog func(e
     return nil
 }
 
-// GC recycles the old binlog file
-func (b *binlogger) GC(days time.Duration, pos binlog.Pos) {
+// GCByPos deletes all files before the specified position; the latest file is always kept
+func (b *binlogger) GCByPos(pos binlog.Pos) {
+    names, err := ReadBinlogNames(b.dir)
+    if err != nil {
+        log.Error("read binlog files failed", zap.Error(err))
+        return
+    }
+
+    if len(names) == 0 {
+        return
+    }
+
+    // skip the latest binlog file
+    for _, name := range names[:len(names)-1] {
+        curSuffix, _, err := ParseBinlogName(name)
+        if err != nil {
+            log.Error("parse binlog failed", zap.Error(err))
+        }
+        if curSuffix < pos.Suffix {
+            fileName := path.Join(b.dir, name)
+            if err := os.Remove(fileName); err != nil {
+                log.Error("fail to remove old binlog file", zap.Error(err), zap.String("file name", fileName))
+                continue
+            }
+            log.Info("GC binlog file", zap.String("file name", fileName))
+        }
+    }
+}
+
+// GCByTime deletes all files that are older than the specified duration; the latest file is always kept
+func (b *binlogger) GCByTime(retentionTime time.Duration) {
     names, err := ReadBinlogNames(b.dir)
     if err != nil {
         log.Error("read binlog files failed", zap.Error(err))
@@ -369,22 +401,12 @@ func (b *binlogger) GCByTime(retentionTime time.Duration) {
             continue
         }
 
-        curSuffix, _, err := ParseBinlogName(name)
-        if err != nil {
-            log.Error("parse binlog failed", zap.Error(err))
-        }
-
-        if curSuffix < pos.Suffix {
-            err := os.Remove(fileName)
-            if err != nil {
-                log.Error("remove old binlog file err")
+        if time.Since(fi.ModTime()) > retentionTime {
+            if err := os.Remove(fileName); err != nil {
+                log.Error("fail to remove old binlog file", zap.Error(err), zap.String("file name", fileName))
                 continue
             }
             log.Info("GC binlog file", zap.String("file name", fileName))
-        } else if time.Since(fi.ModTime()) > days {
-            log.Warn(
-                "binlog file is old enough to be garbage collected, but the position is behind the safe point",
-                zap.String("name", fileName), zap.Stringer("position", &pos))
         }
     }
 }
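To see the two new entry points side by side, here is a hypothetical standalone sketch (the directory name is made up, error handling is minimal, and the import paths and package names are assumed from the files in this diff):

```go
package main

import (
	"time"

	"github.com/pingcap/tidb-binlog/pkg/binlogfile"
	"github.com/pingcap/tidb-binlog/proto/binlog"
)

func main() {
	// Open (or create) a binlog dir; SegmentSizeBytes is the default segment size.
	bl, err := binlogfile.OpenBinlogger("data.drainer", binlogfile.SegmentSizeBytes)
	if err != nil {
		panic(err)
	}
	defer binlogfile.CloseBinlogger(bl)

	// Drop every file whose suffix is strictly smaller than 2
	// (the newest file is always kept).
	bl.GCByPos(binlog.Pos{Suffix: 2})

	// Drop every file whose mtime is more than 7 days old
	// (again, the newest file survives).
	bl.GCByTime(7 * 24 * time.Hour)
}
```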
55 changes: 42 additions & 13 deletions pkg/binlogfile/binlogger_test.go
@@ -202,7 +202,43 @@ func (s *testBinloggerSuite) TestCourruption(c *C) {
     c.Assert(errors.Cause(err), Equals, io.ErrUnexpectedEOF)
 }
 
-func (s *testBinloggerSuite) TestGC(c *C) {
+func assertBinlogsCount(c *C, dir string, expected int) {
+    names, err := ReadBinlogNames(dir)
+    c.Assert(err, IsNil)
+    c.Assert(names, HasLen, expected)
+}
+
+func (s *testBinloggerSuite) TestGCByPos(c *C) {
+    dir := c.MkDir()
+    bl, err := OpenBinlogger(dir, SegmentSizeBytes)
+    c.Assert(err, IsNil)
+    // A binlog file with index 0 is created at this point
+    defer func() {
+        err := CloseBinlogger(bl)
+        c.Assert(err, IsNil)
+    }()
+
+    b, ok := bl.(*binlogger)
+    c.Assert(ok, IsTrue)
+    // Call rotate multiple times to create new binlog files
+    for i := 0; i < 4; i++ {
+        err = b.rotate()
+        c.Assert(err, IsNil)
+    }
+
+    // We should have 1 + 4 files by now
+    assertBinlogsCount(c, b.dir, 5)
+
+    b.GCByPos(binlog.Pos{Suffix: 2})
+
+    assertBinlogsCount(c, b.dir, 3)
+
+    b.GCByPos(binlog.Pos{Suffix: 10})
+
+    assertBinlogsCount(c, b.dir, 1)
+}
+
+func (s *testBinloggerSuite) TestGCByTime(c *C) {
     dir := c.MkDir()
     bl, err := OpenBinlogger(dir, SegmentSizeBytes)
     c.Assert(err, IsNil)
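Note how the second GCByPos call in TestGCByPos asks for everything before suffix 10 to go, yet one file survives: GCByPos iterates over names[:len(names)-1], so the newest file is never deleted.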
@@ -218,22 +254,15 @@ func (s *testBinloggerSuite) TestGC(c *C) {
     // 2. rotate creates a new binlog file with index 1
     c.Assert(err, IsNil)
 
-    // No binlog files should be collected,
-    // because both of the files has an index that's >= 0
-    time.Sleep(10 * time.Millisecond)
-    b.GC(time.Millisecond, binlog.Pos{Suffix: 0})
-
-    names, err := ReadBinlogNames(b.dir)
-    c.Assert(err, IsNil)
-    c.Assert(names, HasLen, 2)
-    for i, name := range names {
-        suffix, _, err := ParseBinlogName(name)
-        c.Assert(err, IsNil)
-        c.Assert(suffix, Equals, uint64(i))
-    }
-
-    // The one with index 0 should be garbage collected
-    b.GC(time.Millisecond, binlog.Pos{Suffix: 1})
+    // Should collect the first file because it's more than 1ms old after
+    // the following sleep
+    time.Sleep(10 * time.Millisecond)
+    b.GCByTime(time.Millisecond)
 
     names, err = ReadBinlogNames(b.dir)
     c.Assert(err, IsNil)
     c.Assert(names, HasLen, 1)
