From 5767a31b83d6d0da483b2e9c2372b347fa7497a0 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Wed, 8 Jan 2020 10:53:05 +0800 Subject: [PATCH 01/13] Group config about relay --- cmd/drainer/drainer.toml | 7 +++---- drainer/config.go | 13 +++++++------ drainer/relay/relayer.go | 6 ++++++ drainer/syncer.go | 4 ++-- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index b6cb921a7..9073a4409 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -63,13 +63,12 @@ ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql" # #replicate-do-db = ["~^b.*","s1"] +[relay] # directory of relay logs. Empty string indicates disabling relay log. # relay log works only if the downstream is TiDB/MySQL. -relay-log-dir = "" +# log-dir = "" # max file size of each relay log -relay-log-size = 10485760 -# read buffer size of relay log -relay-read-buf-size = 8 +# log-size = 10485760 #[[syncer.replicate-do-table]] #db-name ="test" diff --git a/drainer/config.go b/drainer/config.go index ddc0f8c3d..0406157e3 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -71,14 +71,18 @@ type SyncerConfig struct { DoTables []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"` DoDBs []string `toml:"replicate-do-db" json:"replicate-do-db"` DestDBType string `toml:"db-type" json:"db-type"` - RelayLogDir string `toml:"relay-log-dir" json:"relay-log-dir"` - RelayLogSize int64 `toml:"relay-log-size" json:"relay-log-size"` - RelayReadBufSize int `toml:"relay-read-buf-size" json:"relay-read-buf-size"` + Relay RelayConfig `toml:"relay" json:"relay"` EnableDispatch bool `toml:"enable-dispatch" json:"enable-dispatch"` SafeMode bool `toml:"safe-mode" json:"safe-mode"` EnableCausality bool `toml:"enable-detect" json:"enable-detect"` } +// RelayConfig is the Relay log's configuration. +type RelayConfig struct { + LogDir string `toml:"log-dir" json:"log-dir"` + LogSize int64 `toml:"log-size" json:"log-size"` +} + // Config holds the configuration of drainer type Config struct { *flag.FlagSet `json:"-"` @@ -133,9 +137,6 @@ func NewConfig() *Config { fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas") fs.IntVar(&cfg.SyncerCfg.WorkerCount, "c", 16, "parallel worker count") fs.StringVar(&cfg.SyncerCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql or tidb or file or kafka; see syncer section in conf/drainer.toml") - fs.StringVar(&cfg.SyncerCfg.RelayLogDir, "relay-log-dir", "", "path to relay log of syncer") - fs.Int64Var(&cfg.SyncerCfg.RelayLogSize, "relay-log-size", 10*1024*1024, "max file size of each relay log") - fs.IntVar(&cfg.SyncerCfg.RelayReadBufSize, "relay-read-buf-size", 8, "read buffer size of relay log") fs.BoolVar(&cfg.SyncerCfg.EnableDispatch, "enable-dispatch", true, "enable dispatching sqls that in one same binlog; if set true, work-count and txn-batch would be useless") fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant") fs.BoolVar(&cfg.SyncerCfg.EnableCausality, "enable-detect", false, "enable detect causality") diff --git a/drainer/relay/relayer.go b/drainer/relay/relayer.go index 613eff57d..81fe8ea02 100644 --- a/drainer/relay/relayer.go +++ b/drainer/relay/relayer.go @@ -22,6 +22,8 @@ import ( var _ Relayer = &relayer{} +const defaultMaxFileSize = 10 * 1024 * 1024 + // Relayer is the interface for writing relay log. 
type Relayer interface { // WriteBinlog writes binlog to relay log file. @@ -43,6 +45,10 @@ type relayer struct { // NewRelayer creates a relayer. func NewRelayer(dir string, maxFileSize int64, tableInfoGetter translator.TableInfoGetter) (Relayer, error) { + if maxFileSize <= 0 { + maxFileSize = defaultMaxFileSize + } + binlogger, err := binlogfile.OpenBinlogger(dir, maxFileSize) if err != nil { return nil, errors.Trace(err) diff --git a/drainer/syncer.go b/drainer/syncer.go index 9c7fda309..a95f65c98 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -102,8 +102,8 @@ func createDSyncer(cfg *SyncerConfig, schema *Schema) (dsyncer dsync.Syncer, err case "mysql", "tidb": var relayer relay.Relayer // If the dir is empty, it means relayer is disabled. - if len(cfg.RelayLogDir) > 0 { - if relayer, err = relay.NewRelayer(cfg.RelayLogDir, cfg.RelayLogSize, schema); err != nil { + if len(cfg.Relay.LogDir) > 0 { + if relayer, err = relay.NewRelayer(cfg.Relay.LogDir, cfg.Relay.LogSize, schema); err != nil { return nil, errors.Annotate(err, "fail to create relayer") } } From b764a25135002e2e672300035a42890e155e37cd Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Wed, 8 Jan 2020 13:12:09 +0800 Subject: [PATCH 02/13] Add status to tell quit normal or not --- drainer/checkpoint/checkpoint.go | 15 +++++++++++++-- drainer/checkpoint/file.go | 14 ++++++++++++-- drainer/checkpoint/file_test.go | 6 ++++-- drainer/checkpoint/mysql.go | 20 ++++++++++++++------ drainer/checkpoint/mysql_test.go | 7 ++++--- drainer/syncer.go | 6 +++++- 6 files changed, 52 insertions(+), 16 deletions(-) diff --git a/drainer/checkpoint/checkpoint.go b/drainer/checkpoint/checkpoint.go index a9a0a20fa..63af868a4 100644 --- a/drainer/checkpoint/checkpoint.go +++ b/drainer/checkpoint/checkpoint.go @@ -19,6 +19,14 @@ import ( "go.uber.org/zap" ) +const ( + // StatusNormal means server quit normally, data <= ts is synced to downstream + StatusNormal int = 0 + + // StatusRunning means server running or quit abnormally, part of data may or may not been synced to downstream + StatusRunning int = 1 +) + var ( // ErrCheckPointClosed indicates the CheckPoint already closed. ErrCheckPointClosed = errors.New("CheckPoint already closed") @@ -31,11 +39,14 @@ type CheckPoint interface { Load() error // Save saves checkpoint information. - Save(int64, int64) error + Save(commitTS int64, slaveTS int64, status int) error - // Pos gets position information. + // TS gets checkpoint commit timestamp. TS() int64 + // Status return the status saved. + Status() int + // Close closes the CheckPoint and release resources, after closed other methods should not be called again. Close() error } diff --git a/drainer/checkpoint/file.go b/drainer/checkpoint/file.go index e7d287c73..3992dce71 100644 --- a/drainer/checkpoint/file.go +++ b/drainer/checkpoint/file.go @@ -31,7 +31,8 @@ type FileCheckPoint struct { name string - CommitTS int64 `toml:"commitTS" json:"commitTS"` + StatusSaved int `toml:"status" json:"status"` + CommitTS int64 `toml:"commitTS" json:"commitTS"` } // NewFile creates a new FileCheckpoint. 
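A minimal sketch of how the widened Save signature and the new Status value are meant to work together (illustrative only, not part of this patch series; it assumes the checkpoint package as modified above): routine saves record StatusRunning, and only a clean shutdown re-saves the current TS with StatusNormal.

// Illustrative sketch, assuming:
//
//   import "github.com/pingcap/tidb-binlog/drainer/checkpoint"
//
// saveProgress is what the syncer does on every checkpoint write: data up to
// commitTS may be only partially applied downstream, hence StatusRunning.
func saveProgress(cp checkpoint.CheckPoint, commitTS, slaveTS int64) error {
	return cp.Save(commitTS, slaveTS, checkpoint.StatusRunning)
}

// markCleanQuit is what a normal shutdown does: the same TS is re-saved with
// StatusNormal (renamed StatusConsistent later in this series), so the next
// start knows it can skip replaying the relay log.
func markCleanQuit(cp checkpoint.CheckPoint) error {
	return cp.Save(cp.TS(), 0 /* slaveTS */, checkpoint.StatusNormal)
}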
@@ -81,7 +82,7 @@ func (sp *FileCheckPoint) Load() error { } // Save implements CheckPoint.Save interface -func (sp *FileCheckPoint) Save(ts, slaveTS int64) error { +func (sp *FileCheckPoint) Save(ts, slaveTS int64, status int) error { sp.Lock() defer sp.Unlock() @@ -90,6 +91,7 @@ func (sp *FileCheckPoint) Save(ts, slaveTS int64) error { } sp.CommitTS = ts + sp.StatusSaved = status var buf bytes.Buffer e := toml.NewEncoder(&buf) @@ -114,6 +116,14 @@ func (sp *FileCheckPoint) TS() int64 { return sp.CommitTS } +// Status implements CheckPoint.Status interface +func (sp *FileCheckPoint) Status() int { + sp.RLock() + defer sp.RUnlock() + + return sp.StatusSaved +} + // Close implements CheckPoint.Close interface func (sp *FileCheckPoint) Close() error { sp.Lock() diff --git a/drainer/checkpoint/file_test.go b/drainer/checkpoint/file_test.go index 9862313a6..7f08bf87d 100644 --- a/drainer/checkpoint/file_test.go +++ b/drainer/checkpoint/file_test.go @@ -31,14 +31,16 @@ func (t *testCheckPointSuite) TestFile(c *C) { // zero (initial) CommitTs c.Assert(meta.TS(), Equals, int64(0)) + c.Assert(meta.Status(), Equals, StatusNormal) testTs := int64(1) // save ts - err = meta.Save(testTs, 0) + err = meta.Save(testTs, 0, StatusRunning) c.Assert(err, IsNil) // check ts ts := meta.TS() c.Assert(ts, Equals, testTs) + c.Assert(meta.Status(), Equals, StatusRunning) // check load ts err = meta.Load() @@ -64,6 +66,6 @@ func (t *testCheckPointSuite) TestFile(c *C) { err = meta.Close() c.Assert(err, IsNil) c.Assert(errors.Cause(meta.Load()), Equals, ErrCheckPointClosed) - c.Assert(errors.Cause(meta.Save(0, 0)), Equals, ErrCheckPointClosed) + c.Assert(errors.Cause(meta.Save(0, 0, StatusNormal)), Equals, ErrCheckPointClosed) c.Assert(errors.Cause(meta.Close()), Equals, ErrCheckPointClosed) } diff --git a/drainer/checkpoint/mysql.go b/drainer/checkpoint/mysql.go index 2c60465a6..4a861789c 100644 --- a/drainer/checkpoint/mysql.go +++ b/drainer/checkpoint/mysql.go @@ -6,9 +6,7 @@ // // http://www.apache.org/licenses/LICENSE-2.0 // -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and +// Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. 
package checkpoint @@ -35,8 +33,9 @@ type MysqlCheckPoint struct { schema string table string - CommitTS int64 `toml:"commitTS" json:"commitTS"` - TsMap map[string]int64 `toml:"ts-map" json:"ts-map"` + StatusSaved int `toml:"status" json:"status"` + CommitTS int64 `toml:"commitTS" json:"commitTS"` + TsMap map[string]int64 `toml:"ts-map" json:"ts-map"` } var sqlOpenDB = pkgsql.OpenDB @@ -106,7 +105,7 @@ func (sp *MysqlCheckPoint) Load() error { } // Save implements checkpoint.Save interface -func (sp *MysqlCheckPoint) Save(ts, slaveTS int64) error { +func (sp *MysqlCheckPoint) Save(ts, slaveTS int64, status int) error { sp.Lock() defer sp.Unlock() @@ -115,6 +114,7 @@ func (sp *MysqlCheckPoint) Save(ts, slaveTS int64) error { } sp.CommitTS = ts + sp.StatusSaved = status if slaveTS > 0 { sp.TsMap["master-ts"] = ts @@ -135,6 +135,14 @@ func (sp *MysqlCheckPoint) Save(ts, slaveTS int64) error { return nil } +// Status implements CheckPoint.Status interface +func (sp *MysqlCheckPoint) Status() int { + sp.RLock() + defer sp.RUnlock() + + return sp.StatusSaved +} + // TS implements CheckPoint.TS interface func (sp *MysqlCheckPoint) TS() int64 { sp.RLock() diff --git a/drainer/checkpoint/mysql_test.go b/drainer/checkpoint/mysql_test.go index 411bb0b79..0f641838f 100644 --- a/drainer/checkpoint/mysql_test.go +++ b/drainer/checkpoint/mysql_test.go @@ -49,7 +49,7 @@ func (s *saveSuite) TestShouldSaveCheckpoint(c *C) { c.Assert(err, IsNil) mock.ExpectExec("replace into db.tbl.*").WillReturnResult(sqlmock.NewResult(0, 0)) cp := MysqlCheckPoint{db: db, schema: "db", table: "tbl"} - err = cp.Save(1111, 0) + err = cp.Save(1111, 0, StatusRunning) c.Assert(err, IsNil) } @@ -63,7 +63,7 @@ func (s *saveSuite) TestShouldUpdateTsMap(c *C) { table: "tbl", TsMap: make(map[string]int64), } - err = cp.Save(65536, 3333) + err = cp.Save(65536, 3333, StatusRunning) c.Assert(err, IsNil) c.Assert(cp.TsMap["master-ts"], Equals, int64(65536)) c.Assert(cp.TsMap["slave-ts"], Equals, int64(3333)) @@ -83,12 +83,13 @@ func (s *loadSuite) TestShouldLoadFromDB(c *C) { TsMap: make(map[string]int64), } rows := sqlmock.NewRows([]string{"checkPoint"}). 
- AddRow(`{"commitTS": 1024, "ts-map": {"master-ts": 2000, "slave-ts": 1999}}`) + AddRow(`{"commitTS": 1024, "status": 1, "ts-map": {"master-ts": 2000, "slave-ts": 1999}}`) mock.ExpectQuery("select checkPoint from db.tbl.*").WillReturnRows(rows) err = cp.Load() c.Assert(err, IsNil) c.Assert(cp.CommitTS, Equals, int64(1024)) + c.Assert(cp.StatusSaved, Equals, 1) c.Assert(cp.TsMap["master-ts"], Equals, int64(2000)) c.Assert(cp.TsMap["slave-ts"], Equals, int64(1999)) } diff --git a/drainer/syncer.go b/drainer/syncer.go index a95f65c98..639d8281e 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -252,7 +252,7 @@ func (s *Syncer) savePoint(ts, slaveTS int64) { } log.Info("write save point", zap.Int64("ts", ts)) - err := s.cp.Save(ts, slaveTS) + err := s.cp.Save(ts, slaveTS, checkpoint.StatusRunning) if err != nil { log.Fatal("save checkpoint failed", zap.Int64("ts", ts), zap.Error(err)) } @@ -436,6 +436,10 @@ ForLoop: if err != nil { return err } + + if cerr == nil { + s.cp.Save(s.cp.TS(), 0, checkpoint.StatusNormal) + } return cerr } From 8876d361ccef42c1fadb85a3c6e311b0e3242ebe Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Fri, 10 Jan 2020 14:42:08 +0800 Subject: [PATCH 03/13] Try to recover status as normal if can't init pd client --- cmd/drainer/drainer.toml | 2 +- drainer/checkpoint/checkpoint.go | 2 +- drainer/checkpoint/file.go | 6 +- drainer/checkpoint/file_test.go | 13 +-- drainer/checkpoint/mysql.go | 19 +++- drainer/checkpoint/util.go | 32 +++++++ drainer/checkpoint/util_test.go | 58 ++++++++++++ drainer/config.go | 5 + drainer/relay.go | 155 +++++++++++++++++++++++++++++++ drainer/relay/reader.go | 16 +--- drainer/relay/reader_test.go | 4 +- drainer/relay_test.go | 104 +++++++++++++++++++++ drainer/server.go | 4 + drainer/sync/mysql.go | 37 +++++++- drainer/syncer_test.go | 2 +- 15 files changed, 427 insertions(+), 32 deletions(-) create mode 100644 drainer/checkpoint/util_test.go create mode 100644 drainer/relay.go create mode 100644 drainer/relay_test.go diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index 9073a4409..2d39ea82f 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -63,7 +63,7 @@ ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql" # #replicate-do-db = ["~^b.*","s1"] -[relay] +[syncer.relay] # directory of relay logs. Empty string indicates disabling relay log. # relay log works only if the downstream is TiDB/MySQL. # log-dir = "" diff --git a/drainer/checkpoint/checkpoint.go b/drainer/checkpoint/checkpoint.go index 63af868a4..067f7c72b 100644 --- a/drainer/checkpoint/checkpoint.go +++ b/drainer/checkpoint/checkpoint.go @@ -61,7 +61,7 @@ func NewCheckPoint(cfg *Config) (CheckPoint, error) { case "mysql", "tidb": cp, err = newMysql(cfg) case "file": - cp, err = NewFile(cfg) + cp, err = NewFile(cfg.InitialCommitTS, cfg.CheckPointFile) default: err = errors.Errorf("unsupported checkpoint type %s", cfg.CheckpointType) } diff --git a/drainer/checkpoint/file.go b/drainer/checkpoint/file.go index 3992dce71..fdb31dd1a 100644 --- a/drainer/checkpoint/file.go +++ b/drainer/checkpoint/file.go @@ -36,10 +36,10 @@ type FileCheckPoint struct { } // NewFile creates a new FileCheckpoint. 
-func NewFile(cfg *Config) (CheckPoint, error) { +func NewFile(initialCommitTS int64, filePath string) (CheckPoint, error) { pb := &FileCheckPoint{ - initialCommitTS: cfg.InitialCommitTS, - name: cfg.CheckPointFile, + initialCommitTS: initialCommitTS, + name: filePath, } err := pb.Load() if err != nil { diff --git a/drainer/checkpoint/file_test.go b/drainer/checkpoint/file_test.go index 7f08bf87d..1e03a99fd 100644 --- a/drainer/checkpoint/file_test.go +++ b/drainer/checkpoint/file_test.go @@ -23,9 +23,7 @@ import ( func (t *testCheckPointSuite) TestFile(c *C) { fileName := "/tmp/test" notExistFileName := "test_not_exist" - cfg := new(Config) - cfg.CheckPointFile = fileName - meta, err := NewFile(cfg) + meta, err := NewFile(0, fileName) c.Assert(err, IsNil) defer os.RemoveAll(fileName) @@ -49,18 +47,17 @@ func (t *testCheckPointSuite) TestFile(c *C) { c.Assert(ts, Equals, testTs) // check not exist meta file - cfg.CheckPointFile = notExistFileName - meta, err = NewFile(cfg) + meta, err = NewFile(0, notExistFileName) c.Assert(err, IsNil) err = meta.Load() c.Assert(err, IsNil) c.Assert(meta.TS(), Equals, int64(0)) // check not exist meta file, but with initialCommitTs - cfg.InitialCommitTS = 123 - meta, err = NewFile(cfg) + var initialCommitTS int64 = 123 + meta, err = NewFile(initialCommitTS, notExistFileName) c.Assert(err, IsNil) - c.Assert(meta.TS(), Equals, cfg.InitialCommitTS) + c.Assert(meta.TS(), Equals, initialCommitTS) // close the checkpoint err = meta.Close() diff --git a/drainer/checkpoint/mysql.go b/drainer/checkpoint/mysql.go index 4a861789c..1b3914aa6 100644 --- a/drainer/checkpoint/mysql.go +++ b/drainer/checkpoint/mysql.go @@ -6,7 +6,9 @@ // // http://www.apache.org/licenses/LICENSE-2.0 // -// Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and // limitations under the License. package checkpoint @@ -17,6 +19,9 @@ import ( "sync" "github.com/pingcap/errors" + "github.com/pingcap/log" + "go.uber.org/zap" + // mysql driver _ "github.com/go-sql-driver/mysql" pkgsql "github.com/pingcap/tidb-binlog/pkg/sql" @@ -38,6 +43,8 @@ type MysqlCheckPoint struct { TsMap map[string]int64 `toml:"ts-map" json:"ts-map"` } +var _ CheckPoint = &MysqlCheckPoint{} + var sqlOpenDB = pkgsql.OpenDB func newMysql(cfg *Config) (CheckPoint, error) { @@ -67,6 +74,16 @@ func newMysql(cfg *Config) (CheckPoint, error) { return nil, errors.Annotatef(err, "exec failed, sql: %s", sql) } + if sp.clusterID == 0 { + id, err := getClusterID(db, sp.schema, sp.table) + if err != nil { + return nil, errors.Trace(err) + } + + log.Info("set cluster id", zap.Uint64("id", id)) + sp.clusterID = id + } + err = sp.Load() return sp, errors.Trace(err) } diff --git a/drainer/checkpoint/util.go b/drainer/checkpoint/util.go index e48132e9e..647617d51 100644 --- a/drainer/checkpoint/util.go +++ b/drainer/checkpoint/util.go @@ -14,10 +14,12 @@ package checkpoint import ( + "database/sql" "fmt" // mysql driver _ "github.com/go-sql-driver/mysql" + "github.com/pingcap/errors" ) // DBConfig is the DB configuration. 
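Before the getClusterID helper added in the next hunk, a self-contained sketch (example values only, not repository code) of what a mysql checkpoint row holds after Save; because the table keeps one row per cluster, that row is also where the cluster ID can be recovered from when PD is unreachable.

package main

import (
	"encoding/json"
	"fmt"
)

// savedCheckpoint mirrors the fields MysqlCheckPoint persists (json tags taken
// from this patch); the schema, table and cluster ID below are example values.
type savedCheckpoint struct {
	Status   int              `json:"status"`
	CommitTS int64            `json:"commitTS"`
	TsMap    map[string]int64 `json:"ts-map"`
}

func main() {
	state := savedCheckpoint{
		Status:   1, // StatusRunning: a restart must replay the relay log first
		CommitTS: 1024,
		TsMap:    map[string]int64{"master-ts": 2000, "slave-ts": 1999},
	}
	buf, err := json.Marshal(state)
	if err != nil {
		panic(err)
	}
	// Roughly the statement genReplaceSQL builds for an example cluster ID 6655.
	fmt.Printf("replace into db.tbl values(%d, '%s')\n", 6655, buf)
}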
@@ -74,6 +76,36 @@ func genReplaceSQL(sp *MysqlCheckPoint, str string) string { return fmt.Sprintf("replace into %s.%s values(%d, '%s')", sp.schema, sp.table, sp.clusterID, str) } +func getClusterID(db *sql.DB, schema string, table string) (id uint64, err error) { + sqlQuery := fmt.Sprintf("select clusterID from %s.%s", schema, table) + rows, err := db.Query(sqlQuery) + if err != nil { + return 0, errors.Trace(err) + } + + for rows.Next() { + // multi row + if id > 0 { + return 0, errors.New("there are multi row int checkpoint table") + } + + err = rows.Scan(&id) + if err != nil { + return 0, errors.Trace(err) + } + } + + if rows.Err() != nil { + return 0, errors.Trace(rows.Err()) + } + + if id == 0 { + return 0, errors.New("no any item at checkpoint table") + } + + return +} + func genSelectSQL(sp *MysqlCheckPoint) string { return fmt.Sprintf("select checkPoint from %s.%s where clusterID = %d", sp.schema, sp.table, sp.clusterID) } diff --git a/drainer/checkpoint/util_test.go b/drainer/checkpoint/util_test.go new file mode 100644 index 000000000..a05378f59 --- /dev/null +++ b/drainer/checkpoint/util_test.go @@ -0,0 +1,58 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package checkpoint + +import ( + "github.com/DATA-DOG/go-sqlmock" + . "github.com/pingcap/check" +) + +var _ = Suite(&testUtil{}) + +type testUtil struct{} + +func (t *testUtil) TestG(c *C) { + tests := []struct { + name string + rows []uint64 + id uint64 + err bool + }{ + {"no row", nil, 0, true}, + {"on row", []uint64{1}, 1, false}, + {"multi row", []uint64{1, 2}, 0, true}, + } + + for _, test := range tests { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + + rows := sqlmock.NewRows([]string{"clusterID"}) + for _, row := range test.rows { + rows.AddRow(row) + } + + mock.ExpectQuery("select clusterID from .*").WillReturnRows(rows) + + c.Log("test: ", test.name) + id, err := getClusterID(db, "schema", "table") + if test.err { + c.Assert(err, NotNil) + c.Assert(id, Equals, test.id) + } else { + c.Assert(err, IsNil) + c.Assert(id, Equals, test.id) + } + } +} diff --git a/drainer/config.go b/drainer/config.go index 0406157e3..51e848964 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -83,6 +83,11 @@ type RelayConfig struct { LogSize int64 `toml:"log-size" json:"log-size"` } +// SwitchOn return true if we need to handle relay log. 
+func (rc RelayConfig) SwitchOn() bool { + return len(rc.LogDir) > 0 +} + // Config holds the configuration of drainer type Config struct { *flag.FlagSet `json:"-"` diff --git a/drainer/relay.go b/drainer/relay.go new file mode 100644 index 000000000..db666ea95 --- /dev/null +++ b/drainer/relay.go @@ -0,0 +1,155 @@ +package drainer + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tidb-binlog/drainer/checkpoint" + "github.com/pingcap/tidb-binlog/drainer/relay" + "github.com/pingcap/tidb-binlog/drainer/sync" + "github.com/pingcap/tidb-binlog/pkg/loader" + obinlog "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog" + "go.uber.org/zap" +) + +func feedByRelayLogIfNeed(cfg *Config) error { + if !cfg.SyncerCfg.Relay.SwitchOn() { + return nil + } + + // for the mysql type checkpoint + // clusterID will be use as the key + // we can't get the cluster id from pd so we just set 0 + // and the checkpoint will use the clusterID exist at the checkpoint table. + cpCfg, err := GenCheckPointCfg(cfg, 0 /* clusterID */) + if err != nil { + return errors.Trace(err) + } + + scfg := cfg.SyncerCfg + + cp, err := checkpoint.NewCheckPoint(cpCfg) + if err != nil { + return errors.Trace(err) + } + + defer cp.Close() + + if cp.Status() == checkpoint.StatusNormal { + return nil + } + + reader, err := relay.NewReader(scfg.Relay.LogDir, 1 /* readBufferSize */) + if err != nil { + return errors.Annotate(err, "failed to create reader") + } + + db, ld, err := sync.CreateLoader(scfg.To, scfg.WorkerCount, scfg.TxnBatch, nil, scfg.StrSQLMode, scfg.DestDBType) + if err != nil { + return errors.Annotate(err, "faild to create loader") + } + + defer db.Close() + + err = feedByRelayLog(reader, ld, cp) + if err != nil { + return errors.Trace(err) + } + + return nil +} + +// feedByRelayLog will take over the `ld loader.Loader`. 
+func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint) error { + checkpointTS := cp.TS() + var lastSuccessTS int64 + r.Run() + + loaderQuit := make(chan struct{}) + var loaderErr error + go func() { + ld.SetSafeMode(true) + loaderErr = ld.Run() + close(loaderQuit) + }() + + var readerTxns <-chan *obinlog.Binlog + // var readerInputClosed bool + var toPushLoaderTxn *loader.Txn + var loaderInput chan<- *loader.Txn + successTxnC := ld.Successes() + + readerTxns = r.Txns() + + loaderClosed := false +loop: + for { + if readerTxns == nil && loaderInput == nil && !loaderClosed { + ld.Close() + loaderClosed = true + } + + if loaderClosed && successTxnC == nil { + break + } + + select { + case sbinlog, ok := <-readerTxns: + if !ok { + log.Info("readerTxns closed") + readerTxns = nil + continue + } + txn, err := loader.SlaveBinlogToTxn(sbinlog) + if err != nil { + return errors.Trace(err) + } + + if sbinlog.CommitTs <= checkpointTS { + continue + } + + txn.Metadata = sbinlog.CommitTs + toPushLoaderTxn = txn + loaderInput = ld.Input() + case loaderInput <- toPushLoaderTxn: + loaderInput = nil + toPushLoaderTxn = nil + case success, ok := <-successTxnC: + if !ok { + successTxnC = nil + log.Info("success closed") + break loop + } + lastSuccessTS = success.Metadata.(int64) + case <-loaderQuit: + if loaderErr != nil { + return errors.Trace(loaderErr) + } + loaderQuit = nil + } + } + + log.Info("finish feed by relay log") + + readerErr := <-r.Error() + <-loaderQuit + + if readerErr != nil { + return errors.Trace(readerErr) + } + + if loaderErr != nil { + return errors.Trace(loaderErr) + } + + if lastSuccessTS > checkpointTS { + err := cp.Save(lastSuccessTS, 0 /* slaveTS */, checkpoint.StatusNormal) + if err != nil { + return errors.Trace(err) + } + + log.Info("update status as normal", zap.Int64("ts", lastSuccessTS)) + } + + return nil +} diff --git a/drainer/relay/reader.go b/drainer/relay/reader.go index 063ea542c..1f8f76461 100644 --- a/drainer/relay/reader.go +++ b/drainer/relay/reader.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb-binlog/pkg/binlogfile" - "github.com/pingcap/tidb-binlog/pkg/loader" obinlog "github.com/pingcap/tidb-tools/tidb-binlog/slave_binlog_proto/go-binlog" "github.com/pingcap/tipb/go-binlog" ) @@ -32,7 +31,7 @@ type Reader interface { Run() context.CancelFunc // Txns returns parsed transactions. - Txns() <-chan *loader.Txn + Txns() <-chan *obinlog.Binlog // Close releases resources. Close() error @@ -43,7 +42,7 @@ type Reader interface { type reader struct { binlogger binlogfile.Binlogger - txns chan *loader.Txn + txns chan *obinlog.Binlog err chan error } @@ -56,7 +55,7 @@ func NewReader(dir string, readBufferSize int) (Reader, error) { return &reader{ binlogger: binlogger, - txns: make(chan *loader.Txn, readBufferSize), + txns: make(chan *obinlog.Binlog, readBufferSize), }, nil } @@ -85,16 +84,11 @@ func (r *reader) Run() context.CancelFunc { break } - var txn *loader.Txn - txn, err = loader.SlaveBinlogToTxn(slaveBinlog) - if err != nil { - break - } select { case <-ctx.Done(): err = ctx.Err() log.Warn("Producing transaction is interrupted") - case r.txns <- txn: + case r.txns <- slaveBinlog: } } // If binlogger is not done, notify it to stop. @@ -114,7 +108,7 @@ func (r *reader) Run() context.CancelFunc { } // Txns implements Reader interface. 
-func (r *reader) Txns() <-chan *loader.Txn { +func (r *reader) Txns() <-chan *obinlog.Binlog { return r.txns } diff --git a/drainer/relay/reader_test.go b/drainer/relay/reader_test.go index d1900893d..55c276cdf 100644 --- a/drainer/relay/reader_test.go +++ b/drainer/relay/reader_test.go @@ -90,7 +90,9 @@ func (r *testReaderSuite) readBinlogAndCheck(c *C, dir string, expectedNumber in number := 0 for txn := range relayReader.Txns() { number++ - lastTxn = txn + loaderTxn, err := loader.SlaveBinlogToTxn(txn) + c.Assert(err, IsNil) + lastTxn = loaderTxn } c.Assert(<-relayReader.Error(), IsNil) c.Assert(number, Equals, expectedNumber) diff --git a/drainer/relay_test.go b/drainer/relay_test.go new file mode 100644 index 000000000..469cde8cd --- /dev/null +++ b/drainer/relay_test.go @@ -0,0 +1,104 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package drainer + +import ( + "github.com/pingcap/check" + "github.com/pingcap/tidb-binlog/drainer/checkpoint" + "github.com/pingcap/tidb-binlog/drainer/relay" + "github.com/pingcap/tidb-binlog/drainer/translator" + "github.com/pingcap/tidb-binlog/pkg/binlogfile" + "github.com/pingcap/tidb-binlog/pkg/loader" +) + +type relaySuite struct{} + +var _ = check.Suite(&relaySuite{}) + +type noOpLoader struct { + input chan *loader.Txn + success chan *loader.Txn +} + +func newNoOpLoader() *noOpLoader { + return &noOpLoader{ + input: make(chan *loader.Txn, 1), + success: make(chan *loader.Txn, 1), + } +} + +func (ld *noOpLoader) Run() error { + for txn := range ld.input { + ld.success <- txn + } + + close(ld.success) + return nil +} + +func (ld *noOpLoader) Close() { + close(ld.input) +} + +func (ld *noOpLoader) Input() chan<- *loader.Txn { + return ld.input +} + +func (ld *noOpLoader) Successes() <-chan *loader.Txn { + return ld.success +} + +func (ld *noOpLoader) SetSafeMode(bool) { + return +} + +func (ld *noOpLoader) GetSafeMode() bool { + return false +} + +var _ loader.Loader = &noOpLoader{} + +func (s *relaySuite) TestFeedByRealyLog(c *check.C) { + cp, err := checkpoint.NewFile(0 /* initialCommitTS */, c.MkDir()+"/cp") + c.Assert(err, check.IsNil) + cp.Save(0, 0, checkpoint.StatusRunning) + + ld := newNoOpLoader() + + // write some relay log + gen := &translator.BinlogGenerator{} + relayDir := c.MkDir() + relayer, err := relay.NewRelayer(relayDir, binlogfile.SegmentSizeBytes, gen) + c.Assert(err, check.IsNil) + + for i := 0; i < 10; i++ { + gen.SetInsert(c) + gen.TiBinlog.StartTs = int64(i) + gen.TiBinlog.CommitTs = int64(i) * 10 + relayer.WriteBinlog(gen.Schema, gen.Table, gen.TiBinlog, gen.PV) + } + + relayer.Close() + c.Assert(err, check.IsNil) + + reader, err := relay.NewReader(relayDir, 1) + c.Assert(err, check.IsNil) + + err = feedByRelayLog(reader, ld, cp) + c.Assert(err, check.IsNil) + + ts := cp.TS() + c.Assert(ts, check.Equals, int64(90) /* latest commit ts */) + c.Assert(cp.Status(), check.Equals, checkpoint.StatusNormal) +} diff --git a/drainer/server.go b/drainer/server.go index ec3877255..472c2315a 100644 --- a/drainer/server.go +++ b/drainer/server.go @@ -101,6 +101,10 @@ 
func NewServer(cfg *Config) (*Server, error) { // get pd client and cluster ID pdCli, err := getPdClient(cfg.EtcdURLs, cfg.Security) if err != nil { + err := feedByRelayLogIfNeed(cfg) + if err != nil { + return nil, errors.Trace(err) + } return nil, errors.Trace(err) } diff --git a/drainer/sync/mysql.go b/drainer/sync/mysql.go index dfaf716c2..b8246387f 100644 --- a/drainer/sync/mysql.go +++ b/drainer/sync/mysql.go @@ -39,11 +39,18 @@ type MysqlSyncer struct { // should only be used for unit test to create mock db var createDB = loader.CreateDBWithSQLMode -// NewMysqlSyncer returns a instance of MysqlSyncer -func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, worker int, batchSize int, queryHistogramVec *prometheus.HistogramVec, sqlMode *string, destDBType string, relayer relay.Relayer) (*MysqlSyncer, error) { - db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, sqlMode) +// CreateLoader create the Loader instance. +func CreateLoader( + cfg *DBConfig, + worker int, + batchSize int, + queryHistogramVec *prometheus.HistogramVec, + sqlMode *string, + destDBType string, +) (db *sql.DB, ld loader.Loader, err error) { + db, err = createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, sqlMode) if err != nil { - return nil, errors.Trace(err) + return nil, nil, errors.Trace(err) } var opts []loader.Option @@ -55,7 +62,27 @@ func NewMysqlSyncer(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter, w })) } - loader, err := loader.NewLoader(db, opts...) + ld, err = loader.NewLoader(db, opts...) + if err != nil { + db.Close() + return nil, nil, errors.Trace(err) + } + + return +} + +// NewMysqlSyncer returns a instance of MysqlSyncer +func NewMysqlSyncer( + cfg *DBConfig, + tableInfoGetter translator.TableInfoGetter, + worker int, + batchSize int, + queryHistogramVec *prometheus.HistogramVec, + sqlMode *string, + destDBType string, + relayer relay.Relayer, +) (*MysqlSyncer, error) { + db, loader, err := CreateLoader(cfg, worker, batchSize, queryHistogramVec, sqlMode, destDBType) if err != nil { return nil, errors.Trace(err) } diff --git a/drainer/syncer_test.go b/drainer/syncer_test.go index 6902d0b3d..1a271b7f8 100644 --- a/drainer/syncer_test.go +++ b/drainer/syncer_test.go @@ -64,7 +64,7 @@ func (s *syncerSuite) TestNewSyncer(c *check.C) { } cpFile := c.MkDir() + "/checkpoint" - cp, err := checkpoint.NewFile(&checkpoint.Config{CheckPointFile: cpFile}) + cp, err := checkpoint.NewFile(0, cpFile) c.Assert(err, check.IsNil) syncer, err := NewSyncer(cp, cfg, nil) From 3604ca6591d973e2dd50e16f508cf6e6a8ab08e0 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Fri, 10 Jan 2020 15:07:08 +0800 Subject: [PATCH 04/13] Fix return wrong err --- drainer/relay.go | 2 +- drainer/server.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/drainer/relay.go b/drainer/relay.go index db666ea95..ef2f72521 100644 --- a/drainer/relay.go +++ b/drainer/relay.go @@ -43,7 +43,7 @@ func feedByRelayLogIfNeed(cfg *Config) error { return errors.Annotate(err, "failed to create reader") } - db, ld, err := sync.CreateLoader(scfg.To, scfg.WorkerCount, scfg.TxnBatch, nil, scfg.StrSQLMode, scfg.DestDBType) + db, ld, err := sync.CreateLoader(scfg.To, scfg.WorkerCount, scfg.TxnBatch, nil, scfg.StrSQLMode, scfg.DestDBType, scfg.To.SyncMode) if err != nil { return errors.Annotate(err, "faild to create loader") } diff --git a/drainer/server.go b/drainer/server.go index 472c2315a..6b830365b 100644 --- a/drainer/server.go +++ b/drainer/server.go @@ -101,9 +101,9 @@ func 
NewServer(cfg *Config) (*Server, error) { // get pd client and cluster ID pdCli, err := getPdClient(cfg.EtcdURLs, cfg.Security) if err != nil { - err := feedByRelayLogIfNeed(cfg) - if err != nil { - return nil, errors.Trace(err) + ferr := feedByRelayLogIfNeed(cfg) + if ferr != nil { + return nil, errors.Trace(ferr) } return nil, errors.Trace(err) } From 95e28fffb181afcfbbd1a63db7408c3c9533c5aa Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Fri, 10 Jan 2020 15:16:53 +0800 Subject: [PATCH 05/13] remove check by empty dir directly --- drainer/syncer.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/drainer/syncer.go b/drainer/syncer.go index 639d8281e..0de0479fe 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -101,8 +101,7 @@ func createDSyncer(cfg *SyncerConfig, schema *Schema) (dsyncer dsync.Syncer, err } case "mysql", "tidb": var relayer relay.Relayer - // If the dir is empty, it means relayer is disabled. - if len(cfg.Relay.LogDir) > 0 { + if cfg.Relay.SwitchOn() { if relayer, err = relay.NewRelayer(cfg.Relay.LogDir, cfg.Relay.LogSize, schema); err != nil { return nil, errors.Annotate(err, "fail to create relayer") } From 56877bb818ae9bca22a0725d5ccc29564b341bb1 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Fri, 10 Jan 2020 15:22:45 +0800 Subject: [PATCH 06/13] Rename log-size to max-file-size --- cmd/drainer/drainer.toml | 2 +- drainer/config.go | 4 ++-- drainer/syncer.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index fb9746551..2e90440f1 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -68,7 +68,7 @@ ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql" # relay log works only if the downstream is TiDB/MySQL. # log-dir = "" # max file size of each relay log -# log-size = 10485760 +# max-file-size = 10485760 #[[syncer.replicate-do-table]] #db-name ="test" diff --git a/drainer/config.go b/drainer/config.go index 51e848964..6132cbf49 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -79,8 +79,8 @@ type SyncerConfig struct { // RelayConfig is the Relay log's configuration. type RelayConfig struct { - LogDir string `toml:"log-dir" json:"log-dir"` - LogSize int64 `toml:"log-size" json:"log-size"` + LogDir string `toml:"log-dir" json:"log-dir"` + MaxFileSize int64 `toml:"max-file-size" json:"max-file-size"` } // SwitchOn return true if we need to handle relay log. 
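To make the rename concrete, a hedged example (values invented, not defaults) of how a filled-in [syncer.relay] block maps onto RelayConfig and gates relayer creation in createDSyncer:

// Illustrative only, as it would appear inside package drainer; values are examples.
//
//   [syncer.relay]
//   log-dir = "/data/drainer/relay"
//   max-file-size = 10485760
//
// decodes into:
relayCfg := RelayConfig{LogDir: "/data/drainer/relay", MaxFileSize: 10485760}
if relayCfg.SwitchOn() { // an empty log-dir keeps the relay log disabled; renamed IsEnabled() later in this series
	if relayer, err = relay.NewRelayer(relayCfg.LogDir, relayCfg.MaxFileSize, schema); err != nil {
		return nil, errors.Annotate(err, "fail to create relayer")
	}
}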
diff --git a/drainer/syncer.go b/drainer/syncer.go index 0de0479fe..5ebe7caf4 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -102,7 +102,7 @@ func createDSyncer(cfg *SyncerConfig, schema *Schema) (dsyncer dsync.Syncer, err case "mysql", "tidb": var relayer relay.Relayer if cfg.Relay.SwitchOn() { - if relayer, err = relay.NewRelayer(cfg.Relay.LogDir, cfg.Relay.LogSize, schema); err != nil { + if relayer, err = relay.NewRelayer(cfg.Relay.LogDir, cfg.Relay.MaxFileSize, schema); err != nil { return nil, errors.Annotate(err, "fail to create relayer") } } From 81db7e58149781ecf8e1c57c7e22131907a108d4 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Fri, 10 Jan 2020 18:55:43 +0800 Subject: [PATCH 07/13] Update as normal anyway the statue may be Running even it's consistent with the log --- drainer/relay.go | 14 ++++++-------- pkg/util/util.go | 19 ++++++------------- 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/drainer/relay.go b/drainer/relay.go index ef2f72521..0f7ff63a3 100644 --- a/drainer/relay.go +++ b/drainer/relay.go @@ -61,7 +61,7 @@ func feedByRelayLogIfNeed(cfg *Config) error { // feedByRelayLog will take over the `ld loader.Loader`. func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint) error { checkpointTS := cp.TS() - var lastSuccessTS int64 + lastSuccessTS := checkpointTS r.Run() loaderQuit := make(chan struct{}) @@ -142,14 +142,12 @@ loop: return errors.Trace(loaderErr) } - if lastSuccessTS > checkpointTS { - err := cp.Save(lastSuccessTS, 0 /* slaveTS */, checkpoint.StatusNormal) - if err != nil { - return errors.Trace(err) - } - - log.Info("update status as normal", zap.Int64("ts", lastSuccessTS)) + err := cp.Save(lastSuccessTS, 0 /* slaveTS */, checkpoint.StatusNormal) + if err != nil { + return errors.Trace(err) } + log.Info("update status as normal", zap.Int64("ts", lastSuccessTS)) + return nil } diff --git a/pkg/util/util.go b/pkg/util/util.go index 1a1603e3c..69f0ade0c 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -242,6 +242,7 @@ func QueryLatestTsFromPD(tiStore kv.Storage) (int64, error) { } // Store the function in a variable so that we can mock it when testing +// pd.NewClient will retry internally for init info. 
var newPdCli = pd.NewClient // GetPdClient create a PD client @@ -251,20 +252,12 @@ func GetPdClient(etcdURLs string, securityConfig security.Config) (pd.Client, er return nil, errors.Trace(err) } - pdReconnTimes := 30 - var pdCli pd.Client - for i := 1; i < pdReconnTimes; i++ { - pdCli, err = newPdCli(urlv.StringSlice(), pd.SecurityOption{ - CAPath: securityConfig.SSLCA, - CertPath: securityConfig.SSLCert, - KeyPath: securityConfig.SSLKey, - }) - if err == nil { - break - } - time.Sleep(time.Duration(pdReconnTimes*i) * time.Millisecond) - } + pdCli, err = newPdCli(urlv.StringSlice(), pd.SecurityOption{ + CAPath: securityConfig.SSLCA, + CertPath: securityConfig.SSLCert, + KeyPath: securityConfig.SSLKey, + }) return pdCli, errors.Trace(err) } From ae18878836e1d9c173e90d1cf133fef918831e68 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Fri, 10 Jan 2020 19:19:52 +0800 Subject: [PATCH 08/13] Fix lint --- drainer/relay.go | 4 ++-- drainer/relay_test.go | 7 ++++--- drainer/syncer.go | 7 ++++--- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/drainer/relay.go b/drainer/relay.go index 0f7ff63a3..e7ea05144 100644 --- a/drainer/relay.go +++ b/drainer/relay.go @@ -81,7 +81,7 @@ func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint) readerTxns = r.Txns() loaderClosed := false -loop: + for { if readerTxns == nil && loaderInput == nil && !loaderClosed { ld.Close() @@ -118,7 +118,7 @@ loop: if !ok { successTxnC = nil log.Info("success closed") - break loop + continue } lastSuccessTS = success.Metadata.(int64) case <-loaderQuit: diff --git a/drainer/relay_test.go b/drainer/relay_test.go index 469cde8cd..80c35070e 100644 --- a/drainer/relay_test.go +++ b/drainer/relay_test.go @@ -60,7 +60,6 @@ func (ld *noOpLoader) Successes() <-chan *loader.Txn { } func (ld *noOpLoader) SetSafeMode(bool) { - return } func (ld *noOpLoader) GetSafeMode() bool { @@ -72,7 +71,8 @@ var _ loader.Loader = &noOpLoader{} func (s *relaySuite) TestFeedByRealyLog(c *check.C) { cp, err := checkpoint.NewFile(0 /* initialCommitTS */, c.MkDir()+"/cp") c.Assert(err, check.IsNil) - cp.Save(0, 0, checkpoint.StatusRunning) + err = cp.Save(0, 0, checkpoint.StatusRunning) + c.Assert(err, check.IsNil) ld := newNoOpLoader() @@ -86,7 +86,8 @@ func (s *relaySuite) TestFeedByRealyLog(c *check.C) { gen.SetInsert(c) gen.TiBinlog.StartTs = int64(i) gen.TiBinlog.CommitTs = int64(i) * 10 - relayer.WriteBinlog(gen.Schema, gen.Table, gen.TiBinlog, gen.PV) + _, err = relayer.WriteBinlog(gen.Schema, gen.Table, gen.TiBinlog, gen.PV) + c.Assert(err, check.IsNil) } relayer.Close() diff --git a/drainer/syncer.go b/drainer/syncer.go index 5ebe7caf4..994b6c47f 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -436,10 +436,11 @@ ForLoop: return err } - if cerr == nil { - s.cp.Save(s.cp.TS(), 0, checkpoint.StatusNormal) + if cerr != nil { + return cerr } - return cerr + + return s.cp.Save(s.cp.TS(), 0, checkpoint.StatusNormal) } // filterTable may drop some table mutation in `PrewriteValue` From 8107f2f09d29e2790e4a3ba599cf0ffdec84970c Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Tue, 14 Jan 2020 15:29:45 +0800 Subject: [PATCH 09/13] Refine some code and comment --- drainer/checkpoint/util.go | 10 +++++++--- drainer/checkpoint/util_test.go | 19 +++++++++++------- drainer/relay.go | 35 +++++++++++++++------------------ drainer/relay_test.go | 6 +++++- drainer/server.go | 2 +- drainer/sync/mysql.go | 5 ++--- 6 files changed, 43 insertions(+), 34 deletions(-) diff --git a/drainer/checkpoint/util.go 
b/drainer/checkpoint/util.go index 647617d51..62da56655 100644 --- a/drainer/checkpoint/util.go +++ b/drainer/checkpoint/util.go @@ -22,6 +22,10 @@ import ( "github.com/pingcap/errors" ) +// ErrNoCheckpointItem represents there's no any checkpoint item and the cluster id must be specified +// for the mysql checkpoint type. +var ErrNoCheckpointItem = errors.New("no any checkpoint item") + // DBConfig is the DB configuration. type DBConfig struct { Host string `toml:"host" json:"host"` @@ -76,6 +80,7 @@ func genReplaceSQL(sp *MysqlCheckPoint, str string) string { return fmt.Sprintf("replace into %s.%s values(%d, '%s')", sp.schema, sp.table, sp.clusterID, str) } +// getClusterID return the cluster id iff the checkpoint table exist only one row. func getClusterID(db *sql.DB, schema string, table string) (id uint64, err error) { sqlQuery := fmt.Sprintf("select clusterID from %s.%s", schema, table) rows, err := db.Query(sqlQuery) @@ -84,9 +89,8 @@ func getClusterID(db *sql.DB, schema string, table string) (id uint64, err error } for rows.Next() { - // multi row if id > 0 { - return 0, errors.New("there are multi row int checkpoint table") + return 0, errors.New("there are multi row in checkpoint table") } err = rows.Scan(&id) @@ -100,7 +104,7 @@ func getClusterID(db *sql.DB, schema string, table string) (id uint64, err error } if id == 0 { - return 0, errors.New("no any item at checkpoint table") + return 0, ErrNoCheckpointItem } return diff --git a/drainer/checkpoint/util_test.go b/drainer/checkpoint/util_test.go index a05378f59..746652bf5 100644 --- a/drainer/checkpoint/util_test.go +++ b/drainer/checkpoint/util_test.go @@ -16,6 +16,7 @@ package checkpoint import ( "github.com/DATA-DOG/go-sqlmock" . "github.com/pingcap/check" + "github.com/pingcap/errors" ) var _ = Suite(&testUtil{}) @@ -24,14 +25,15 @@ type testUtil struct{} func (t *testUtil) TestG(c *C) { tests := []struct { - name string - rows []uint64 - id uint64 - err bool + name string + rows []uint64 + id uint64 + err bool + checkSpecifiedErr error }{ - {"no row", nil, 0, true}, - {"on row", []uint64{1}, 1, false}, - {"multi row", []uint64{1, 2}, 0, true}, + {"no row", nil, 0, true, ErrNoCheckpointItem}, + {"on row", []uint64{1}, 1, false, nil}, + {"multi row", []uint64{1, 2}, 0, true, nil}, } for _, test := range tests { @@ -50,6 +52,9 @@ func (t *testUtil) TestG(c *C) { if test.err { c.Assert(err, NotNil) c.Assert(id, Equals, test.id) + if test.checkSpecifiedErr != nil { + c.Assert(errors.Cause(err), Equals, test.checkSpecifiedErr) + } } else { c.Assert(err, IsNil) c.Assert(id, Equals, test.id) diff --git a/drainer/relay.go b/drainer/relay.go index e7ea05144..22bd63253 100644 --- a/drainer/relay.go +++ b/drainer/relay.go @@ -43,9 +43,10 @@ func feedByRelayLogIfNeed(cfg *Config) error { return errors.Annotate(err, "failed to create reader") } - db, ld, err := sync.CreateLoader(scfg.To, scfg.WorkerCount, scfg.TxnBatch, nil, scfg.StrSQLMode, scfg.DestDBType, scfg.To.SyncMode) + db, ld, err := sync.CreateLoader(scfg.To, scfg.WorkerCount, scfg.TxnBatch, + queryHistogramVec, scfg.StrSQLMode, scfg.DestDBType) if err != nil { - return errors.Annotate(err, "faild to create loader") + return errors.Annotate(err, "failed to create loader") } defer db.Close() @@ -72,31 +73,33 @@ func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint) close(loaderQuit) }() - var readerTxns <-chan *obinlog.Binlog - // var readerInputClosed bool + var readerTxnsC <-chan *obinlog.Binlog var toPushLoaderTxn *loader.Txn - var loaderInput 
chan<- *loader.Txn + var loaderInputC chan<- *loader.Txn successTxnC := ld.Successes() - readerTxns = r.Txns() + readerTxnsC = r.Txns() loaderClosed := false for { - if readerTxns == nil && loaderInput == nil && !loaderClosed { + // when reader is drained and all txn has been push into loader + // we close cloader. + if readerTxnsC == nil && loaderInputC == nil && !loaderClosed { ld.Close() loaderClosed = true } + // break once we drainer the success items return by loader. if loaderClosed && successTxnC == nil { break } select { - case sbinlog, ok := <-readerTxns: + case sbinlog, ok := <-readerTxnsC: if !ok { - log.Info("readerTxns closed") - readerTxns = nil + log.Info("readerTxnsC closed") + readerTxnsC = nil continue } txn, err := loader.SlaveBinlogToTxn(sbinlog) @@ -110,9 +113,9 @@ func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint) txn.Metadata = sbinlog.CommitTs toPushLoaderTxn = txn - loaderInput = ld.Input() - case loaderInput <- toPushLoaderTxn: - loaderInput = nil + loaderInputC = ld.Input() + case loaderInputC <- toPushLoaderTxn: + loaderInputC = nil toPushLoaderTxn = nil case success, ok := <-successTxnC: if !ok { @@ -125,23 +128,17 @@ func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint) if loaderErr != nil { return errors.Trace(loaderErr) } - loaderQuit = nil } } log.Info("finish feed by relay log") readerErr := <-r.Error() - <-loaderQuit if readerErr != nil { return errors.Trace(readerErr) } - if loaderErr != nil { - return errors.Trace(loaderErr) - } - err := cp.Save(lastSuccessTS, 0 /* slaveTS */, checkpoint.StatusNormal) if err != nil { return errors.Trace(err) diff --git a/drainer/relay_test.go b/drainer/relay_test.go index 80c35070e..7848d3b08 100644 --- a/drainer/relay_test.go +++ b/drainer/relay_test.go @@ -31,6 +31,9 @@ type noOpLoader struct { success chan *loader.Txn } +// noOpLoader just return success for every input txn. 
+var _ loader.Loader = &noOpLoader{} + func newNoOpLoader() *noOpLoader { return &noOpLoader{ input: make(chan *loader.Txn, 1), @@ -69,10 +72,11 @@ func (ld *noOpLoader) GetSafeMode() bool { var _ loader.Loader = &noOpLoader{} func (s *relaySuite) TestFeedByRealyLog(c *check.C) { - cp, err := checkpoint.NewFile(0 /* initialCommitTS */, c.MkDir()+"/cp") + cp, err := checkpoint.NewFile(0 /* initialCommitTS */, c.MkDir()+"/checkpoint") c.Assert(err, check.IsNil) err = cp.Save(0, 0, checkpoint.StatusRunning) c.Assert(err, check.IsNil) + c.Assert(cp.Status(), check.Equals, checkpoint.StatusRunning) ld := newNoOpLoader() diff --git a/drainer/server.go b/drainer/server.go index 6b830365b..9478cb588 100644 --- a/drainer/server.go +++ b/drainer/server.go @@ -102,7 +102,7 @@ func NewServer(cfg *Config) (*Server, error) { pdCli, err := getPdClient(cfg.EtcdURLs, cfg.Security) if err != nil { ferr := feedByRelayLogIfNeed(cfg) - if ferr != nil { + if ferr != nil && errors.Cause(ferr) != checkpoint.ErrNoCheckpointItem { return nil, errors.Trace(ferr) } return nil, errors.Trace(err) diff --git a/drainer/sync/mysql.go b/drainer/sync/mysql.go index dcb259295..bd94524b4 100644 --- a/drainer/sync/mysql.go +++ b/drainer/sync/mysql.go @@ -48,7 +48,6 @@ func CreateLoader( queryHistogramVec *prometheus.HistogramVec, sqlMode *string, destDBType string, - syncMode int, ) (db *sql.DB, ld loader.Loader, err error) { db, err = createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, sqlMode) if err != nil { @@ -64,7 +63,7 @@ func CreateLoader( })) } - if syncMode != 0 { + if cfg.SyncMode != 0 { mode := loader.SyncMode(cfg.SyncMode) opts = append(opts, loader.SyncModeOption(mode)) @@ -105,7 +104,7 @@ func NewMysqlSyncer( destDBType string, relayer relay.Relayer, ) (*MysqlSyncer, error) { - db, loader, err := CreateLoader(cfg, worker, batchSize, queryHistogramVec, sqlMode, destDBType, cfg.SyncMode) + db, loader, err := CreateLoader(cfg, worker, batchSize, queryHistogramVec, sqlMode, destDBType) if err != nil { return nil, errors.Trace(err) } From d9197461e40e9c090e7a480c9bbe30200fdd7321 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Wed, 5 Feb 2020 16:32:31 +0800 Subject: [PATCH 10/13] Address trivial comment --- drainer/config.go | 7 +++++-- drainer/relay.go | 18 ++++++++++++------ drainer/relay/reader.go | 16 ++++++++-------- drainer/relay/reader_test.go | 2 +- drainer/syncer.go | 2 +- 5 files changed, 27 insertions(+), 18 deletions(-) diff --git a/drainer/config.go b/drainer/config.go index eec031081..f830aa7fb 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -86,8 +86,8 @@ type RelayConfig struct { MaxFileSize int64 `toml:"max-file-size" json:"max-file-size"` } -// SwitchOn return true if we need to handle relay log. -func (rc RelayConfig) SwitchOn() bool { +// IsEnabled return true if we need to handle relay log. 
+func (rc RelayConfig) IsEnabled() bool { return len(rc.LogDir) > 0 } @@ -148,6 +148,9 @@ func NewConfig() *Config { fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas") fs.IntVar(&cfg.SyncerCfg.WorkerCount, "c", 16, "parallel worker count") fs.StringVar(&cfg.SyncerCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql or tidb or file or kafka; see syncer section in conf/drainer.toml") + fs.StringVar(&cfg.SyncerCfg.Relay.LogDir, "relay-log-dir", "", "path to relay log of syncer") + fs.Int64Var(&cfg.SyncerCfg.Relay.MaxFileSize, "relay-max-file-size", 10485760, "max file size of each relay log") + fs.BoolVar(&cfg.SyncerCfg.EnableDispatch, "enable-dispatch", true, "enable dispatching sqls that in one same binlog; if set true, work-count and txn-batch would be useless") fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant") fs.BoolVar(&cfg.SyncerCfg.EnableCausality, "enable-detect", false, "enable detect causality") diff --git a/drainer/relay.go b/drainer/relay.go index ac976cfde..20b3461eb 100644 --- a/drainer/relay.go +++ b/drainer/relay.go @@ -12,7 +12,7 @@ import ( ) func feedByRelayLogIfNeed(cfg *Config) error { - if !cfg.SyncerCfg.Relay.SwitchOn() { + if !cfg.SyncerCfg.Relay.IsEnabled() { return nil } @@ -78,7 +78,8 @@ func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint) var loaderInputC chan<- *loader.Txn successTxnC := ld.Successes() - readerTxnsC = r.Txns() + readerTxnsC = r.Binlogs() + readerTxnsCClosed := false loaderClosed := false @@ -100,23 +101,28 @@ func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint) if !ok { log.Info("readerTxnsC closed") readerTxnsC = nil + readerTxnsCClosed = true continue } + if sbinlog.CommitTs <= checkpointTS { + continue + } + txn, err := loader.SlaveBinlogToTxn(sbinlog) if err != nil { return errors.Trace(err) } - if sbinlog.CommitTs <= checkpointTS { - continue - } - + readerTxnsC = nil txn.Metadata = sbinlog.CommitTs toPushLoaderTxn = txn loaderInputC = ld.Input() case loaderInputC <- toPushLoaderTxn: loaderInputC = nil toPushLoaderTxn = nil + if !readerTxnsCClosed { + readerTxnsC = r.Binlogs() + } case success, ok := <-successTxnC: if !ok { successTxnC = nil diff --git a/drainer/relay/reader.go b/drainer/relay/reader.go index 1f8f76461..64752d913 100644 --- a/drainer/relay/reader.go +++ b/drainer/relay/reader.go @@ -30,8 +30,8 @@ type Reader interface { // Run reads relay log. Run() context.CancelFunc - // Txns returns parsed transactions. - Txns() <-chan *obinlog.Binlog + // Binlogs returns the channel for reading parsed binlogs. + Binlogs() <-chan *obinlog.Binlog // Close releases resources. Close() error @@ -42,7 +42,7 @@ type Reader interface { type reader struct { binlogger binlogfile.Binlogger - txns chan *obinlog.Binlog + binlogs chan *obinlog.Binlog err chan error } @@ -55,7 +55,7 @@ func NewReader(dir string, readBufferSize int) (Reader, error) { return &reader{ binlogger: binlogger, - txns: make(chan *obinlog.Binlog, readBufferSize), + binlogs: make(chan *obinlog.Binlog, readBufferSize), }, nil } @@ -88,12 +88,12 @@ func (r *reader) Run() context.CancelFunc { case <-ctx.Done(): err = ctx.Err() log.Warn("Producing transaction is interrupted") - case r.txns <- slaveBinlog: + case r.binlogs <- slaveBinlog: } } // If binlogger is not done, notify it to stop. 
cancel() - close(r.txns) + close(r.binlogs) if err == nil { err = <-binlogErr @@ -108,8 +108,8 @@ func (r *reader) Run() context.CancelFunc { } // Txns implements Reader interface. -func (r *reader) Txns() <-chan *obinlog.Binlog { - return r.txns +func (r *reader) Binlogs() <-chan *obinlog.Binlog { + return r.binlogs } // Error implements Reader interface. diff --git a/drainer/relay/reader_test.go b/drainer/relay/reader_test.go index 55c276cdf..a3a7f6e47 100644 --- a/drainer/relay/reader_test.go +++ b/drainer/relay/reader_test.go @@ -88,7 +88,7 @@ func (r *testReaderSuite) readBinlogAndCheck(c *C, dir string, expectedNumber in var lastTxn *loader.Txn number := 0 - for txn := range relayReader.Txns() { + for txn := range relayReader.Binlogs() { number++ loaderTxn, err := loader.SlaveBinlogToTxn(txn) c.Assert(err, IsNil) diff --git a/drainer/syncer.go b/drainer/syncer.go index 3ec5915aa..0774c7863 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -108,7 +108,7 @@ func createDSyncer(cfg *SyncerConfig, schema *Schema, info *loopbacksync.LoopBac } case "mysql", "tidb": var relayer relay.Relayer - if cfg.Relay.SwitchOn() { + if cfg.Relay.IsEnabled() { if relayer, err = relay.NewRelayer(cfg.Relay.LogDir, cfg.Relay.MaxFileSize, schema); err != nil { return nil, errors.Annotate(err, "fail to create relayer") } From d34142569b67bb097232bd91c791dce05c665f38 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Wed, 5 Feb 2020 16:37:12 +0800 Subject: [PATCH 11/13] Use std error --- drainer/checkpoint/util.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/drainer/checkpoint/util.go b/drainer/checkpoint/util.go index 62da56655..45867424c 100644 --- a/drainer/checkpoint/util.go +++ b/drainer/checkpoint/util.go @@ -15,6 +15,7 @@ package checkpoint import ( "database/sql" + stderrors "errors" "fmt" // mysql driver @@ -24,7 +25,7 @@ import ( // ErrNoCheckpointItem represents there's no any checkpoint item and the cluster id must be specified // for the mysql checkpoint type. -var ErrNoCheckpointItem = errors.New("no any checkpoint item") +var ErrNoCheckpointItem = stderrors.New("no any checkpoint item") // DBConfig is the DB configuration. 
type DBConfig struct { From bb81853698b4d86fb8d81481b49b48d31e674c68 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Wed, 5 Feb 2020 23:14:04 +0800 Subject: [PATCH 12/13] Make CreateLoader accept sql.DB --- drainer/relay.go | 10 ++++++--- drainer/sync/mysql.go | 51 +++++++++++++++++++++++-------------------- 2 files changed, 34 insertions(+), 27 deletions(-) diff --git a/drainer/relay.go b/drainer/relay.go index 20b3461eb..d42769b6b 100644 --- a/drainer/relay.go +++ b/drainer/relay.go @@ -43,14 +43,18 @@ func feedByRelayLogIfNeed(cfg *Config) error { return errors.Annotate(err, "failed to create reader") } - db, ld, err := sync.CreateLoader(scfg.To, scfg.WorkerCount, scfg.TxnBatch, + db, err := loader.CreateDBWithSQLMode(scfg.To.User, scfg.To.Password, scfg.To.Host, scfg.To.Port, scfg.StrSQLMode) + if err != nil { + return errors.Annotate(err, "failed to create SQL db") + } + defer db.Close() + + ld, err := sync.CreateLoader(db, scfg.To, scfg.WorkerCount, scfg.TxnBatch, queryHistogramVec, scfg.StrSQLMode, scfg.DestDBType, nil /*loopbacksync.LoopBackSync*/) if err != nil { return errors.Annotate(err, "failed to create loader") } - defer db.Close() - err = feedByRelayLog(reader, ld, cp) if err != nil { return errors.Trace(err) diff --git a/drainer/sync/mysql.go b/drainer/sync/mysql.go index 079ba2bbc..6dd03f008 100644 --- a/drainer/sync/mysql.go +++ b/drainer/sync/mysql.go @@ -43,6 +43,7 @@ var createDB = loader.CreateDBWithSQLMode // CreateLoader create the Loader instance. func CreateLoader( + db *sql.DB, cfg *DBConfig, worker int, batchSize int, @@ -50,11 +51,7 @@ func CreateLoader( sqlMode *string, destDBType string, info *loopbacksync.LoopBackSync, -) (db *sql.DB, ld loader.Loader, err error) { - db, err = createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, sqlMode) - if err != nil { - return nil, nil, errors.Trace(err) - } +) (ld loader.Loader, err error) { var opts []loader.Option opts = append(opts, loader.WorkerCount(worker), loader.BatchSize(batchSize), loader.SaveAppliedTS(destDBType == "tidb"), loader.SetloopBackSyncInfo(info)) @@ -68,28 +65,11 @@ func CreateLoader( if cfg.SyncMode != 0 { mode := loader.SyncMode(cfg.SyncMode) opts = append(opts, loader.SyncModeOption(mode)) - - if mode == loader.SyncPartialColumn { - var oldMode, newMode string - oldMode, newMode, err = relaxSQLMode(db) - if err != nil { - return nil, nil, errors.Trace(err) - } - - if newMode != oldMode { - db.Close() - db, err = createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, &newMode) - if err != nil { - return nil, nil, errors.Trace(err) - } - } - } } ld, err = loader.NewLoader(db, opts...) 
if err != nil { - db.Close() - return nil, nil, errors.Trace(err) + return nil, errors.Trace(err) } return @@ -107,7 +87,30 @@ func NewMysqlSyncer( relayer relay.Relayer, info *loopbacksync.LoopBackSync, ) (*MysqlSyncer, error) { - db, loader, err := CreateLoader(cfg, worker, batchSize, queryHistogramVec, sqlMode, destDBType, info) + db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, sqlMode) + if err != nil { + return nil, errors.Trace(err) + } + + syncMode := loader.SyncMode(cfg.SyncMode) + if syncMode == loader.SyncPartialColumn { + var oldMode, newMode string + oldMode, newMode, err = relaxSQLMode(db) + if err != nil { + db.Close() + return nil, errors.Trace(err) + } + + if newMode != oldMode { + db.Close() + db, err = createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, &newMode) + if err != nil { + return nil, errors.Trace(err) + } + } + } + + loader, err := CreateLoader(db, cfg, worker, batchSize, queryHistogramVec, sqlMode, destDBType, info) if err != nil { return nil, errors.Trace(err) } From 002d4493a4f5cfcb06bd41acc762788271b5c2d9 Mon Sep 17 00:00:00 2001 From: Jiahao Huang Date: Wed, 5 Feb 2020 23:54:56 +0800 Subject: [PATCH 13/13] Rename StatusNormal to StatusConsitent --- drainer/checkpoint/checkpoint.go | 4 ++-- drainer/relay.go | 4 ++-- drainer/sync/syncer.go | 1 + drainer/syncer.go | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/drainer/checkpoint/checkpoint.go b/drainer/checkpoint/checkpoint.go index 067f7c72b..ea4e822a0 100644 --- a/drainer/checkpoint/checkpoint.go +++ b/drainer/checkpoint/checkpoint.go @@ -20,8 +20,8 @@ import ( ) const ( - // StatusNormal means server quit normally, data <= ts is synced to downstream - StatusNormal int = 0 + // StatusConsistent means server quit normally, data <= ts is synced to downstream + StatusConsistent int = 0 // StatusRunning means server running or quit abnormally, part of data may or may not been synced to downstream StatusRunning int = 1 diff --git a/drainer/relay.go b/drainer/relay.go index d42769b6b..099c2f91d 100644 --- a/drainer/relay.go +++ b/drainer/relay.go @@ -34,7 +34,7 @@ func feedByRelayLogIfNeed(cfg *Config) error { defer cp.Close() - if cp.Status() == checkpoint.StatusNormal { + if cp.Status() == checkpoint.StatusConsistent { return nil } @@ -149,7 +149,7 @@ func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint) return errors.Trace(readerErr) } - err := cp.Save(lastSuccessTS, 0 /* slaveTS */, checkpoint.StatusNormal) + err := cp.Save(lastSuccessTS, 0 /* slaveTS */, checkpoint.StatusConsistent) if err != nil { return errors.Trace(err) } diff --git a/drainer/sync/syncer.go b/drainer/sync/syncer.go index e1e9b4fc2..41f5535f3 100644 --- a/drainer/sync/syncer.go +++ b/drainer/sync/syncer.go @@ -39,6 +39,7 @@ type Syncer interface { // Return not nil if fail to sync data to downstream or nil if closed normally Error() <-chan error // Close the Syncer, no more item can be added by `Sync` + // will drain all items and return nil if all successfully sync into downstream Close() error } diff --git a/drainer/syncer.go b/drainer/syncer.go index 0774c7863..19c078374 100644 --- a/drainer/syncer.go +++ b/drainer/syncer.go @@ -461,7 +461,7 @@ ForLoop: return cerr } - return s.cp.Save(s.cp.TS(), 0, checkpoint.StatusNormal) + return s.cp.Save(s.cp.TS(), 0, checkpoint.StatusConsistent) } func findLoopBackMark(dmls []*loader.DML, info *loopbacksync.LoopBackSync) (bool, error) {
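As a closing illustration of the trickiest part of this series, the select loop in feedByRelayLog, here is a stand-alone, runnable sketch of the same channel orchestration using plain int64 commit timestamps. Every name in it is a stand-in, not the real drainer, relay, or loader API: items are pulled from a reader channel and handed to a loader-style input one at a time, the input is closed only once the reader is drained, and successes are drained until the loader closes them, at which point the last applied commit TS would be saved as consistent.

package main

import "fmt"

func main() {
	source := make(chan int64, 4) // stands in for relay.Reader.Binlogs()
	input := make(chan int64)     // stands in for loader.Input()
	success := make(chan int64)   // stands in for loader.Successes()

	// Stand-in loader: acknowledges every accepted item on the success channel.
	go func() {
		for ts := range input {
			success <- ts
		}
		close(success)
	}()

	// Stand-in reader: ten binlogs with ascending commit timestamps.
	go func() {
		for i := int64(1); i <= 10; i++ {
			source <- i * 10
		}
		close(source)
	}()

	var (
		pending      int64        // commit TS waiting to be pushed to the loader
		out          chan<- int64 // nil while there is nothing to push
		in           = source     // nil while an item is pending, or after close
		sourceClosed bool
		lastSuccess  int64
	)
	for success != nil {
		// Reader drained and nothing pending: close the loader input so it
		// can flush and then close its success channel.
		if sourceClosed && in == nil && out == nil && input != nil {
			close(input)
			input = nil
		}
		select {
		case ts, ok := <-in:
			if !ok {
				sourceClosed, in = true, nil
				continue
			}
			pending, out, in = ts, input, nil // stop reading until this item is accepted
		case out <- pending:
			out = nil
			if !sourceClosed {
				in = source
			}
		case ts, ok := <-success:
			if !ok {
				success = nil
				continue
			}
			lastSuccess = ts
		}
	}
	// In the real feedByRelayLog this is where the checkpoint would be saved
	// with StatusConsistent; here we just print the highest acknowledged TS.
	fmt.Println("replayed up to commit ts", lastSuccess) // prints 100
}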