Skip to content

Commit

Permalink
feed downstream by relay log at startup if need (pingcap#883)
Browse files Browse the repository at this point in the history
  • Loading branch information
july2993 committed Feb 10, 2020
1 parent 1b37c46 commit 0fe1f4f
Show file tree
Hide file tree
Showing 23 changed files with 560 additions and 91 deletions.
7 changes: 3 additions & 4 deletions cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,12 @@ ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql"
#
#replicate-do-db = ["~^b.*","s1"]

[syncer.relay]
# directory of relay logs. Empty string indicates disabling relay log.
# relay log works only if the downstream is TiDB/MySQL.
relay-log-dir = ""
# log-dir = ""
# max file size of each relay log
relay-log-size = 10485760
# read buffer size of relay log
relay-read-buf-size = 8
# max-file-size = 10485760

#[[syncer.replicate-do-table]]
#db-name ="test"
Expand Down
17 changes: 14 additions & 3 deletions drainer/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ import (
"go.uber.org/zap"
)

const (
// StatusConsistent means server quit normally, data <= ts is synced to downstream
StatusConsistent int = 0

// StatusRunning means server running or quit abnormally, part of data may or may not been synced to downstream
StatusRunning int = 1
)

var (
// ErrCheckPointClosed indicates the CheckPoint already closed.
ErrCheckPointClosed = errors.New("CheckPoint already closed")
Expand All @@ -31,11 +39,14 @@ type CheckPoint interface {
Load() error

// Save saves checkpoint information.
Save(int64, int64) error
Save(commitTS int64, slaveTS int64, status int) error

// Pos gets position information.
// TS gets checkpoint commit timestamp.
TS() int64

// Status return the status saved.
Status() int

// Close closes the CheckPoint and release resources, after closed other methods should not be called again.
Close() error
}
Expand All @@ -50,7 +61,7 @@ func NewCheckPoint(cfg *Config) (CheckPoint, error) {
case "mysql", "tidb":
cp, err = newMysql(cfg)
case "file":
cp, err = NewFile(cfg)
cp, err = NewFile(cfg.InitialCommitTS, cfg.CheckPointFile)
case "flash":
cp, err = newFlash(cfg)
default:
Expand Down
20 changes: 15 additions & 5 deletions drainer/checkpoint/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ type FileCheckPoint struct {

name string

CommitTS int64 `toml:"commitTS" json:"commitTS"`
StatusSaved int `toml:"status" json:"status"`
CommitTS int64 `toml:"commitTS" json:"commitTS"`
}

// NewFile creates a new FileCheckpoint.
func NewFile(cfg *Config) (CheckPoint, error) {
func NewFile(initialCommitTS int64, filePath string) (CheckPoint, error) {
pb := &FileCheckPoint{
initialCommitTS: cfg.InitialCommitTS,
name: cfg.CheckPointFile,
initialCommitTS: initialCommitTS,
name: filePath,
}
err := pb.Load()
if err != nil {
Expand Down Expand Up @@ -81,7 +82,7 @@ func (sp *FileCheckPoint) Load() error {
}

// Save implements CheckPoint.Save interface
func (sp *FileCheckPoint) Save(ts, slaveTS int64) error {
func (sp *FileCheckPoint) Save(ts, slaveTS int64, status int) error {
sp.Lock()
defer sp.Unlock()

Expand All @@ -90,6 +91,7 @@ func (sp *FileCheckPoint) Save(ts, slaveTS int64) error {
}

sp.CommitTS = ts
sp.StatusSaved = status

var buf bytes.Buffer
e := toml.NewEncoder(&buf)
Expand All @@ -114,6 +116,14 @@ func (sp *FileCheckPoint) TS() int64 {
return sp.CommitTS
}

// Status implements CheckPoint.Status interface
func (sp *FileCheckPoint) Status() int {
sp.RLock()
defer sp.RUnlock()

return sp.StatusSaved
}

// Close implements CheckPoint.Close interface
func (sp *FileCheckPoint) Close() error {
sp.Lock()
Expand Down
19 changes: 9 additions & 10 deletions drainer/checkpoint/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ import (
func (t *testCheckPointSuite) TestFile(c *C) {
fileName := "/tmp/test"
notExistFileName := "test_not_exist"
cfg := new(Config)
cfg.CheckPointFile = fileName
meta, err := NewFile(cfg)
meta, err := NewFile(0, fileName)
c.Assert(err, IsNil)
defer os.RemoveAll(fileName)

// zero (initial) CommitTs
c.Assert(meta.TS(), Equals, int64(0))
c.Assert(meta.Status(), Equals, StatusConsistent)

testTs := int64(1)
// save ts
err = meta.Save(testTs, 0)
err = meta.Save(testTs, 0, StatusRunning)
c.Assert(err, IsNil)
// check ts
ts := meta.TS()
c.Assert(ts, Equals, testTs)
c.Assert(meta.Status(), Equals, StatusRunning)

// check load ts
err = meta.Load()
Expand All @@ -47,23 +47,22 @@ func (t *testCheckPointSuite) TestFile(c *C) {
c.Assert(ts, Equals, testTs)

// check not exist meta file
cfg.CheckPointFile = notExistFileName
meta, err = NewFile(cfg)
meta, err = NewFile(0, notExistFileName)
c.Assert(err, IsNil)
err = meta.Load()
c.Assert(err, IsNil)
c.Assert(meta.TS(), Equals, int64(0))

// check not exist meta file, but with initialCommitTs
cfg.InitialCommitTS = 123
meta, err = NewFile(cfg)
var initialCommitTS int64 = 123
meta, err = NewFile(initialCommitTS, notExistFileName)
c.Assert(err, IsNil)
c.Assert(meta.TS(), Equals, cfg.InitialCommitTS)
c.Assert(meta.TS(), Equals, initialCommitTS)

// close the checkpoint
err = meta.Close()
c.Assert(err, IsNil)
c.Assert(errors.Cause(meta.Load()), Equals, ErrCheckPointClosed)
c.Assert(errors.Cause(meta.Save(0, 0)), Equals, ErrCheckPointClosed)
c.Assert(errors.Cause(meta.Save(0, 0, StatusConsistent)), Equals, ErrCheckPointClosed)
c.Assert(errors.Cause(meta.Close()), Equals, ErrCheckPointClosed)
}
14 changes: 12 additions & 2 deletions drainer/checkpoint/flash.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type FlashCheckPoint struct {
schema string
table string

CommitTS int64 `toml:"commitTS" json:"commitTS"`
StatusSaved int `toml:"status" json:"status"`
CommitTS int64 `toml:"commitTS" json:"commitTS"`
}

func checkFlashConfig(cfg *Config) {
Expand Down Expand Up @@ -142,7 +143,7 @@ func (sp *FlashCheckPoint) Load() error {
}

// Save implements checkpoint.Save interface
func (sp *FlashCheckPoint) Save(ts, slaveTS int64) error {
func (sp *FlashCheckPoint) Save(ts, slaveTS int64, status int) error {
sp.Lock()
defer sp.Unlock()

Expand All @@ -151,6 +152,7 @@ func (sp *FlashCheckPoint) Save(ts, slaveTS int64) error {
}

sp.CommitTS = ts
sp.StatusSaved = status

b, err := json.Marshal(sp)
if err != nil {
Expand All @@ -173,6 +175,14 @@ func (sp *FlashCheckPoint) TS() int64 {
return sp.CommitTS
}

// Status implements CheckPoint.Status interface
func (sp *FlashCheckPoint) Status() int {
sp.RLock()
defer sp.RUnlock()

return sp.StatusSaved
}

// Close implements CheckPoint.Close interface.
func (sp *FlashCheckPoint) Close() error {
sp.Lock()
Expand Down
2 changes: 1 addition & 1 deletion drainer/checkpoint/flash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *flashSuite) TestSave(c *C) {
mock.ExpectBegin()
mock.ExpectExec("IMPORT INTO.*").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectCommit()
err = cp.Save(1024, 0)
err = cp.Save(1024, 0, 0)
c.Assert(err, IsNil)
}

Expand Down
31 changes: 28 additions & 3 deletions drainer/checkpoint/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"

// mysql driver
_ "github.com/go-sql-driver/mysql"
pkgsql "github.com/pingcap/tidb-binlog/pkg/sql"
Expand All @@ -35,10 +38,13 @@ type MysqlCheckPoint struct {
schema string
table string

CommitTS int64 `toml:"commitTS" json:"commitTS"`
TsMap map[string]int64 `toml:"ts-map" json:"ts-map"`
StatusSaved int `toml:"status" json:"status"`
CommitTS int64 `toml:"commitTS" json:"commitTS"`
TsMap map[string]int64 `toml:"ts-map" json:"ts-map"`
}

var _ CheckPoint = &MysqlCheckPoint{}

var sqlOpenDB = pkgsql.OpenDB

func newMysql(cfg *Config) (CheckPoint, error) {
Expand Down Expand Up @@ -68,6 +74,16 @@ func newMysql(cfg *Config) (CheckPoint, error) {
return nil, errors.Annotatef(err, "exec failed, sql: %s", sql)
}

if sp.clusterID == 0 {
id, err := getClusterID(db, sp.schema, sp.table)
if err != nil {
return nil, errors.Trace(err)
}

log.Info("set cluster id", zap.Uint64("id", id))
sp.clusterID = id
}

err = sp.Load()
return sp, errors.Trace(err)
}
Expand Down Expand Up @@ -106,7 +122,7 @@ func (sp *MysqlCheckPoint) Load() error {
}

// Save implements checkpoint.Save interface
func (sp *MysqlCheckPoint) Save(ts, slaveTS int64) error {
func (sp *MysqlCheckPoint) Save(ts, slaveTS int64, status int) error {
sp.Lock()
defer sp.Unlock()

Expand All @@ -115,6 +131,7 @@ func (sp *MysqlCheckPoint) Save(ts, slaveTS int64) error {
}

sp.CommitTS = ts
sp.StatusSaved = status

if slaveTS > 0 {
sp.TsMap["master-ts"] = ts
Expand All @@ -135,6 +152,14 @@ func (sp *MysqlCheckPoint) Save(ts, slaveTS int64) error {
return nil
}

// Status implements CheckPoint.Status interface
func (sp *MysqlCheckPoint) Status() int {
sp.RLock()
defer sp.RUnlock()

return sp.StatusSaved
}

// TS implements CheckPoint.TS interface
func (sp *MysqlCheckPoint) TS() int64 {
sp.RLock()
Expand Down
7 changes: 4 additions & 3 deletions drainer/checkpoint/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *saveSuite) TestShouldSaveCheckpoint(c *C) {
c.Assert(err, IsNil)
mock.ExpectExec("replace into db.tbl.*").WillReturnResult(sqlmock.NewResult(0, 0))
cp := MysqlCheckPoint{db: db, schema: "db", table: "tbl"}
err = cp.Save(1111, 0)
err = cp.Save(1111, 0, StatusRunning)
c.Assert(err, IsNil)
}

Expand All @@ -63,7 +63,7 @@ func (s *saveSuite) TestShouldUpdateTsMap(c *C) {
table: "tbl",
TsMap: make(map[string]int64),
}
err = cp.Save(65536, 3333)
err = cp.Save(65536, 3333, StatusRunning)
c.Assert(err, IsNil)
c.Assert(cp.TsMap["master-ts"], Equals, int64(65536))
c.Assert(cp.TsMap["slave-ts"], Equals, int64(3333))
Expand All @@ -83,12 +83,13 @@ func (s *loadSuite) TestShouldLoadFromDB(c *C) {
TsMap: make(map[string]int64),
}
rows := sqlmock.NewRows([]string{"checkPoint"}).
AddRow(`{"commitTS": 1024, "ts-map": {"master-ts": 2000, "slave-ts": 1999}}`)
AddRow(`{"commitTS": 1024, "status": 1, "ts-map": {"master-ts": 2000, "slave-ts": 1999}}`)
mock.ExpectQuery("select checkPoint from db.tbl.*").WillReturnRows(rows)

err = cp.Load()
c.Assert(err, IsNil)
c.Assert(cp.CommitTS, Equals, int64(1024))
c.Assert(cp.StatusSaved, Equals, 1)
c.Assert(cp.TsMap["master-ts"], Equals, int64(2000))
c.Assert(cp.TsMap["slave-ts"], Equals, int64(1999))
}
Expand Down
37 changes: 37 additions & 0 deletions drainer/checkpoint/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,19 @@
package checkpoint

import (
"database/sql"
stderrors "errors"
"fmt"

// mysql driver
_ "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
)

// ErrNoCheckpointItem represents there's no any checkpoint item and the cluster id must be specified
// for the mysql checkpoint type.
var ErrNoCheckpointItem = stderrors.New("no any checkpoint item")

// DBConfig is the DB configuration.
type DBConfig struct {
Host string `toml:"host" json:"host"`
Expand Down Expand Up @@ -74,6 +81,36 @@ func genReplaceSQL(sp *MysqlCheckPoint, str string) string {
return fmt.Sprintf("replace into %s.%s values(%d, '%s')", sp.schema, sp.table, sp.clusterID, str)
}

// getClusterID return the cluster id iff the checkpoint table exist only one row.
func getClusterID(db *sql.DB, schema string, table string) (id uint64, err error) {
sqlQuery := fmt.Sprintf("select clusterID from %s.%s", schema, table)
rows, err := db.Query(sqlQuery)
if err != nil {
return 0, errors.Trace(err)
}

for rows.Next() {
if id > 0 {
return 0, errors.New("there are multi row in checkpoint table")
}

err = rows.Scan(&id)
if err != nil {
return 0, errors.Trace(err)
}
}

if rows.Err() != nil {
return 0, errors.Trace(rows.Err())
}

if id == 0 {
return 0, ErrNoCheckpointItem
}

return
}

func genSelectSQL(sp *MysqlCheckPoint) string {
return fmt.Sprintf("select checkPoint from %s.%s where clusterID = %d", sp.schema, sp.table, sp.clusterID)
}
Loading

0 comments on commit 0fe1f4f

Please sign in to comment.