diff --git a/cmd/drainer/README.md b/cmd/drainer/README.md index 61e638148..ab70faea9 100644 --- a/cmd/drainer/README.md +++ b/cmd/drainer/README.md @@ -20,7 +20,7 @@ Usage of drainer: -data-dir string drainer data directory path (default data.drainer) (default "data.drainer") -dest-db-type string - target db type: mysql or pb; see syncer section in conf/drainer.toml (default "mysql") + target db type: mysql or tidb or file or kafka; see syncer section in conf/drainer.toml (default "mysql") -detect-interval int the interval time (in seconds) of detect pumps' status (default 10) -disable-detect diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index 1fe6175e5..120915044 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -46,7 +46,7 @@ disable-dispatch = false safe-mode = false # downstream storage, equal to --dest-db-type -# valid values are "mysql", "pb", "tidb", "flash", "kafka" +# valid values are "mysql", "file", "tidb", "flash", "kafka" db-type = "mysql" # disable sync these schema @@ -81,13 +81,10 @@ port = 3306 # you can uncomment this to change the database to save checkpoint when the downstream is mysql or tidb #schema = "tidb_binlog" -# Uncomment this if you want to use pb or sql as db-type. -# Compress compresses output file, like pb and sql file. Now it supports "gzip" algorithm only. -# Values can be "gzip". Leave it empty to disable compression. +# Uncomment this if you want to use file as db-type. #[syncer.to] -# directory to save pb file, default same as data-dir(save checkpoint file) if this is not configured. +# directory to save binlog file, default same as data-dir(save checkpoint file) if this is not configured. # dir = "data.drainer" -# compression = "gzip" # when db-type is kafka, you can uncomment this to config the down stream kafka, it will be the globle config kafka default diff --git a/drainer/checkpoint/checkpoint.go b/drainer/checkpoint/checkpoint.go index 5c1018d77..fe5957580 100644 --- a/drainer/checkpoint/checkpoint.go +++ b/drainer/checkpoint/checkpoint.go @@ -54,7 +54,7 @@ func NewCheckPoint(name string, cfg *Config) (CheckPoint, error) { switch name { case "mysql", "tidb": cp, err = newMysql(name, cfg) - case "pb": + case "file": cp, err = NewPb(cfg) case "kafka": cp, err = newKafka(cfg) diff --git a/drainer/config.go b/drainer/config.go index 0e7718ed9..1defcc5f1 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -124,7 +124,7 @@ func NewConfig() *Config { fs.IntVar(&cfg.SyncerCfg.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch") fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas") fs.IntVar(&cfg.SyncerCfg.WorkerCount, "c", 16, "parallel worker count") - fs.StringVar(&cfg.SyncerCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql or tidb or pb or flash or kafka; see syncer section in conf/drainer.toml") + fs.StringVar(&cfg.SyncerCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql or tidb or file or kafka; see syncer section in conf/drainer.toml") fs.BoolVar(&cfg.SyncerCfg.DisableDispatch, "disable-dispatch", false, "disable dispatching sqls that in one same binlog; if set true, work-count and txn-batch would be useless") fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant") fs.BoolVar(&cfg.SyncerCfg.DisableCausality, "disable-detect", false, "disbale detect causality") @@ -197,7 +197,7 @@ func (cfg *Config) Parse(args []string) error 
{ } func (c *SyncerConfig) adjustWorkCount() { - if c.DestDBType == "pb" || c.DestDBType == "kafka" { + if c.DestDBType == "file" || c.DestDBType == "kafka" { c.DisableDispatch = true c.WorkerCount = 1 } else if c.DisableDispatch { @@ -277,13 +277,17 @@ func (cfg *Config) adjustConfig() error { cfg.ListenAddr = "http://" + cfg.ListenAddr // add 'http:' scheme to facilitate parsing adjustString(&cfg.DataDir, defaultDataDir) adjustInt(&cfg.DetectInterval, defaultDetectInterval) - cfg.SyncerCfg.adjustWorkCount() - cfg.SyncerCfg.adjustDoDBAndTable() // add default syncer.to configuration if need if cfg.SyncerCfg.To == nil { cfg.SyncerCfg.To = new(dsync.DBConfig) } + + if cfg.SyncerCfg.DestDBType == "pb" { + // pb is an alias of file, use file instead + cfg.SyncerCfg.DestDBType = "file" + } + if cfg.SyncerCfg.DestDBType == "kafka" { // get KafkaAddrs from zookeeper if ZkAddrs is setted if cfg.SyncerCfg.To.ZKAddrs != "" { @@ -318,10 +322,10 @@ func (cfg *Config) adjustConfig() error { if cfg.SyncerCfg.To.KafkaMaxMessages <= 0 { cfg.SyncerCfg.To.KafkaMaxMessages = 1024 } - } else if cfg.SyncerCfg.DestDBType == "pb" { + } else if cfg.SyncerCfg.DestDBType == "file" { if len(cfg.SyncerCfg.To.BinlogFileDir) == 0 { cfg.SyncerCfg.To.BinlogFileDir = cfg.DataDir - log.Infof("use default downstream pb directory: %s", cfg.DataDir) + log.Infof("use default downstream file directory: %s", cfg.DataDir) } } else if cfg.SyncerCfg.DestDBType == "mysql" || cfg.SyncerCfg.DestDBType == "tidb" { if len(cfg.SyncerCfg.To.Host) == 0 { @@ -350,5 +354,8 @@ func (cfg *Config) adjustConfig() error { } } + cfg.SyncerCfg.adjustWorkCount() + cfg.SyncerCfg.adjustDoDBAndTable() + return nil } diff --git a/drainer/config_test.go b/drainer/config_test.go index 084dc3e43..93513c475 100644 --- a/drainer/config_test.go +++ b/drainer/config_test.go @@ -69,3 +69,16 @@ func (t *testDrainerSuite) TestValidate(c *C) { err = cfg.validate() c.Assert(err, IsNil) } + +func (t *testDrainerSuite) TestAdjustConfig(c *C) { + cfg := NewConfig() + cfg.SyncerCfg.DestDBType = "pb" + cfg.SyncerCfg.WorkerCount = 10 + cfg.SyncerCfg.DisableDispatch = false + + err := cfg.adjustConfig() + c.Assert(err, IsNil) + c.Assert(cfg.SyncerCfg.DestDBType, Equals, "file") + c.Assert(cfg.SyncerCfg.WorkerCount, Equals, 1) + c.Assert(cfg.SyncerCfg.DisableDispatch, IsTrue) +} diff --git a/drainer/schema_test.go b/drainer/schema_test.go index 47ffec94f..7943421d3 100644 --- a/drainer/schema_test.go +++ b/drainer/schema_test.go @@ -68,12 +68,12 @@ func (t *schemaSuite) TestSchema(c *C) { jobs = append( jobs, &model.Job{ - ID: 6, - State: model.JobStateSynced, - SchemaID: 1, - Type: model.ActionDropSchema, + ID: 6, + State: model.JobStateSynced, + SchemaID: 1, + Type: model.ActionDropSchema, BinlogInfo: &model.HistoryInfo{SchemaVersion: 3, FinishedTS: 123}, - Query: "drop database test", + Query: "drop database test", }, ) schema, err = NewSchema(jobs, false) @@ -96,12 +96,12 @@ func (t *schemaSuite) TestSchema(c *C) { jobs = append( jobs, &model.Job{ - ID: 9, - State: model.JobStateSynced, - SchemaID: 1, - Type: model.ActionDropSchema, + ID: 9, + State: model.JobStateSynced, + SchemaID: 1, + Type: model.ActionDropSchema, BinlogInfo: &model.HistoryInfo{SchemaVersion: 1, FinishedTS: 123}, - Query: "drop database test", + Query: "drop database test", }, ) schema, err = NewSchema(jobs, false) @@ -170,7 +170,7 @@ func (*schemaSuite) TestTable(c *C) { SchemaID: 3, TableID: 2, Type: model.ActionCreateTable, - BinlogInfo: &model.HistoryInfo{SchemaVersion: 2, TableInfo: tblInfo, 
FinishedTS: 123}, + BinlogInfo: &model.HistoryInfo{SchemaVersion: 2, TableInfo: tblInfo, FinishedTS: 123}, Query: "create table " + tbName.O, } jobs = append(jobs, job) @@ -224,13 +224,13 @@ func (*schemaSuite) TestTable(c *C) { jobs = append( jobs, &model.Job{ - ID: 9, - State: model.JobStateSynced, - SchemaID: 3, - TableID: 2, - Type: model.ActionTruncateTable, + ID: 9, + State: model.JobStateSynced, + SchemaID: 3, + TableID: 2, + Type: model.ActionTruncateTable, BinlogInfo: &model.HistoryInfo{SchemaVersion: 5, TableInfo: tblInfo1, FinishedTS: 123}, - Query: "truncate table " + tbName.O, + Query: "truncate table " + tbName.O, }, ) schema1, err := NewSchema(jobs, false) @@ -246,13 +246,13 @@ func (*schemaSuite) TestTable(c *C) { jobs = append( jobs, &model.Job{ - ID: 9, - State: model.JobStateSynced, - SchemaID: 3, - TableID: 9, - Type: model.ActionDropTable, + ID: 9, + State: model.JobStateSynced, + SchemaID: 3, + TableID: 9, + Type: model.ActionDropTable, BinlogInfo: &model.HistoryInfo{SchemaVersion: 6, FinishedTS: 123}, - Query: "drop table " + tbName.O, + Query: "drop table " + tbName.O, }, ) schema2, err := NewSchema(jobs, false) diff --git a/drainer/sync/pb.go b/drainer/sync/pb.go index d33296024..038cdcda9 100644 --- a/drainer/sync/pb.go +++ b/drainer/sync/pb.go @@ -16,7 +16,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb-binlog/drainer/translator" "github.com/pingcap/tidb-binlog/pkg/binlogfile" - "github.com/pingcap/tidb-binlog/pkg/compress" pb "github.com/pingcap/tidb-binlog/proto/binlog" tb "github.com/pingcap/tipb/go-binlog" ) @@ -29,9 +28,8 @@ type pbSyncer struct { *baseSyncer } -func NewPBSyncer(dir string, compression string, tableInfoGetter translator.TableInfoGetter) (*pbSyncer, error) { - codec := compress.ToCompressionCodec(compression) - binlogger, err := binlogfile.OpenBinlogger(dir, codec) +func NewPBSyncer(dir string, tableInfoGetter translator.TableInfoGetter) (*pbSyncer, error) { + binlogger, err := binlogfile.OpenBinlogger(dir) if err != nil { return nil, errors.Trace(err) } diff --git a/drainer/sync/syncer_test.go b/drainer/sync/syncer_test.go index 18790ad88..c7553d61b 100644 --- a/drainer/sync/syncer_test.go +++ b/drainer/sync/syncer_test.go @@ -54,7 +54,7 @@ func (s *syncerSuite) SetUpTest(c *check.C) { } // create pb syncer - pb, err := NewPBSyncer(c.MkDir(), "", infoGetter) + pb, err := NewPBSyncer(c.MkDir(), infoGetter) c.Assert(err, check.IsNil) s.syncers = append(s.syncers, pb) diff --git a/drainer/sync/util.go b/drainer/sync/util.go index ef7d34267..4be69162a 100644 --- a/drainer/sync/util.go +++ b/drainer/sync/util.go @@ -26,7 +26,6 @@ type DBConfig struct { Port int `toml:"port" json:"port"` Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"` BinlogFileDir string `toml:"dir" json:"dir"` - Compression string `toml:"compression" json:"compression"` TimeLimit string `toml:"time-limit" json:"time-limit"` SizeLimit string `toml:"size-limit" json:"size-limit"` diff --git a/drainer/syncer.go b/drainer/syncer.go index a98a659fc..855e2f48b 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -92,8 +92,8 @@ func createDSyncer(cfg *SyncerConfig, schema *Schema) (dsyncer dsync.Syncer, err if err != nil { return nil, errors.Annotate(err, "fail to create kafka dsyncer") } - case "pb": - dsyncer, err = dsync.NewPBSyncer(cfg.To.BinlogFileDir, cfg.To.Compression, schema) + case "file": + dsyncer, err = dsync.NewPBSyncer(cfg.To.BinlogFileDir, schema) if err != nil { return nil, errors.Annotate(err, "fail to create pb dsyncer") } 
diff --git a/pkg/binlogfile/binlogger.go b/pkg/binlogfile/binlogger.go index e6e075d3c..529f631b0 100644 --- a/pkg/binlogfile/binlogger.go +++ b/pkg/binlogfile/binlogger.go @@ -24,7 +24,6 @@ import ( "github.com/ngaut/log" "github.com/pingcap/errors" - "github.com/pingcap/tidb-binlog/pkg/compress" "github.com/pingcap/tidb-binlog/pkg/file" "github.com/pingcap/tipb/go-binlog" "golang.org/x/net/context" @@ -71,8 +70,6 @@ type binlogger struct { // encoder encodes binlog payload into bytes, and write to file encoder Encoder - codec compress.CompressionCodec - lastSuffix uint64 lastOffset int64 @@ -83,7 +80,7 @@ type binlogger struct { } // OpenBinlogger returns a binlogger for write, then it can be appended -func OpenBinlogger(dirpath string, codec compress.CompressionCodec) (Binlogger, error) { +func OpenBinlogger(dirpath string) (Binlogger, error) { log.Infof("open binlog directory %s", dirpath) var ( err error @@ -145,8 +142,7 @@ func OpenBinlogger(dirpath string, codec compress.CompressionCodec) (Binlogger, binlog := &binlogger{ dir: dirpath, file: fileLock, - encoder: NewEncoder(fileLock, offset, codec), - codec: codec, + encoder: NewEncoder(fileLock, offset), dirLock: dirLock, lastSuffix: lastFileSuffix, lastOffset: offset, @@ -414,7 +410,7 @@ func (b *binlogger) rotate() error { } b.file = newTail - b.encoder = NewEncoder(b.file, 0, b.codec) + b.encoder = NewEncoder(b.file, 0) log.Infof("segmented binlog file %v is created", fpath) return nil } diff --git a/pkg/binlogfile/binlogger_test.go b/pkg/binlogfile/binlogger_test.go index 3d05f004a..7e08fe904 100644 --- a/pkg/binlogfile/binlogger_test.go +++ b/pkg/binlogfile/binlogger_test.go @@ -23,7 +23,6 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" - "github.com/pingcap/tidb-binlog/pkg/compress" "github.com/pingcap/tipb/go-binlog" ) @@ -42,7 +41,7 @@ func (s *testBinloggerSuite) TestCreate(c *C) { } func checkTest(c *C, dir string) { - bl, err := OpenBinlogger(dir, compress.CompressionNone) + bl, err := OpenBinlogger(dir) c.Assert(err, IsNil) defer CloseBinlogger(bl) @@ -54,7 +53,7 @@ func checkTest(c *C, dir string) { func (s *testBinloggerSuite) TestOpenForWrite(c *C) { dir := c.MkDir() - bl, err := OpenBinlogger(dir, compress.CompressionNone) + bl, err := OpenBinlogger(dir) c.Assert(err, IsNil) b, ok := bl.(*binlogger) @@ -65,7 +64,7 @@ func (s *testBinloggerSuite) TestOpenForWrite(c *C) { c.Assert(err, IsNil) bl.Close() - bl, err = OpenBinlogger(dir, compress.CompressionNone) + bl, err = OpenBinlogger(dir) c.Assert(err, IsNil) b, ok = bl.(*binlogger) @@ -90,7 +89,7 @@ func (s *testBinloggerSuite) TestOpenForWrite(c *C) { func (s *testBinloggerSuite) TestRotateFile(c *C) { dir := c.MkDir() - bl, err := OpenBinlogger(dir, compress.CompressionNone) + bl, err := OpenBinlogger(dir) c.Assert(err, IsNil) payload := []byte("binlogtest") @@ -110,7 +109,7 @@ func (s *testBinloggerSuite) TestRotateFile(c *C) { bl.Close() - bl, err = OpenBinlogger(dir, compress.CompressionNone) + bl, err = OpenBinlogger(dir) c.Assert(err, IsNil) binlogs, err := bl.ReadFrom(binlog.Pos{}, 1) @@ -129,7 +128,7 @@ func (s *testBinloggerSuite) TestRotateFile(c *C) { func (s *testBinloggerSuite) TestRead(c *C) { dir := c.MkDir() - bl, err := OpenBinlogger(dir, compress.CompressionNone) + bl, err := OpenBinlogger(dir) c.Assert(err, IsNil) defer bl.Close() @@ -168,7 +167,7 @@ func (s *testBinloggerSuite) TestRead(c *C) { func (s *testBinloggerSuite) TestCourruption(c *C) { dir := c.MkDir() - bl, err := OpenBinlogger(dir, compress.CompressionNone) + bl, 
err := OpenBinlogger(dir) c.Assert(err, IsNil) defer bl.Close() @@ -201,7 +200,7 @@ func (s *testBinloggerSuite) TestCourruption(c *C) { func (s *testBinloggerSuite) TestGC(c *C) { dir := c.MkDir() - bl, err := OpenBinlogger(dir, compress.CompressionNone) + bl, err := OpenBinlogger(dir) c.Assert(err, IsNil) defer CloseBinlogger(bl) @@ -227,7 +226,7 @@ func (s *testBinloggerSuite) TestSeekBinlog(c *C) { os.Remove(f.Name()) }() - encoder := NewEncoder(f, 0, compress.CompressionNone) + encoder := NewEncoder(f, 0) _, err = encoder.Encode([]byte("binlogtest")) c.Assert(err, IsNil) @@ -270,7 +269,7 @@ func (s *testBinloggerSuite) TestSeekBinlog(c *C) { func (s *testBinloggerSuite) TestSkipCRCRead(c *C) { dir := c.MkDir() - bl, err := OpenBinlogger(dir, compress.CompressionNone) + bl, err := OpenBinlogger(dir) c.Assert(err, IsNil) defer bl.Close() diff --git a/pkg/binlogfile/encoder.go b/pkg/binlogfile/encoder.go index 68782b193..a2ee423f7 100644 --- a/pkg/binlogfile/encoder.go +++ b/pkg/binlogfile/encoder.go @@ -19,7 +19,6 @@ import ( "io" "github.com/pingcap/errors" - "github.com/pingcap/tidb-binlog/pkg/compress" ) var magic uint32 = 471532804 @@ -35,15 +34,13 @@ type Encoder interface { type encoder struct { bw io.Writer - codec compress.CompressionCodec offset int64 } // NewEncoder creates a Encoder instance -func NewEncoder(w io.Writer, initOffset int64, codec compress.CompressionCodec) Encoder { +func NewEncoder(w io.Writer, initOffset int64) Encoder { return &encoder{ bw: w, - codec: codec, offset: initOffset, } } @@ -51,12 +48,7 @@ func NewEncoder(w io.Writer, initOffset int64, codec compress.CompressionCodec) // Encode implements interface of Encoder func (e *encoder) Encode(payload []byte) (int64, error) { data := Encode(payload) - - data, err := compress.Compress(data, e.codec) - if err != nil { - return 0, errors.Trace(err) - } - _, err = e.bw.Write(data) + _, err := e.bw.Write(data) if err != nil { return 0, errors.Trace(err) } diff --git a/pkg/compress/compress.go b/pkg/compress/compress.go deleted file mode 100644 index a3d886d2a..000000000 --- a/pkg/compress/compress.go +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package compress - -import ( - "bytes" - "compress/gzip" - "strings" - - "github.com/ngaut/log" - "github.com/pingcap/errors" -) - -// CompressionCodec defines type of compression. -type CompressionCodec int8 - -const ( - // CompressionNone means no compression. - CompressionNone CompressionCodec = iota - // CompressionGZIP means using GZIP compression. - CompressionGZIP -) - -// ToCompressionCodec converts v to CompressionCodec. -func ToCompressionCodec(v string) CompressionCodec { - v = strings.ToLower(v) - switch v { - case "": - return CompressionNone - case "gzip": - return CompressionGZIP - default: - log.Warnf("unknown codec %v, no compression.", v) - return CompressionNone - } -} - -// Compress compresses payload based on the codec. 
-func Compress(data []byte, codec CompressionCodec) (payload []byte, err error) { - switch codec { - case CompressionNone: - payload = data - case CompressionGZIP: - var buf bytes.Buffer - writer := gzip.NewWriter(&buf) - if _, err := writer.Write(data); err != nil { - return nil, errors.Trace(err) - } - if err := writer.Close(); err != nil { - return nil, errors.Trace(err) - } - payload = buf.Bytes() - } - - return payload, nil -} diff --git a/pkg/compress/compress_test.go b/pkg/compress/compress_test.go deleted file mode 100644 index 3ba537786..000000000 --- a/pkg/compress/compress_test.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package compress_test - -import ( - "bytes" - "compress/gzip" - "io/ioutil" - "testing" - - . "github.com/pingcap/check" - "github.com/pingcap/tidb-binlog/pkg/compress" -) - -func TestClient(t *testing.T) { - TestingT(t) -} - -var _ = Suite(&testCompressSuite{}) - -type testCompressSuite struct{} - -func (s *testCompressSuite) TestParseCompressCodec(c *C) { - c.Assert(compress.ToCompressionCodec(""), Equals, compress.CompressionNone) - c.Assert(compress.ToCompressionCodec("gzip"), Equals, compress.CompressionGZIP) - c.Assert(compress.ToCompressionCodec("zstd"), Not(Equals), compress.CompressionGZIP) -} - -func (s *testCompressSuite) TestNoCompression(c *C) { - data := bytes.Repeat([]byte("test.data"), 1000) - payload, err := compress.Compress(data, compress.CompressionNone) - c.Assert(err, IsNil) - c.Assert(payload, BytesEquals, data) -} - -func (s *testCompressSuite) TestGZIPCompression(c *C) { - data := bytes.Repeat([]byte("test.data"), 1000) - payload, err := compress.Compress(data, compress.CompressionGZIP) - c.Assert(err, IsNil) - c.Assert(len(payload), Less, len(data)) - - // also check that decompressing the payload gets back the original data. 
- decompressor, err := gzip.NewReader(bytes.NewReader(payload)) - c.Assert(err, IsNil) - decompressed, err := ioutil.ReadAll(decompressor) - c.Assert(err, IsNil) - c.Assert(decompressed, BytesEquals, data) -} diff --git a/pkg/loader/load.go b/pkg/loader/load.go index da69faae2..42a97aaaa 100644 --- a/pkg/loader/load.go +++ b/pkg/loader/load.go @@ -508,4 +508,4 @@ func (s *loaderImpl) groupDMLs(dmls []*DML) (batchByTbls map[string][]*DML, sing } } return -} \ No newline at end of file +} diff --git a/pkg/loader/load_test.go b/pkg/loader/load_test.go index 077abec96..a204e1dd3 100644 --- a/pkg/loader/load_test.go +++ b/pkg/loader/load_test.go @@ -15,6 +15,7 @@ package loader import ( "database/sql" + sqlmock "github.com/DATA-DOG/go-sqlmock" check "github.com/pingcap/check" ) diff --git a/pkg/loader/model_test.go b/pkg/loader/model_test.go index 013a30dc1..7c7382432 100644 --- a/pkg/loader/model_test.go +++ b/pkg/loader/model_test.go @@ -95,15 +95,15 @@ func (d *dmlSuite) testWhere(c *check.C, tp DMLType) { c.Assert(strings.Count(builder.String(), "?"), check.Equals, len(args)) } +type getKeysSuite struct{} -type getKeysSuite struct {} var _ = check.Suite(&getKeysSuite{}) func (s *getKeysSuite) TestGetKeyShouldUseNamesWithVals(c *check.C) { names := []string{"name", "age", "city"} values := map[string]interface{}{ "name": "pingcap", - "age": 42, + "age": 42, } c.Assert(getKey(names, values), check.Equals, "(name: pingcap)(age: 42)") } @@ -125,29 +125,29 @@ func (s *getKeysSuite) TestShouldHaveAtLeastOneKey(c *check.C) { func (s *getKeysSuite) TestShouldCollectNewOldUniqKeyVals(c *check.C) { dml := DML{ Database: "db", - Table: "tbl", - Tp: UpdateDMLType, + Table: "tbl", + Tp: UpdateDMLType, info: &tableInfo{ columns: []string{"id", "first", "last", "other"}, uniqueKeys: []indexInfo{ indexInfo{ - name: "uniq name", + name: "uniq name", columns: []string{"first", "last"}, }, indexInfo{ - name: "other", + name: "other", columns: []string{"other"}, }, }, }, Values: map[string]interface{}{ "first": "strict", - "last": "tester", + "last": "tester", "other": 42, }, OldValues: map[string]interface{}{ "first": "Strict", - "last": "Tester", + "last": "Tester", "other": 1, }, } @@ -161,17 +161,18 @@ func (s *getKeysSuite) TestShouldCollectNewOldUniqKeyVals(c *check.C) { c.Assert(keys, check.DeepEquals, expected) } -type SQLSuite struct {} +type SQLSuite struct{} + var _ = check.Suite(&SQLSuite{}) func (s *SQLSuite) TestInsertSQL(c *check.C) { dml := DML{ - Tp: InsertDMLType, + Tp: InsertDMLType, Database: "test", - Table: "hello", + Table: "hello", Values: map[string]interface{}{ "name": "pc", - "age": 42, + "age": 42, }, info: &tableInfo{ columns: []string{"name", "age"}, @@ -186,9 +187,9 @@ func (s *SQLSuite) TestInsertSQL(c *check.C) { func (s *SQLSuite) TestDeleteSQL(c *check.C) { dml := DML{ - Tp: DeleteDMLType, + Tp: DeleteDMLType, Database: "test", - Table: "hello", + Table: "hello", Values: map[string]interface{}{ "name": "pc", }, @@ -206,9 +207,9 @@ func (s *SQLSuite) TestDeleteSQL(c *check.C) { func (s *SQLSuite) TestUpdateSQL(c *check.C) { dml := DML{ - Tp: UpdateDMLType, + Tp: UpdateDMLType, Database: "db", - Table: "tbl", + Table: "tbl", Values: map[string]interface{}{ "name": "pc", }, @@ -226,4 +227,4 @@ func (s *SQLSuite) TestUpdateSQL(c *check.C) { c.Assert(args, check.HasLen, 2) c.Assert(args[0], check.Equals, "pc") c.Assert(args[1], check.Equals, "pingcap") -} \ No newline at end of file +} diff --git a/pkg/loader/util_test.go b/pkg/loader/util_test.go index e60618f55..bb0f6b357 100644 
--- a/pkg/loader/util_test.go +++ b/pkg/loader/util_test.go @@ -14,8 +14,8 @@ package loader import ( - "testing" "regexp" + "testing" sqlmock "github.com/DATA-DOG/go-sqlmock" check "github.com/pingcap/check" diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go index 714ac25e8..a4354cd39 100644 --- a/pkg/node/node_test.go +++ b/pkg/node/node_test.go @@ -26,4 +26,4 @@ func (s *testNodeSuite) TestClone(c *C) { status2 := CloneStatus(status) c.Assert(status, Not(Equals), status2) c.Assert(*status, Equals, *status2) -} \ No newline at end of file +} diff --git a/pkg/sql/sql.go b/pkg/sql/sql.go index 1e7240671..1df22b8f0 100644 --- a/pkg/sql/sql.go +++ b/pkg/sql/sql.go @@ -23,7 +23,6 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" - "github.com/go-sql-driver/mysql" "github.com/ngaut/log" "github.com/pingcap/errors" diff --git a/pkg/sql/sql_test.go b/pkg/sql/sql_test.go index 32038b450..bf1ec4ecd 100644 --- a/pkg/sql/sql_test.go +++ b/pkg/sql/sql_test.go @@ -19,7 +19,7 @@ import ( "testing" "time" - "github.com/DATA-DOG/go-sqlmock" + sqlmock "github.com/DATA-DOG/go-sqlmock" "github.com/go-sql-driver/mysql" . "github.com/pingcap/check" "github.com/prometheus/client_golang/prometheus" diff --git a/pkg/zk/zkmock_test.go b/pkg/zk/zkmock_test.go index cbda87f32..9f4e6e8c1 100644 --- a/pkg/zk/zkmock_test.go +++ b/pkg/zk/zkmock_test.go @@ -18,9 +18,10 @@ package zk_test import ( + reflect "reflect" + gomock "github.com/golang/mock/gomock" zk "github.com/samuel/go-zookeeper/zk" - reflect "reflect" ) // MockConn is a mock of Conn interface diff --git a/reparo/file_test.go b/reparo/file_test.go index d558ef9d9..05ffd0b34 100644 --- a/reparo/file_test.go +++ b/reparo/file_test.go @@ -18,7 +18,6 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/tidb-binlog/pkg/binlogfile" - "github.com/pingcap/tidb-binlog/pkg/compress" pb "github.com/pingcap/tidb-binlog/proto/binlog" "github.com/pingcap/tidb/store/tikv/oracle" gb "github.com/pingcap/tipb/go-binlog" @@ -47,7 +46,7 @@ func (s *testFileSuite) TestIsAcceptableBinlogFile(c *C) { binlogData, err := binlog.Marshal() c.Assert(err, IsNil) - binloger, err := binlogfile.OpenBinlogger(binlogDir, compress.CompressionNone) + binloger, err := binlogfile.OpenBinlogger(binlogDir) c.Assert(err, IsNil) binloger.WriteTail(&gb.Entity{Payload: binlogData}) err = binloger.Close() diff --git a/tests/binlog/drainer.toml b/tests/binlog/drainer.toml index 9a21dc06f..413c6c038 100644 --- a/tests/binlog/drainer.toml +++ b/tests/binlog/drainer.toml @@ -39,7 +39,7 @@ disable-dispatch = false safe-mode = false # downstream storage, equal to --dest-db-type -# valid values are "mysql", "pb", "tidb", "flash", "kafka" +# valid values are "mysql", "file", "tidb", "flash", "kafka" db-type = "mysql" ##replicate-do-db priority over replicate-do-table if have same db name @@ -66,12 +66,9 @@ port = 3306 [syncer.to.checkpoint] #schema = "tidb_binlog" -# Uncomment this if you want to use pb or sql as db-type. -# Compress compresses output file, like pb and sql file. Now it supports "gzip" algorithm only. -# Values can be "gzip". Leave it empty to disable compression. +# Uncomment this if you want to use file as db-type. 
#[syncer.to] #dir = "data.drainer" -#compression = "gzip" # when db-type is kafka, you can uncomment this to config the down stream kafka, it will be the globle config kafka default diff --git a/tests/dailytest/parser.go b/tests/dailytest/parser.go index d14f75bb1..826f51a53 100644 --- a/tests/dailytest/parser.go +++ b/tests/dailytest/parser.go @@ -235,4 +235,3 @@ func parseTableSQL(table *table, sql string) error { return errors.Trace(err) } - diff --git a/tests/filter/drainer.toml b/tests/filter/drainer.toml index c1042b93c..59f2867a0 100644 --- a/tests/filter/drainer.toml +++ b/tests/filter/drainer.toml @@ -28,7 +28,7 @@ disable-dispatch = false safe-mode = false # downstream storage, equal to --dest-db-type -# valid values are "mysql", "pb", "tidb", "flash", "kafka" +# valid values are "mysql", "file", "tidb", "flash", "kafka" db-type = "mysql" # disable sync these schema diff --git a/tests/kafka/drainer.toml b/tests/kafka/drainer.toml index 6a20c0c91..398335c00 100644 --- a/tests/kafka/drainer.toml +++ b/tests/kafka/drainer.toml @@ -22,7 +22,7 @@ disable-dispatch = false safe-mode = false # downstream storage, equal to --dest-db-type -# valid values are "mysql", "pb", "tidb", "flash", "kafka" +# valid values are "mysql", "file", "tidb", "flash", "kafka" db-type = "kafka" # Time and size limits for flash batch write diff --git a/tests/reparo/drainer.toml b/tests/reparo/drainer.toml index 59e4975fb..b04743af4 100644 --- a/tests/reparo/drainer.toml +++ b/tests/reparo/drainer.toml @@ -31,8 +31,8 @@ disable-dispatch = false safe-mode = false # downstream storage, equal to --dest-db-type -# valid values are "mysql", "pb", "tidb", "flash", "kafka" -db-type = "pb" +# valid values are "mysql", "file", "tidb", "flash", "kafka" +db-type = "file" #[syncer.to] #dir = "/data/data.drainer" diff --git a/tests/restart/drainer.toml b/tests/restart/drainer.toml index 6d68330f6..6e5415ec9 100644 --- a/tests/restart/drainer.toml +++ b/tests/restart/drainer.toml @@ -31,7 +31,7 @@ disable-dispatch = false safe-mode = false # downstream storage, equal to --dest-db-type -# valid values are "mysql", "pb", "tidb", "flash", "kafka" +# valid values are "mysql", "file", "tidb", "flash", "kafka" db-type = "mysql" # the downstream mysql protocol database diff --git a/tests/status/drainer.toml b/tests/status/drainer.toml index ee50cba79..9275c91d9 100644 --- a/tests/status/drainer.toml +++ b/tests/status/drainer.toml @@ -31,8 +31,8 @@ disable-dispatch = false safe-mode = false # downstream storage, equal to --dest-db-type -# valid values are "mysql", "pb", "tidb", "flash", "kafka" -db-type = "pb" +# valid values are "mysql", "file", "tidb", "flash", "kafka" +db-type = "file" #[syncer.to] #dir = "/data/data.drainer"
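With pkg/compress removed, binlog files are always written uncompressed, and `OpenBinlogger`/`NewEncoder` no longer take a codec argument. A rough caller-side sketch of the new signatures, based only on the calls visible in this diff (the directory name and payload are placeholders, and error handling is trimmed, so treat it as illustrative rather than production code):

```go
package main

import (
	"log"

	"github.com/pingcap/tidb-binlog/pkg/binlogfile"
	gb "github.com/pingcap/tipb/go-binlog"
)

func main() {
	// Old: binlogfile.OpenBinlogger(dir, compress.CompressionNone)
	// New: the codec parameter is gone; files are written as plain,
	// uncompressed binlog segments.
	bl, err := binlogfile.OpenBinlogger("data.drainer")
	if err != nil {
		log.Fatal(err)
	}
	defer binlogfile.CloseBinlogger(bl)

	// Appending an entity works exactly as before the change.
	bl.WriteTail(&gb.Entity{Payload: []byte("binlogtest")})
}
```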