drainer: remove pb compress config && change 'pb' to 'file' (#559)
WangXiangUSTC authored and july2993 committed Apr 26, 2019
1 parent 69abfec commit 8d39e26
Showing 31 changed files with 109 additions and 237 deletions.
2 changes: 1 addition & 1 deletion cmd/drainer/README.md
@@ -20,7 +20,7 @@ Usage of drainer:
-data-dir string
drainer data directory path (default data.drainer) (default "data.drainer")
-dest-db-type string
-target db type: mysql or pb; see syncer section in conf/drainer.toml (default "mysql")
+target db type: mysql or tidb or file or kafka; see syncer section in conf/drainer.toml (default "mysql")
-detect-interval int
the interval time (in seconds) of detect pumps' status (default 10)
-disable-detect
9 changes: 3 additions & 6 deletions cmd/drainer/drainer.toml
@@ -46,7 +46,7 @@ disable-dispatch = false
safe-mode = false

# downstream storage, equal to --dest-db-type
-# valid values are "mysql", "pb", "tidb", "flash", "kafka"
+# valid values are "mysql", "file", "tidb", "flash", "kafka"
db-type = "mysql"

# disable sync these schema
@@ -81,13 +81,10 @@ port = 3306
# you can uncomment this to change the database to save checkpoint when the downstream is mysql or tidb
#schema = "tidb_binlog"

-# Uncomment this if you want to use pb or sql as db-type.
-# Compress compresses output file, like pb and sql file. Now it supports "gzip" algorithm only.
-# Values can be "gzip". Leave it empty to disable compression.
+# Uncomment this if you want to use file as db-type.
#[syncer.to]
-# directory to save pb file, default same as data-dir(save checkpoint file) if this is not configured.
+# directory to save binlog file, default same as data-dir(save checkpoint file) if this is not configured.
# dir = "data.drainer"
-# compression = "gzip"


# when db-type is kafka, you can uncomment this to config the downstream kafka; otherwise the global kafka config is used by default
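With the compression knob gone, writing binlog to the local filesystem needs only the db-type and an optional output directory. A minimal drainer.toml sketch of the resulting syncer settings (illustrative values, assuming the section layout shown above):

[syncer]
db-type = "file"

[syncer.to]
# binlog files are written here; defaults to data-dir when omitted
dir = "data.drainer"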
2 changes: 1 addition & 1 deletion drainer/checkpoint/checkpoint.go
@@ -54,7 +54,7 @@ func NewCheckPoint(name string, cfg *Config) (CheckPoint, error) {
switch name {
case "mysql", "tidb":
cp, err = newMysql(name, cfg)
case "pb":
case "file":
cp, err = NewPb(cfg)
case "kafka":
cp, err = newKafka(cfg)
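Note that "file" still routes to the pb-based checkpoint implementation (NewPb), so only the accepted name changes, not the on-disk checkpoint format. A hedged caller sketch, assuming only the exported NewCheckPoint shown above; the empty Config is illustrative:

package main

import (
	"log"

	"github.com/pingcap/tidb-binlog/drainer/checkpoint"
)

func main() {
	// Before this commit the factory accepted "pb" here; now it expects
	// "file". A real Config carries the checkpoint path and downstream
	// settings; the zero value is only for illustration.
	cp, err := checkpoint.NewCheckPoint("file", &checkpoint.Config{})
	if err != nil {
		log.Fatal(err)
	}
	_ = cp
}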
19 changes: 13 additions & 6 deletions drainer/config.go
@@ -124,7 +124,7 @@ func NewConfig() *Config {
fs.IntVar(&cfg.SyncerCfg.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch")
fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas")
fs.IntVar(&cfg.SyncerCfg.WorkerCount, "c", 16, "parallel worker count")
-fs.StringVar(&cfg.SyncerCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql or tidb or pb or flash or kafka; see syncer section in conf/drainer.toml")
+fs.StringVar(&cfg.SyncerCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql or tidb or file or kafka; see syncer section in conf/drainer.toml")
fs.BoolVar(&cfg.SyncerCfg.DisableDispatch, "disable-dispatch", false, "disable dispatching sqls that are in one same binlog; if set true, work-count and txn-batch would be useless")
fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
fs.BoolVar(&cfg.SyncerCfg.DisableCausality, "disable-detect", false, "disable detect causality")
@@ -197,7 +197,7 @@ func (cfg *Config) Parse(args []string) error {
}

func (c *SyncerConfig) adjustWorkCount() {
if c.DestDBType == "pb" || c.DestDBType == "kafka" {
if c.DestDBType == "file" || c.DestDBType == "kafka" {
c.DisableDispatch = true
c.WorkerCount = 1
} else if c.DisableDispatch {
@@ -277,13 +277,17 @@ func (cfg *Config) adjustConfig() error {
cfg.ListenAddr = "http://" + cfg.ListenAddr // add 'http:' scheme to facilitate parsing
adjustString(&cfg.DataDir, defaultDataDir)
adjustInt(&cfg.DetectInterval, defaultDetectInterval)
-cfg.SyncerCfg.adjustWorkCount()
-cfg.SyncerCfg.adjustDoDBAndTable()

// add default syncer.to configuration if needed
if cfg.SyncerCfg.To == nil {
cfg.SyncerCfg.To = new(dsync.DBConfig)
}

if cfg.SyncerCfg.DestDBType == "pb" {
// pb is an alias of file, use file instead
cfg.SyncerCfg.DestDBType = "file"
}

if cfg.SyncerCfg.DestDBType == "kafka" {
// get KafkaAddrs from zookeeper if ZkAddrs is set
if cfg.SyncerCfg.To.ZKAddrs != "" {
@@ -318,10 +322,10 @@ func (cfg *Config) adjustConfig() error {
if cfg.SyncerCfg.To.KafkaMaxMessages <= 0 {
cfg.SyncerCfg.To.KafkaMaxMessages = 1024
}
} else if cfg.SyncerCfg.DestDBType == "pb" {
} else if cfg.SyncerCfg.DestDBType == "file" {
if len(cfg.SyncerCfg.To.BinlogFileDir) == 0 {
cfg.SyncerCfg.To.BinlogFileDir = cfg.DataDir
log.Infof("use default downstream pb directory: %s", cfg.DataDir)
log.Infof("use default downstream file directory: %s", cfg.DataDir)
}
} else if cfg.SyncerCfg.DestDBType == "mysql" || cfg.SyncerCfg.DestDBType == "tidb" {
if len(cfg.SyncerCfg.To.Host) == 0 {
@@ -350,5 +354,8 @@ func (cfg *Config) adjustConfig() error {
}
}

+cfg.SyncerCfg.adjustWorkCount()
+cfg.SyncerCfg.adjustDoDBAndTable()

return nil
}
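The subtle point in this file is ordering: adjustWorkCount branches on the destination type, so its call (together with adjustDoDBAndTable) moves from the top of adjustConfig to the very end, after the "pb"-to-"file" rewrite. The TestAdjustConfig added below exercises the full path; this small sketch isolates the dependency (normalizeDestDBType is a hypothetical stand-in for the inline alias block):

package main

import "fmt"

// normalizeDestDBType mimics the alias handling in adjustConfig:
// the legacy "pb" value is rewritten to "file".
func normalizeDestDBType(t string) string {
	if t == "pb" {
		return "file"
	}
	return t
}

func main() {
	// adjustWorkCount forces a single worker only for "file" and "kafka",
	// so it must run on the normalized name; checking the raw "pb" value
	// against "file" would leave parallel dispatch enabled for an
	// append-only file sink.
	fmt.Println(normalizeDestDBType("pb")) // prints "file"
}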
13 changes: 13 additions & 0 deletions drainer/config_test.go
@@ -69,3 +69,16 @@ func (t *testDrainerSuite) TestValidate(c *C) {
err = cfg.validate()
c.Assert(err, IsNil)
}

+func (t *testDrainerSuite) TestAdjustConfig(c *C) {
+cfg := NewConfig()
+cfg.SyncerCfg.DestDBType = "pb"
+cfg.SyncerCfg.WorkerCount = 10
+cfg.SyncerCfg.DisableDispatch = false
+
+err := cfg.adjustConfig()
+c.Assert(err, IsNil)
+c.Assert(cfg.SyncerCfg.DestDBType, Equals, "file")
+c.Assert(cfg.SyncerCfg.WorkerCount, Equals, 1)
+c.Assert(cfg.SyncerCfg.DisableDispatch, IsTrue)
+}
46 changes: 23 additions & 23 deletions drainer/schema_test.go
@@ -68,12 +68,12 @@ func (t *schemaSuite) TestSchema(c *C) {
jobs = append(
jobs,
&model.Job{
-ID:       6,
-State:    model.JobStateSynced,
-SchemaID: 1,
-Type:     model.ActionDropSchema,
+ID:         6,
+State:      model.JobStateSynced,
+SchemaID:   1,
+Type:       model.ActionDropSchema,
BinlogInfo: &model.HistoryInfo{SchemaVersion: 3, FinishedTS: 123},
-Query:    "drop database test",
+Query:      "drop database test",
},
)
schema, err = NewSchema(jobs, false)
@@ -96,12 +96,12 @@ func (t *schemaSuite) TestSchema(c *C) {
jobs = append(
jobs,
&model.Job{
-ID:       9,
-State:    model.JobStateSynced,
-SchemaID: 1,
-Type:     model.ActionDropSchema,
+ID:         9,
+State:      model.JobStateSynced,
+SchemaID:   1,
+Type:       model.ActionDropSchema,
BinlogInfo: &model.HistoryInfo{SchemaVersion: 1, FinishedTS: 123},
-Query:    "drop database test",
+Query:      "drop database test",
},
)
schema, err = NewSchema(jobs, false)
@@ -170,7 +170,7 @@ func (*schemaSuite) TestTable(c *C) {
SchemaID: 3,
TableID: 2,
Type: model.ActionCreateTable,
-BinlogInfo: &model.HistoryInfo{SchemaVersion: 2, TableInfo: tblInfo, FinishedTS: 123},
+BinlogInfo: &model.HistoryInfo{SchemaVersion: 2, TableInfo: tblInfo, FinishedTS: 123},
Query: "create table " + tbName.O,
}
jobs = append(jobs, job)
@@ -224,13 +224,13 @@ func (*schemaSuite) TestTable(c *C) {
jobs = append(
jobs,
&model.Job{
-ID:       9,
-State:    model.JobStateSynced,
-SchemaID: 3,
-TableID:  2,
-Type:     model.ActionTruncateTable,
+ID:         9,
+State:      model.JobStateSynced,
+SchemaID:   3,
+TableID:    2,
+Type:       model.ActionTruncateTable,
BinlogInfo: &model.HistoryInfo{SchemaVersion: 5, TableInfo: tblInfo1, FinishedTS: 123},
-Query:    "truncate table " + tbName.O,
+Query:      "truncate table " + tbName.O,
},
)
schema1, err := NewSchema(jobs, false)
@@ -246,13 +246,13 @@ func (*schemaSuite) TestTable(c *C) {
jobs = append(
jobs,
&model.Job{
-ID:       9,
-State:    model.JobStateSynced,
-SchemaID: 3,
-TableID:  9,
-Type:     model.ActionDropTable,
+ID:         9,
+State:      model.JobStateSynced,
+SchemaID:   3,
+TableID:    9,
+Type:       model.ActionDropTable,
BinlogInfo: &model.HistoryInfo{SchemaVersion: 6, FinishedTS: 123},
-Query:    "drop table " + tbName.O,
+Query:      "drop table " + tbName.O,
},
)
schema2, err := NewSchema(jobs, false)
6 changes: 2 additions & 4 deletions drainer/sync/pb.go
@@ -16,7 +16,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/pingcap/tidb-binlog/pkg/binlogfile"
"github.com/pingcap/tidb-binlog/pkg/compress"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
tb "github.com/pingcap/tipb/go-binlog"
)
@@ -29,9 +28,8 @@ type pbSyncer struct {
*baseSyncer
}

-func NewPBSyncer(dir string, compression string, tableInfoGetter translator.TableInfoGetter) (*pbSyncer, error) {
-codec := compress.ToCompressionCodec(compression)
-binlogger, err := binlogfile.OpenBinlogger(dir, codec)
+func NewPBSyncer(dir string, tableInfoGetter translator.TableInfoGetter) (*pbSyncer, error) {
+binlogger, err := binlogfile.OpenBinlogger(dir)
if err != nil {
return nil, errors.Trace(err)
}
2 changes: 1 addition & 1 deletion drainer/sync/syncer_test.go
@@ -54,7 +54,7 @@ func (s *syncerSuite) SetUpTest(c *check.C) {
}

// create pb syncer
pb, err := NewPBSyncer(c.MkDir(), "", infoGetter)
pb, err := NewPBSyncer(c.MkDir(), infoGetter)
c.Assert(err, check.IsNil)

s.syncers = append(s.syncers, pb)
1 change: 0 additions & 1 deletion drainer/sync/util.go
@@ -26,7 +26,6 @@ type DBConfig struct {
Port int `toml:"port" json:"port"`
Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"`
BinlogFileDir string `toml:"dir" json:"dir"`
-Compression string `toml:"compression" json:"compression"`
TimeLimit string `toml:"time-limit" json:"time-limit"`
SizeLimit string `toml:"size-limit" json:"size-limit"`

4 changes: 2 additions & 2 deletions drainer/syncer.go
@@ -92,8 +92,8 @@ func createDSyncer(cfg *SyncerConfig, schema *Schema) (dsyncer dsync.Syncer, err
if err != nil {
return nil, errors.Annotate(err, "fail to create kafka dsyncer")
}
case "pb":
dsyncer, err = dsync.NewPBSyncer(cfg.To.BinlogFileDir, cfg.To.Compression, schema)
case "file":
dsyncer, err = dsync.NewPBSyncer(cfg.To.BinlogFileDir, schema)
if err != nil {
return nil, errors.Annotate(err, "fail to create pb dsyncer")
}
10 changes: 3 additions & 7 deletions pkg/binlogfile/binlogger.go
@@ -24,7 +24,6 @@ import (

"github.com/ngaut/log"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-binlog/pkg/compress"
"github.com/pingcap/tidb-binlog/pkg/file"
"github.com/pingcap/tipb/go-binlog"
"golang.org/x/net/context"
@@ -71,8 +70,6 @@ type binlogger struct {
// encoder encodes binlog payload into bytes, and write to file
encoder Encoder

-codec compress.CompressionCodec

lastSuffix uint64
lastOffset int64

@@ -83,7 +80,7 @@ }
}

// OpenBinlogger returns a binlogger for write, then it can be appended
-func OpenBinlogger(dirpath string, codec compress.CompressionCodec) (Binlogger, error) {
+func OpenBinlogger(dirpath string) (Binlogger, error) {
log.Infof("open binlog directory %s", dirpath)
var (
err error
@@ -145,8 +142,7 @@ func OpenBinlogger(dirpath string, codec compress.CompressionCodec) (Binlogger,
binlog := &binlogger{
dir: dirpath,
file: fileLock,
-encoder: NewEncoder(fileLock, offset, codec),
-codec: codec,
+encoder: NewEncoder(fileLock, offset),
dirLock: dirLock,
lastSuffix: lastFileSuffix,
lastOffset: offset,
@@ -414,7 +410,7 @@ func (b *binlogger) rotate() error {
}
b.file = newTail

-b.encoder = NewEncoder(b.file, 0, b.codec)
+b.encoder = NewEncoder(b.file, 0)
log.Infof("segmented binlog file %v is created", fpath)
return nil
}
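After this change a binlogger is opened with the directory alone. A minimal sketch of the new call shape, mirroring checkTest in the test file below (the directory name is illustrative):

package main

import (
	"log"

	"github.com/pingcap/tidb-binlog/pkg/binlogfile"
)

func main() {
	// No compression codec argument anymore; close via the package's
	// CloseBinlogger helper, as the tests do.
	bl, err := binlogfile.OpenBinlogger("data.drainer")
	if err != nil {
		log.Fatal(err)
	}
	defer binlogfile.CloseBinlogger(bl)
}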
21 changes: 10 additions & 11 deletions pkg/binlogfile/binlogger_test.go
@@ -23,7 +23,6 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-binlog/pkg/compress"
"github.com/pingcap/tipb/go-binlog"
)

@@ -42,7 +41,7 @@ func (s *testBinloggerSuite) TestCreate(c *C) {
}

func checkTest(c *C, dir string) {
-bl, err := OpenBinlogger(dir, compress.CompressionNone)
+bl, err := OpenBinlogger(dir)
c.Assert(err, IsNil)
defer CloseBinlogger(bl)

@@ -54,7 +53,7 @@

func (s *testBinloggerSuite) TestOpenForWrite(c *C) {
dir := c.MkDir()
-bl, err := OpenBinlogger(dir, compress.CompressionNone)
+bl, err := OpenBinlogger(dir)
c.Assert(err, IsNil)

b, ok := bl.(*binlogger)
@@ -65,7 +64,7 @@ func (s *testBinloggerSuite) TestOpenForWrite(c *C) {
c.Assert(err, IsNil)
bl.Close()

-bl, err = OpenBinlogger(dir, compress.CompressionNone)
+bl, err = OpenBinlogger(dir)
c.Assert(err, IsNil)

b, ok = bl.(*binlogger)
@@ -90,7 +89,7 @@ func (s *testBinloggerSuite) TestOpenForWrite(c *C) {

func (s *testBinloggerSuite) TestRotateFile(c *C) {
dir := c.MkDir()
-bl, err := OpenBinlogger(dir, compress.CompressionNone)
+bl, err := OpenBinlogger(dir)
c.Assert(err, IsNil)

payload := []byte("binlogtest")
@@ -110,7 +109,7 @@ func (s *testBinloggerSuite) TestRotateFile(c *C) {

bl.Close()

-bl, err = OpenBinlogger(dir, compress.CompressionNone)
+bl, err = OpenBinlogger(dir)
c.Assert(err, IsNil)

binlogs, err := bl.ReadFrom(binlog.Pos{}, 1)
@@ -129,7 +128,7 @@ func (s *testBinloggerSuite) TestRotateFile(c *C) {

func (s *testBinloggerSuite) TestRead(c *C) {
dir := c.MkDir()
-bl, err := OpenBinlogger(dir, compress.CompressionNone)
+bl, err := OpenBinlogger(dir)
c.Assert(err, IsNil)
defer bl.Close()

@@ -168,7 +167,7 @@ func (s *testBinloggerSuite) TestRead(c *C) {

func (s *testBinloggerSuite) TestCourruption(c *C) {
dir := c.MkDir()
-bl, err := OpenBinlogger(dir, compress.CompressionNone)
+bl, err := OpenBinlogger(dir)
c.Assert(err, IsNil)
defer bl.Close()

@@ -201,7 +200,7 @@ func (s *testBinloggerSuite) TestCourruption(c *C) {

func (s *testBinloggerSuite) TestGC(c *C) {
dir := c.MkDir()
-bl, err := OpenBinlogger(dir, compress.CompressionNone)
+bl, err := OpenBinlogger(dir)
c.Assert(err, IsNil)
defer CloseBinlogger(bl)

@@ -227,7 +226,7 @@ func (s *testBinloggerSuite) TestSeekBinlog(c *C) {
os.Remove(f.Name())
}()

-encoder := NewEncoder(f, 0, compress.CompressionNone)
+encoder := NewEncoder(f, 0)
_, err = encoder.Encode([]byte("binlogtest"))
c.Assert(err, IsNil)

@@ -270,7 +269,7 @@ func (s *testBinloggerSuite) TestSeekBinlog(c *C) {

func (s *testBinloggerSuite) TestSkipCRCRead(c *C) {
dir := c.MkDir()
-bl, err := OpenBinlogger(dir, compress.CompressionNone)
+bl, err := OpenBinlogger(dir)
c.Assert(err, IsNil)
defer bl.Close()
