diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index 2a58eb929..21b4a912d 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -76,13 +76,12 @@ ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql" # #replicate-do-db = ["~^b.*","s1"] +[syncer.relay] # directory of relay logs. Empty string indicates disabling relay log. # relay log works only if the downstream is TiDB/MySQL. -relay-log-dir = "" +# log-dir = "" # max file size of each relay log -relay-log-size = 10485760 -# read buffer size of relay log -relay-read-buf-size = 8 +# max-file-size = 10485760 #[[syncer.replicate-do-table]] #db-name ="test" diff --git a/drainer/checkpoint/checkpoint.go b/drainer/checkpoint/checkpoint.go index a9a0a20fa..ea4e822a0 100644 --- a/drainer/checkpoint/checkpoint.go +++ b/drainer/checkpoint/checkpoint.go @@ -19,6 +19,14 @@ import ( "go.uber.org/zap" ) +const ( + // StatusConsistent means server quit normally, data <= ts is synced to downstream + StatusConsistent int = 0 + + // StatusRunning means server running or quit abnormally, part of data may or may not been synced to downstream + StatusRunning int = 1 +) + var ( // ErrCheckPointClosed indicates the CheckPoint already closed. ErrCheckPointClosed = errors.New("CheckPoint already closed") @@ -31,11 +39,14 @@ type CheckPoint interface { Load() error // Save saves checkpoint information. - Save(int64, int64) error + Save(commitTS int64, slaveTS int64, status int) error - // Pos gets position information. + // TS gets checkpoint commit timestamp. TS() int64 + // Status return the status saved. + Status() int + // Close closes the CheckPoint and release resources, after closed other methods should not be called again. 
Close() error } @@ -50,7 +61,7 @@ func NewCheckPoint(cfg *Config) (CheckPoint, error) { case "mysql", "tidb": cp, err = newMysql(cfg) case "file": - cp, err = NewFile(cfg) + cp, err = NewFile(cfg.InitialCommitTS, cfg.CheckPointFile) default: err = errors.Errorf("unsupported checkpoint type %s", cfg.CheckpointType) } diff --git a/drainer/checkpoint/file.go b/drainer/checkpoint/file.go index e7d287c73..fdb31dd1a 100644 --- a/drainer/checkpoint/file.go +++ b/drainer/checkpoint/file.go @@ -31,14 +31,15 @@ type FileCheckPoint struct { name string - CommitTS int64 `toml:"commitTS" json:"commitTS"` + StatusSaved int `toml:"status" json:"status"` + CommitTS int64 `toml:"commitTS" json:"commitTS"` } // NewFile creates a new FileCheckpoint. -func NewFile(cfg *Config) (CheckPoint, error) { +func NewFile(initialCommitTS int64, filePath string) (CheckPoint, error) { pb := &FileCheckPoint{ - initialCommitTS: cfg.InitialCommitTS, - name: cfg.CheckPointFile, + initialCommitTS: initialCommitTS, + name: filePath, } err := pb.Load() if err != nil { @@ -81,7 +82,7 @@ func (sp *FileCheckPoint) Load() error { } // Save implements CheckPoint.Save interface -func (sp *FileCheckPoint) Save(ts, slaveTS int64) error { +func (sp *FileCheckPoint) Save(ts, slaveTS int64, status int) error { sp.Lock() defer sp.Unlock() @@ -90,6 +91,7 @@ func (sp *FileCheckPoint) Save(ts, slaveTS int64) error { } sp.CommitTS = ts + sp.StatusSaved = status var buf bytes.Buffer e := toml.NewEncoder(&buf) @@ -114,6 +116,14 @@ func (sp *FileCheckPoint) TS() int64 { return sp.CommitTS } +// Status implements CheckPoint.Status interface +func (sp *FileCheckPoint) Status() int { + sp.RLock() + defer sp.RUnlock() + + return sp.StatusSaved +} + // Close implements CheckPoint.Close interface func (sp *FileCheckPoint) Close() error { sp.Lock() diff --git a/drainer/checkpoint/file_test.go b/drainer/checkpoint/file_test.go index 9862313a6..1e03a99fd 100644 --- a/drainer/checkpoint/file_test.go +++ 
b/drainer/checkpoint/file_test.go @@ -23,22 +23,22 @@ import ( func (t *testCheckPointSuite) TestFile(c *C) { fileName := "/tmp/test" notExistFileName := "test_not_exist" - cfg := new(Config) - cfg.CheckPointFile = fileName - meta, err := NewFile(cfg) + meta, err := NewFile(0, fileName) c.Assert(err, IsNil) defer os.RemoveAll(fileName) // zero (initial) CommitTs c.Assert(meta.TS(), Equals, int64(0)) + c.Assert(meta.Status(), Equals, StatusConsistent) testTs := int64(1) // save ts - err = meta.Save(testTs, 0) + err = meta.Save(testTs, 0, StatusRunning) c.Assert(err, IsNil) // check ts ts := meta.TS() c.Assert(ts, Equals, testTs) + c.Assert(meta.Status(), Equals, StatusRunning) // check load ts err = meta.Load() @@ -47,23 +47,22 @@ func (t *testCheckPointSuite) TestFile(c *C) { c.Assert(ts, Equals, testTs) // check not exist meta file - cfg.CheckPointFile = notExistFileName - meta, err = NewFile(cfg) + meta, err = NewFile(0, notExistFileName) c.Assert(err, IsNil) err = meta.Load() c.Assert(err, IsNil) c.Assert(meta.TS(), Equals, int64(0)) // check not exist meta file, but with initialCommitTs - cfg.InitialCommitTS = 123 - meta, err = NewFile(cfg) + var initialCommitTS int64 = 123 + meta, err = NewFile(initialCommitTS, notExistFileName) c.Assert(err, IsNil) - c.Assert(meta.TS(), Equals, cfg.InitialCommitTS) + c.Assert(meta.TS(), Equals, initialCommitTS) // close the checkpoint err = meta.Close() c.Assert(err, IsNil) c.Assert(errors.Cause(meta.Load()), Equals, ErrCheckPointClosed) - c.Assert(errors.Cause(meta.Save(0, 0)), Equals, ErrCheckPointClosed) + c.Assert(errors.Cause(meta.Save(0, 0, StatusConsistent)), Equals, ErrCheckPointClosed) c.Assert(errors.Cause(meta.Close()), Equals, ErrCheckPointClosed) } diff --git a/drainer/checkpoint/mysql.go b/drainer/checkpoint/mysql.go index 2c60465a6..1b3914aa6 100644 --- a/drainer/checkpoint/mysql.go +++ b/drainer/checkpoint/mysql.go @@ -19,6 +19,9 @@ import ( "sync" "github.com/pingcap/errors" + "github.com/pingcap/log" + 
"go.uber.org/zap" + // mysql driver _ "github.com/go-sql-driver/mysql" pkgsql "github.com/pingcap/tidb-binlog/pkg/sql" @@ -35,10 +38,13 @@ type MysqlCheckPoint struct { schema string table string - CommitTS int64 `toml:"commitTS" json:"commitTS"` - TsMap map[string]int64 `toml:"ts-map" json:"ts-map"` + StatusSaved int `toml:"status" json:"status"` + CommitTS int64 `toml:"commitTS" json:"commitTS"` + TsMap map[string]int64 `toml:"ts-map" json:"ts-map"` } +var _ CheckPoint = &MysqlCheckPoint{} + var sqlOpenDB = pkgsql.OpenDB func newMysql(cfg *Config) (CheckPoint, error) { @@ -68,6 +74,16 @@ func newMysql(cfg *Config) (CheckPoint, error) { return nil, errors.Annotatef(err, "exec failed, sql: %s", sql) } + if sp.clusterID == 0 { + id, err := getClusterID(db, sp.schema, sp.table) + if err != nil { + return nil, errors.Trace(err) + } + + log.Info("set cluster id", zap.Uint64("id", id)) + sp.clusterID = id + } + err = sp.Load() return sp, errors.Trace(err) } @@ -106,7 +122,7 @@ func (sp *MysqlCheckPoint) Load() error { } // Save implements checkpoint.Save interface -func (sp *MysqlCheckPoint) Save(ts, slaveTS int64) error { +func (sp *MysqlCheckPoint) Save(ts, slaveTS int64, status int) error { sp.Lock() defer sp.Unlock() @@ -115,6 +131,7 @@ func (sp *MysqlCheckPoint) Save(ts, slaveTS int64) error { } sp.CommitTS = ts + sp.StatusSaved = status if slaveTS > 0 { sp.TsMap["master-ts"] = ts @@ -135,6 +152,14 @@ func (sp *MysqlCheckPoint) Save(ts, slaveTS int64) error { return nil } +// Status implements CheckPoint.Status interface +func (sp *MysqlCheckPoint) Status() int { + sp.RLock() + defer sp.RUnlock() + + return sp.StatusSaved +} + // TS implements CheckPoint.TS interface func (sp *MysqlCheckPoint) TS() int64 { sp.RLock() diff --git a/drainer/checkpoint/mysql_test.go b/drainer/checkpoint/mysql_test.go index 411bb0b79..0f641838f 100644 --- a/drainer/checkpoint/mysql_test.go +++ b/drainer/checkpoint/mysql_test.go @@ -49,7 +49,7 @@ func (s *saveSuite) 
TestShouldSaveCheckpoint(c *C) { c.Assert(err, IsNil) mock.ExpectExec("replace into db.tbl.*").WillReturnResult(sqlmock.NewResult(0, 0)) cp := MysqlCheckPoint{db: db, schema: "db", table: "tbl"} - err = cp.Save(1111, 0) + err = cp.Save(1111, 0, StatusRunning) c.Assert(err, IsNil) } @@ -63,7 +63,7 @@ func (s *saveSuite) TestShouldUpdateTsMap(c *C) { table: "tbl", TsMap: make(map[string]int64), } - err = cp.Save(65536, 3333) + err = cp.Save(65536, 3333, StatusRunning) c.Assert(err, IsNil) c.Assert(cp.TsMap["master-ts"], Equals, int64(65536)) c.Assert(cp.TsMap["slave-ts"], Equals, int64(3333)) @@ -83,12 +83,13 @@ func (s *loadSuite) TestShouldLoadFromDB(c *C) { TsMap: make(map[string]int64), } rows := sqlmock.NewRows([]string{"checkPoint"}). - AddRow(`{"commitTS": 1024, "ts-map": {"master-ts": 2000, "slave-ts": 1999}}`) + AddRow(`{"commitTS": 1024, "status": 1, "ts-map": {"master-ts": 2000, "slave-ts": 1999}}`) mock.ExpectQuery("select checkPoint from db.tbl.*").WillReturnRows(rows) err = cp.Load() c.Assert(err, IsNil) c.Assert(cp.CommitTS, Equals, int64(1024)) + c.Assert(cp.StatusSaved, Equals, 1) c.Assert(cp.TsMap["master-ts"], Equals, int64(2000)) c.Assert(cp.TsMap["slave-ts"], Equals, int64(1999)) } diff --git a/drainer/checkpoint/util.go b/drainer/checkpoint/util.go index e48132e9e..45867424c 100644 --- a/drainer/checkpoint/util.go +++ b/drainer/checkpoint/util.go @@ -14,12 +14,19 @@ package checkpoint import ( + "database/sql" + stderrors "errors" "fmt" // mysql driver _ "github.com/go-sql-driver/mysql" + "github.com/pingcap/errors" ) +// ErrNoCheckpointItem represents there's no any checkpoint item and the cluster id must be specified +// for the mysql checkpoint type. +var ErrNoCheckpointItem = stderrors.New("no any checkpoint item") + // DBConfig is the DB configuration. 
type DBConfig struct { Host string `toml:"host" json:"host"` @@ -74,6 +81,36 @@ func genReplaceSQL(sp *MysqlCheckPoint, str string) string { return fmt.Sprintf("replace into %s.%s values(%d, '%s')", sp.schema, sp.table, sp.clusterID, str) } +// getClusterID return the cluster id iff the checkpoint table exist only one row. +func getClusterID(db *sql.DB, schema string, table string) (id uint64, err error) { + sqlQuery := fmt.Sprintf("select clusterID from %s.%s", schema, table) + rows, err := db.Query(sqlQuery) + if err != nil { + return 0, errors.Trace(err) + } + + for rows.Next() { + if id > 0 { + return 0, errors.New("there are multi row in checkpoint table") + } + + err = rows.Scan(&id) + if err != nil { + return 0, errors.Trace(err) + } + } + + if rows.Err() != nil { + return 0, errors.Trace(rows.Err()) + } + + if id == 0 { + return 0, ErrNoCheckpointItem + } + + return +} + func genSelectSQL(sp *MysqlCheckPoint) string { return fmt.Sprintf("select checkPoint from %s.%s where clusterID = %d", sp.schema, sp.table, sp.clusterID) } diff --git a/drainer/checkpoint/util_test.go b/drainer/checkpoint/util_test.go new file mode 100644 index 000000000..746652bf5 --- /dev/null +++ b/drainer/checkpoint/util_test.go @@ -0,0 +1,63 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package checkpoint + +import ( + "github.com/DATA-DOG/go-sqlmock" + . 
"github.com/pingcap/check" + "github.com/pingcap/errors" +) + +var _ = Suite(&testUtil{}) + +type testUtil struct{} + +func (t *testUtil) TestG(c *C) { + tests := []struct { + name string + rows []uint64 + id uint64 + err bool + checkSpecifiedErr error + }{ + {"no row", nil, 0, true, ErrNoCheckpointItem}, + {"on row", []uint64{1}, 1, false, nil}, + {"multi row", []uint64{1, 2}, 0, true, nil}, + } + + for _, test := range tests { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + + rows := sqlmock.NewRows([]string{"clusterID"}) + for _, row := range test.rows { + rows.AddRow(row) + } + + mock.ExpectQuery("select clusterID from .*").WillReturnRows(rows) + + c.Log("test: ", test.name) + id, err := getClusterID(db, "schema", "table") + if test.err { + c.Assert(err, NotNil) + c.Assert(id, Equals, test.id) + if test.checkSpecifiedErr != nil { + c.Assert(errors.Cause(err), Equals, test.checkSpecifiedErr) + } + } else { + c.Assert(err, IsNil) + c.Assert(id, Equals, test.id) + } + } +} diff --git a/drainer/config.go b/drainer/config.go index 973d69b8c..f830aa7fb 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -74,14 +74,23 @@ type SyncerConfig struct { DoTables []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"` DoDBs []string `toml:"replicate-do-db" json:"replicate-do-db"` DestDBType string `toml:"db-type" json:"db-type"` - RelayLogDir string `toml:"relay-log-dir" json:"relay-log-dir"` - RelayLogSize int64 `toml:"relay-log-size" json:"relay-log-size"` - RelayReadBufSize int `toml:"relay-read-buf-size" json:"relay-read-buf-size"` + Relay RelayConfig `toml:"relay" json:"relay"` EnableDispatch bool `toml:"enable-dispatch" json:"enable-dispatch"` SafeMode bool `toml:"safe-mode" json:"safe-mode"` EnableCausality bool `toml:"enable-detect" json:"enable-detect"` } +// RelayConfig is the Relay log's configuration. 
+type RelayConfig struct { + LogDir string `toml:"log-dir" json:"log-dir"` + MaxFileSize int64 `toml:"max-file-size" json:"max-file-size"` +} + +// IsEnabled return true if we need to handle relay log. +func (rc RelayConfig) IsEnabled() bool { + return len(rc.LogDir) > 0 +} + // Config holds the configuration of drainer type Config struct { *flag.FlagSet `json:"-"` @@ -139,9 +148,9 @@ func NewConfig() *Config { fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas") fs.IntVar(&cfg.SyncerCfg.WorkerCount, "c", 16, "parallel worker count") fs.StringVar(&cfg.SyncerCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql or tidb or file or kafka; see syncer section in conf/drainer.toml") - fs.StringVar(&cfg.SyncerCfg.RelayLogDir, "relay-log-dir", "", "path to relay log of syncer") - fs.Int64Var(&cfg.SyncerCfg.RelayLogSize, "relay-log-size", 10*1024*1024, "max file size of each relay log") - fs.IntVar(&cfg.SyncerCfg.RelayReadBufSize, "relay-read-buf-size", 8, "read buffer size of relay log") + fs.StringVar(&cfg.SyncerCfg.Relay.LogDir, "relay-log-dir", "", "path to relay log of syncer") + fs.Int64Var(&cfg.SyncerCfg.Relay.MaxFileSize, "relay-max-file-size", 10485760, "max file size of each relay log") + fs.BoolVar(&cfg.SyncerCfg.EnableDispatch, "enable-dispatch", true, "enable dispatching sqls that in one same binlog; if set true, work-count and txn-batch would be useless") fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant") fs.BoolVar(&cfg.SyncerCfg.EnableCausality, "enable-detect", false, "enable detect causality") diff --git a/drainer/relay.go b/drainer/relay.go new file mode 100644 index 000000000..099c2f91d --- /dev/null +++ b/drainer/relay.go @@ -0,0 +1,160 @@ +package drainer + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tidb-binlog/drainer/checkpoint" + 
"github.com/pingcap/tidb-binlog/drainer/relay" + "github.com/pingcap/tidb-binlog/drainer/sync" + "github.com/pingcap/tidb-binlog/pkg/loader" + obinlog "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog" + "go.uber.org/zap" +) + +func feedByRelayLogIfNeed(cfg *Config) error { + if !cfg.SyncerCfg.Relay.IsEnabled() { + return nil + } + + // for the mysql type checkpoint + // clusterID will be use as the key + // we can't get the cluster id from pd so we just set 0 + // and the checkpoint will use the clusterID exist at the checkpoint table. + cpCfg, err := GenCheckPointCfg(cfg, 0 /* clusterID */) + if err != nil { + return errors.Trace(err) + } + + scfg := cfg.SyncerCfg + + cp, err := checkpoint.NewCheckPoint(cpCfg) + if err != nil { + return errors.Trace(err) + } + + defer cp.Close() + + if cp.Status() == checkpoint.StatusConsistent { + return nil + } + + reader, err := relay.NewReader(scfg.Relay.LogDir, 1 /* readBufferSize */) + if err != nil { + return errors.Annotate(err, "failed to create reader") + } + + db, err := loader.CreateDBWithSQLMode(scfg.To.User, scfg.To.Password, scfg.To.Host, scfg.To.Port, scfg.StrSQLMode) + if err != nil { + return errors.Annotate(err, "failed to create SQL db") + } + defer db.Close() + + ld, err := sync.CreateLoader(db, scfg.To, scfg.WorkerCount, scfg.TxnBatch, + queryHistogramVec, scfg.StrSQLMode, scfg.DestDBType, nil /*loopbacksync.LoopBackSync*/) + if err != nil { + return errors.Annotate(err, "failed to create loader") + } + + err = feedByRelayLog(reader, ld, cp) + if err != nil { + return errors.Trace(err) + } + + return nil +} + +// feedByRelayLog will take over the `ld loader.Loader`. 
+func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint) error { + checkpointTS := cp.TS() + lastSuccessTS := checkpointTS + r.Run() + + loaderQuit := make(chan struct{}) + var loaderErr error + go func() { + ld.SetSafeMode(true) + loaderErr = ld.Run() + close(loaderQuit) + }() + + var readerTxnsC <-chan *obinlog.Binlog + var toPushLoaderTxn *loader.Txn + var loaderInputC chan<- *loader.Txn + successTxnC := ld.Successes() + + readerTxnsC = r.Binlogs() + readerTxnsCClosed := false + + loaderClosed := false + + for { + // when reader is drained and all txn has been push into loader + // we close cloader. + if readerTxnsC == nil && loaderInputC == nil && !loaderClosed { + ld.Close() + loaderClosed = true + } + + // break once we drainer the success items return by loader. + if loaderClosed && successTxnC == nil { + break + } + + select { + case sbinlog, ok := <-readerTxnsC: + if !ok { + log.Info("readerTxnsC closed") + readerTxnsC = nil + readerTxnsCClosed = true + continue + } + if sbinlog.CommitTs <= checkpointTS { + continue + } + + txn, err := loader.SlaveBinlogToTxn(sbinlog) + if err != nil { + return errors.Trace(err) + } + + readerTxnsC = nil + txn.Metadata = sbinlog.CommitTs + toPushLoaderTxn = txn + loaderInputC = ld.Input() + case loaderInputC <- toPushLoaderTxn: + loaderInputC = nil + toPushLoaderTxn = nil + if !readerTxnsCClosed { + readerTxnsC = r.Binlogs() + } + case success, ok := <-successTxnC: + if !ok { + successTxnC = nil + log.Info("success closed") + continue + } + lastSuccessTS = success.Metadata.(int64) + case <-loaderQuit: + if loaderErr != nil { + return errors.Trace(loaderErr) + } + } + } + + log.Info("finish feed by relay log") + + readerErr := <-r.Error() + + if readerErr != nil { + return errors.Trace(readerErr) + } + + err := cp.Save(lastSuccessTS, 0 /* slaveTS */, checkpoint.StatusConsistent) + if err != nil { + return errors.Trace(err) + } + + log.Info("update status as normal", zap.Int64("ts", lastSuccessTS)) + 
+ return nil +} diff --git a/drainer/relay/reader.go b/drainer/relay/reader.go index 063ea542c..64752d913 100644 --- a/drainer/relay/reader.go +++ b/drainer/relay/reader.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb-binlog/pkg/binlogfile" - "github.com/pingcap/tidb-binlog/pkg/loader" obinlog "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog" "github.com/pingcap/tipb/go-binlog" ) @@ -31,8 +30,8 @@ type Reader interface { // Run reads relay log. Run() context.CancelFunc - // Txns returns parsed transactions. - Txns() <-chan *loader.Txn + // Binlogs returns the channel for reading parsed binlogs. + Binlogs() <-chan *obinlog.Binlog // Close releases resources. Close() error @@ -43,7 +42,7 @@ type Reader interface { type reader struct { binlogger binlogfile.Binlogger - txns chan *loader.Txn + binlogs chan *obinlog.Binlog err chan error } @@ -56,7 +55,7 @@ func NewReader(dir string, readBufferSize int) (Reader, error) { return &reader{ binlogger: binlogger, - txns: make(chan *loader.Txn, readBufferSize), + binlogs: make(chan *obinlog.Binlog, readBufferSize), }, nil } @@ -85,21 +84,16 @@ func (r *reader) Run() context.CancelFunc { break } - var txn *loader.Txn - txn, err = loader.SlaveBinlogToTxn(slaveBinlog) - if err != nil { - break - } select { case <-ctx.Done(): err = ctx.Err() log.Warn("Producing transaction is interrupted") - case r.txns <- txn: + case r.binlogs <- slaveBinlog: } } // If binlogger is not done, notify it to stop. cancel() - close(r.txns) + close(r.binlogs) if err == nil { err = <-binlogErr @@ -114,8 +108,8 @@ func (r *reader) Run() context.CancelFunc { } // Txns implements Reader interface. -func (r *reader) Txns() <-chan *loader.Txn { - return r.txns +func (r *reader) Binlogs() <-chan *obinlog.Binlog { + return r.binlogs } // Error implements Reader interface. 
diff --git a/drainer/relay/reader_test.go b/drainer/relay/reader_test.go index d1900893d..a3a7f6e47 100644 --- a/drainer/relay/reader_test.go +++ b/drainer/relay/reader_test.go @@ -88,9 +88,11 @@ func (r *testReaderSuite) readBinlogAndCheck(c *C, dir string, expectedNumber in var lastTxn *loader.Txn number := 0 - for txn := range relayReader.Txns() { + for txn := range relayReader.Binlogs() { number++ - lastTxn = txn + loaderTxn, err := loader.SlaveBinlogToTxn(txn) + c.Assert(err, IsNil) + lastTxn = loaderTxn } c.Assert(<-relayReader.Error(), IsNil) c.Assert(number, Equals, expectedNumber) diff --git a/drainer/relay/relayer.go b/drainer/relay/relayer.go index 70091659f..b1706237e 100644 --- a/drainer/relay/relayer.go +++ b/drainer/relay/relayer.go @@ -22,6 +22,8 @@ import ( var _ Relayer = &relayer{} +const defaultMaxFileSize = 10 * 1024 * 1024 + // Relayer is the interface for writing relay log. type Relayer interface { // WriteBinlog writes binlog to relay log file. @@ -43,6 +45,10 @@ type relayer struct { // NewRelayer creates a relayer. func NewRelayer(dir string, maxFileSize int64, tableInfoGetter translator.TableInfoGetter) (Relayer, error) { + if maxFileSize <= 0 { + maxFileSize = defaultMaxFileSize + } + binlogger, err := binlogfile.OpenBinlogger(dir, maxFileSize) if err != nil { return nil, errors.Trace(err) diff --git a/drainer/relay_test.go b/drainer/relay_test.go new file mode 100644 index 000000000..7848d3b08 --- /dev/null +++ b/drainer/relay_test.go @@ -0,0 +1,109 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package drainer + +import ( + "github.com/pingcap/check" + "github.com/pingcap/tidb-binlog/drainer/checkpoint" + "github.com/pingcap/tidb-binlog/drainer/relay" + "github.com/pingcap/tidb-binlog/drainer/translator" + "github.com/pingcap/tidb-binlog/pkg/binlogfile" + "github.com/pingcap/tidb-binlog/pkg/loader" +) + +type relaySuite struct{} + +var _ = check.Suite(&relaySuite{}) + +type noOpLoader struct { + input chan *loader.Txn + success chan *loader.Txn +} + +// noOpLoader just return success for every input txn. +var _ loader.Loader = &noOpLoader{} + +func newNoOpLoader() *noOpLoader { + return &noOpLoader{ + input: make(chan *loader.Txn, 1), + success: make(chan *loader.Txn, 1), + } +} + +func (ld *noOpLoader) Run() error { + for txn := range ld.input { + ld.success <- txn + } + + close(ld.success) + return nil +} + +func (ld *noOpLoader) Close() { + close(ld.input) +} + +func (ld *noOpLoader) Input() chan<- *loader.Txn { + return ld.input +} + +func (ld *noOpLoader) Successes() <-chan *loader.Txn { + return ld.success +} + +func (ld *noOpLoader) SetSafeMode(bool) { +} + +func (ld *noOpLoader) GetSafeMode() bool { + return false +} + +var _ loader.Loader = &noOpLoader{} + +func (s *relaySuite) TestFeedByRealyLog(c *check.C) { + cp, err := checkpoint.NewFile(0 /* initialCommitTS */, c.MkDir()+"/checkpoint") + c.Assert(err, check.IsNil) + err = cp.Save(0, 0, checkpoint.StatusRunning) + c.Assert(err, check.IsNil) + c.Assert(cp.Status(), check.Equals, checkpoint.StatusRunning) + + ld := newNoOpLoader() + + // write some relay log + gen := &translator.BinlogGenerator{} + relayDir := c.MkDir() + relayer, err := 
relay.NewRelayer(relayDir, binlogfile.SegmentSizeBytes, gen) + c.Assert(err, check.IsNil) + + for i := 0; i < 10; i++ { + gen.SetInsert(c) + gen.TiBinlog.StartTs = int64(i) + gen.TiBinlog.CommitTs = int64(i) * 10 + _, err = relayer.WriteBinlog(gen.Schema, gen.Table, gen.TiBinlog, gen.PV) + c.Assert(err, check.IsNil) + } + + relayer.Close() + c.Assert(err, check.IsNil) + + reader, err := relay.NewReader(relayDir, 1) + c.Assert(err, check.IsNil) + + err = feedByRelayLog(reader, ld, cp) + c.Assert(err, check.IsNil) + + ts := cp.TS() + c.Assert(ts, check.Equals, int64(90) /* latest commit ts */) + c.Assert(cp.Status(), check.Equals, checkpoint.StatusConsistent) +} diff --git a/drainer/server.go b/drainer/server.go index ec3877255..9478cb588 100644 --- a/drainer/server.go +++ b/drainer/server.go @@ -101,6 +101,10 @@ func NewServer(cfg *Config) (*Server, error) { // get pd client and cluster ID pdCli, err := getPdClient(cfg.EtcdURLs, cfg.Security) if err != nil { + ferr := feedByRelayLogIfNeed(cfg) + if ferr != nil && errors.Cause(ferr) != checkpoint.ErrNoCheckpointItem { + return nil, errors.Trace(ferr) + } return nil, errors.Trace(err) } diff --git a/drainer/sync/mysql.go b/drainer/sync/mysql.go index d1bdac8ef..6dd03f008 100644 --- a/drainer/sync/mysql.go +++ b/drainer/sync/mysql.go @@ -41,12 +41,17 @@ type MysqlSyncer struct { // should only be used for unit test to create mock db var createDB = loader.CreateDBWithSQLMode -// NewMysqlSyncer returns a instance of MysqlSyncer -func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, worker int, batchSize int, queryHistogramVec *prometheus.HistogramVec, sqlMode *string, destDBType string, relayer relay.Relayer, info *loopbacksync.LoopBackSync) (*MysqlSyncer, error) { - db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, sqlMode) - if err != nil { - return nil, errors.Trace(err) - } +// CreateLoader create the Loader instance. 
+func CreateLoader( + db *sql.DB, + cfg *DBConfig, + worker int, + batchSize int, + queryHistogramVec *prometheus.HistogramVec, + sqlMode *string, + destDBType string, + info *loopbacksync.LoopBackSync, +) (ld loader.Loader, err error) { var opts []loader.Option opts = append(opts, loader.WorkerCount(worker), loader.BatchSize(batchSize), loader.SaveAppliedTS(destDBType == "tidb"), loader.SetloopBackSyncInfo(info)) @@ -60,25 +65,52 @@ func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, w if cfg.SyncMode != 0 { mode := loader.SyncMode(cfg.SyncMode) opts = append(opts, loader.SyncModeOption(mode)) + } + + ld, err = loader.NewLoader(db, opts...) + if err != nil { + return nil, errors.Trace(err) + } + + return +} + +// NewMysqlSyncer returns a instance of MysqlSyncer +func NewMysqlSyncer( + cfg *DBConfig, + tableInfoGetter translator.TableInfoGetter, + worker int, + batchSize int, + queryHistogramVec *prometheus.HistogramVec, + sqlMode *string, + destDBType string, + relayer relay.Relayer, + info *loopbacksync.LoopBackSync, +) (*MysqlSyncer, error) { + db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, sqlMode) + if err != nil { + return nil, errors.Trace(err) + } + + syncMode := loader.SyncMode(cfg.SyncMode) + if syncMode == loader.SyncPartialColumn { + var oldMode, newMode string + oldMode, newMode, err = relaxSQLMode(db) + if err != nil { + db.Close() + return nil, errors.Trace(err) + } - if mode == loader.SyncPartialColumn { - var oldMode, newMode string - oldMode, newMode, err = relaxSQLMode(db) + if newMode != oldMode { + db.Close() + db, err = createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, &newMode) if err != nil { return nil, errors.Trace(err) } - - if newMode != oldMode { - db.Close() - db, err = createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, &newMode) - if err != nil { - return nil, errors.Trace(err) - } - } } } - loader, err := loader.NewLoader(db, opts...) 
+ loader, err := CreateLoader(db, cfg, worker, batchSize, queryHistogramVec, sqlMode, destDBType, info) if err != nil { return nil, errors.Trace(err) } diff --git a/drainer/sync/syncer.go b/drainer/sync/syncer.go index e1e9b4fc2..41f5535f3 100644 --- a/drainer/sync/syncer.go +++ b/drainer/sync/syncer.go @@ -39,6 +39,7 @@ type Syncer interface { // Return not nil if fail to sync data to downstream or nil if closed normally Error() <-chan error // Close the Syncer, no more item can be added by `Sync` + // will drain all items and return nil if all successfully sync into downstream Close() error } diff --git a/drainer/syncer.go b/drainer/syncer.go index 991b44602..ba297abf3 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -108,9 +108,8 @@ func createDSyncer(cfg *SyncerConfig, schema *Schema, info *loopbacksync.LoopBac } case "mysql", "tidb": var relayer relay.Relayer - // If the dir is empty, it means relayer is disabled. - if len(cfg.RelayLogDir) > 0 { - if relayer, err = relay.NewRelayer(cfg.RelayLogDir, cfg.RelayLogSize, schema); err != nil { + if cfg.Relay.IsEnabled() { + if relayer, err = relay.NewRelayer(cfg.Relay.LogDir, cfg.Relay.MaxFileSize, schema); err != nil { return nil, errors.Annotate(err, "fail to create relayer") } } @@ -259,7 +258,7 @@ func (s *Syncer) savePoint(ts, slaveTS int64) { } log.Info("write save point", zap.Int64("ts", ts)) - err := s.cp.Save(ts, slaveTS) + err := s.cp.Save(ts, slaveTS, checkpoint.StatusRunning) if err != nil { log.Fatal("save checkpoint failed", zap.Int64("ts", ts), zap.Error(err)) } @@ -457,7 +456,12 @@ ForLoop: if err != nil { return err } - return cerr + + if cerr != nil { + return cerr + } + + return s.cp.Save(s.cp.TS(), 0, checkpoint.StatusConsistent) } func findLoopBackMark(dmls []*loader.DML, info *loopbacksync.LoopBackSync) (bool, error) { diff --git a/drainer/syncer_test.go b/drainer/syncer_test.go index 7dc71ac1b..54238214c 100644 --- a/drainer/syncer_test.go +++ b/drainer/syncer_test.go @@ -96,7 +96,7 
@@ func (s *syncerSuite) TestNewSyncer(c *check.C) { } cpFile := c.MkDir() + "/checkpoint" - cp, err := checkpoint.NewFile(&checkpoint.Config{CheckPointFile: cpFile}) + cp, err := checkpoint.NewFile(0, cpFile) c.Assert(err, check.IsNil) syncer, err := NewSyncer(cp, cfg, nil) diff --git a/pkg/util/util.go b/pkg/util/util.go index 1a1603e3c..69f0ade0c 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -242,6 +242,7 @@ func QueryLatestTsFromPD(tiStore kv.Storage) (int64, error) { } // Store the function in a variable so that we can mock it when testing +// pd.NewClient will retry internally for init info. var newPdCli = pd.NewClient // GetPdClient create a PD client @@ -251,20 +252,12 @@ func GetPdClient(etcdURLs string, securityConfig security.Config) (pd.Client, er return nil, errors.Trace(err) } - pdReconnTimes := 30 - var pdCli pd.Client - for i := 1; i < pdReconnTimes; i++ { - pdCli, err = newPdCli(urlv.StringSlice(), pd.SecurityOption{ - CAPath: securityConfig.SSLCA, - CertPath: securityConfig.SSLCert, - KeyPath: securityConfig.SSLKey, - }) - if err == nil { - break - } - time.Sleep(time.Duration(pdReconnTimes*i) * time.Millisecond) - } + pdCli, err = newPdCli(urlv.StringSlice(), pd.SecurityOption{ + CAPath: securityConfig.SSLCA, + CertPath: securityConfig.SSLCert, + KeyPath: securityConfig.SSLKey, + }) return pdCli, errors.Trace(err) }