Skip to content

Commit

Permalink
feed downstream by relay log at startup if need (#883)
Browse files Browse the repository at this point in the history
  • Loading branch information
july2993 authored Feb 7, 2020
1 parent ff59e72 commit db9bb50
Show file tree
Hide file tree
Showing 20 changed files with 546 additions and 87 deletions.
7 changes: 3 additions & 4 deletions cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,12 @@ ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql"
#
#replicate-do-db = ["~^b.*","s1"]

[syncer.relay]
# directory of relay logs. Empty string indicates disabling relay log.
# relay log works only if the downstream is TiDB/MySQL.
relay-log-dir = ""
# log-dir = ""
# max file size of each relay log
relay-log-size = 10485760
# read buffer size of relay log
relay-read-buf-size = 8
# max-file-size = 10485760

#[[syncer.replicate-do-table]]
#db-name ="test"
Expand Down
17 changes: 14 additions & 3 deletions drainer/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ import (
"go.uber.org/zap"
)

const (
// StatusConsistent means server quit normally, data <= ts is synced to downstream
StatusConsistent int = 0

// StatusRunning means server running or quit abnormally, part of data may or may not been synced to downstream
StatusRunning int = 1
)

var (
// ErrCheckPointClosed indicates the CheckPoint already closed.
ErrCheckPointClosed = errors.New("CheckPoint already closed")
Expand All @@ -31,11 +39,14 @@ type CheckPoint interface {
Load() error

// Save saves checkpoint information.
Save(int64, int64) error
Save(commitTS int64, slaveTS int64, status int) error

// Pos gets position information.
// TS gets checkpoint commit timestamp.
TS() int64

// Status return the status saved.
Status() int

// Close closes the CheckPoint and release resources, after closed other methods should not be called again.
Close() error
}
Expand All @@ -50,7 +61,7 @@ func NewCheckPoint(cfg *Config) (CheckPoint, error) {
case "mysql", "tidb":
cp, err = newMysql(cfg)
case "file":
cp, err = NewFile(cfg)
cp, err = NewFile(cfg.InitialCommitTS, cfg.CheckPointFile)
default:
err = errors.Errorf("unsupported checkpoint type %s", cfg.CheckpointType)
}
Expand Down
20 changes: 15 additions & 5 deletions drainer/checkpoint/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ type FileCheckPoint struct {

name string

CommitTS int64 `toml:"commitTS" json:"commitTS"`
StatusSaved int `toml:"status" json:"status"`
CommitTS int64 `toml:"commitTS" json:"commitTS"`
}

// NewFile creates a new FileCheckpoint.
func NewFile(cfg *Config) (CheckPoint, error) {
func NewFile(initialCommitTS int64, filePath string) (CheckPoint, error) {
pb := &FileCheckPoint{
initialCommitTS: cfg.InitialCommitTS,
name: cfg.CheckPointFile,
initialCommitTS: initialCommitTS,
name: filePath,
}
err := pb.Load()
if err != nil {
Expand Down Expand Up @@ -81,7 +82,7 @@ func (sp *FileCheckPoint) Load() error {
}

// Save implements CheckPoint.Save interface
func (sp *FileCheckPoint) Save(ts, slaveTS int64) error {
func (sp *FileCheckPoint) Save(ts, slaveTS int64, status int) error {
sp.Lock()
defer sp.Unlock()

Expand All @@ -90,6 +91,7 @@ func (sp *FileCheckPoint) Save(ts, slaveTS int64) error {
}

sp.CommitTS = ts
sp.StatusSaved = status

var buf bytes.Buffer
e := toml.NewEncoder(&buf)
Expand All @@ -114,6 +116,14 @@ func (sp *FileCheckPoint) TS() int64 {
return sp.CommitTS
}

// Status implements CheckPoint.Status interface
func (sp *FileCheckPoint) Status() int {
sp.RLock()
defer sp.RUnlock()

return sp.StatusSaved
}

// Close implements CheckPoint.Close interface
func (sp *FileCheckPoint) Close() error {
sp.Lock()
Expand Down
19 changes: 9 additions & 10 deletions drainer/checkpoint/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ import (
func (t *testCheckPointSuite) TestFile(c *C) {
fileName := "/tmp/test"
notExistFileName := "test_not_exist"
cfg := new(Config)
cfg.CheckPointFile = fileName
meta, err := NewFile(cfg)
meta, err := NewFile(0, fileName)
c.Assert(err, IsNil)
defer os.RemoveAll(fileName)

// zero (initial) CommitTs
c.Assert(meta.TS(), Equals, int64(0))
c.Assert(meta.Status(), Equals, StatusNormal)

testTs := int64(1)
// save ts
err = meta.Save(testTs, 0)
err = meta.Save(testTs, 0, StatusRunning)
c.Assert(err, IsNil)
// check ts
ts := meta.TS()
c.Assert(ts, Equals, testTs)
c.Assert(meta.Status(), Equals, StatusRunning)

// check load ts
err = meta.Load()
Expand All @@ -47,23 +47,22 @@ func (t *testCheckPointSuite) TestFile(c *C) {
c.Assert(ts, Equals, testTs)

// check not exist meta file
cfg.CheckPointFile = notExistFileName
meta, err = NewFile(cfg)
meta, err = NewFile(0, notExistFileName)
c.Assert(err, IsNil)
err = meta.Load()
c.Assert(err, IsNil)
c.Assert(meta.TS(), Equals, int64(0))

// check not exist meta file, but with initialCommitTs
cfg.InitialCommitTS = 123
meta, err = NewFile(cfg)
var initialCommitTS int64 = 123
meta, err = NewFile(initialCommitTS, notExistFileName)
c.Assert(err, IsNil)
c.Assert(meta.TS(), Equals, cfg.InitialCommitTS)
c.Assert(meta.TS(), Equals, initialCommitTS)

// close the checkpoint
err = meta.Close()
c.Assert(err, IsNil)
c.Assert(errors.Cause(meta.Load()), Equals, ErrCheckPointClosed)
c.Assert(errors.Cause(meta.Save(0, 0)), Equals, ErrCheckPointClosed)
c.Assert(errors.Cause(meta.Save(0, 0, StatusNormal)), Equals, ErrCheckPointClosed)
c.Assert(errors.Cause(meta.Close()), Equals, ErrCheckPointClosed)
}
31 changes: 28 additions & 3 deletions drainer/checkpoint/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"

// mysql driver
_ "github.com/go-sql-driver/mysql"
pkgsql "github.com/pingcap/tidb-binlog/pkg/sql"
Expand All @@ -35,10 +38,13 @@ type MysqlCheckPoint struct {
schema string
table string

CommitTS int64 `toml:"commitTS" json:"commitTS"`
TsMap map[string]int64 `toml:"ts-map" json:"ts-map"`
StatusSaved int `toml:"status" json:"status"`
CommitTS int64 `toml:"commitTS" json:"commitTS"`
TsMap map[string]int64 `toml:"ts-map" json:"ts-map"`
}

var _ CheckPoint = &MysqlCheckPoint{}

var sqlOpenDB = pkgsql.OpenDB

func newMysql(cfg *Config) (CheckPoint, error) {
Expand Down Expand Up @@ -68,6 +74,16 @@ func newMysql(cfg *Config) (CheckPoint, error) {
return nil, errors.Annotatef(err, "exec failed, sql: %s", sql)
}

if sp.clusterID == 0 {
id, err := getClusterID(db, sp.schema, sp.table)
if err != nil {
return nil, errors.Trace(err)
}

log.Info("set cluster id", zap.Uint64("id", id))
sp.clusterID = id
}

err = sp.Load()
return sp, errors.Trace(err)
}
Expand Down Expand Up @@ -106,7 +122,7 @@ func (sp *MysqlCheckPoint) Load() error {
}

// Save implements checkpoint.Save interface
func (sp *MysqlCheckPoint) Save(ts, slaveTS int64) error {
func (sp *MysqlCheckPoint) Save(ts, slaveTS int64, status int) error {
sp.Lock()
defer sp.Unlock()

Expand All @@ -115,6 +131,7 @@ func (sp *MysqlCheckPoint) Save(ts, slaveTS int64) error {
}

sp.CommitTS = ts
sp.StatusSaved = status

if slaveTS > 0 {
sp.TsMap["master-ts"] = ts
Expand All @@ -135,6 +152,14 @@ func (sp *MysqlCheckPoint) Save(ts, slaveTS int64) error {
return nil
}

// Status implements CheckPoint.Status interface
func (sp *MysqlCheckPoint) Status() int {
sp.RLock()
defer sp.RUnlock()

return sp.StatusSaved
}

// TS implements CheckPoint.TS interface
func (sp *MysqlCheckPoint) TS() int64 {
sp.RLock()
Expand Down
7 changes: 4 additions & 3 deletions drainer/checkpoint/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *saveSuite) TestShouldSaveCheckpoint(c *C) {
c.Assert(err, IsNil)
mock.ExpectExec("replace into db.tbl.*").WillReturnResult(sqlmock.NewResult(0, 0))
cp := MysqlCheckPoint{db: db, schema: "db", table: "tbl"}
err = cp.Save(1111, 0)
err = cp.Save(1111, 0, StatusRunning)
c.Assert(err, IsNil)
}

Expand All @@ -63,7 +63,7 @@ func (s *saveSuite) TestShouldUpdateTsMap(c *C) {
table: "tbl",
TsMap: make(map[string]int64),
}
err = cp.Save(65536, 3333)
err = cp.Save(65536, 3333, StatusRunning)
c.Assert(err, IsNil)
c.Assert(cp.TsMap["master-ts"], Equals, int64(65536))
c.Assert(cp.TsMap["slave-ts"], Equals, int64(3333))
Expand All @@ -83,12 +83,13 @@ func (s *loadSuite) TestShouldLoadFromDB(c *C) {
TsMap: make(map[string]int64),
}
rows := sqlmock.NewRows([]string{"checkPoint"}).
AddRow(`{"commitTS": 1024, "ts-map": {"master-ts": 2000, "slave-ts": 1999}}`)
AddRow(`{"commitTS": 1024, "status": 1, "ts-map": {"master-ts": 2000, "slave-ts": 1999}}`)
mock.ExpectQuery("select checkPoint from db.tbl.*").WillReturnRows(rows)

err = cp.Load()
c.Assert(err, IsNil)
c.Assert(cp.CommitTS, Equals, int64(1024))
c.Assert(cp.StatusSaved, Equals, 1)
c.Assert(cp.TsMap["master-ts"], Equals, int64(2000))
c.Assert(cp.TsMap["slave-ts"], Equals, int64(1999))
}
Expand Down
37 changes: 37 additions & 0 deletions drainer/checkpoint/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,19 @@
package checkpoint

import (
"database/sql"
stderrors "errors"
"fmt"

// mysql driver
_ "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
)

// ErrNoCheckpointItem represents there's no any checkpoint item and the cluster id must be specified
// for the mysql checkpoint type.
var ErrNoCheckpointItem = stderrors.New("no any checkpoint item")

// DBConfig is the DB configuration.
type DBConfig struct {
Host string `toml:"host" json:"host"`
Expand Down Expand Up @@ -74,6 +81,36 @@ func genReplaceSQL(sp *MysqlCheckPoint, str string) string {
return fmt.Sprintf("replace into %s.%s values(%d, '%s')", sp.schema, sp.table, sp.clusterID, str)
}

// getClusterID return the cluster id iff the checkpoint table exist only one row.
func getClusterID(db *sql.DB, schema string, table string) (id uint64, err error) {
sqlQuery := fmt.Sprintf("select clusterID from %s.%s", schema, table)
rows, err := db.Query(sqlQuery)
if err != nil {
return 0, errors.Trace(err)
}

for rows.Next() {
if id > 0 {
return 0, errors.New("there are multi row in checkpoint table")
}

err = rows.Scan(&id)
if err != nil {
return 0, errors.Trace(err)
}
}

if rows.Err() != nil {
return 0, errors.Trace(rows.Err())
}

if id == 0 {
return 0, ErrNoCheckpointItem
}

return
}

func genSelectSQL(sp *MysqlCheckPoint) string {
return fmt.Sprintf("select checkPoint from %s.%s where clusterID = %d", sp.schema, sp.table, sp.clusterID)
}
63 changes: 63 additions & 0 deletions drainer/checkpoint/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package checkpoint

import (
"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/pingcap/errors"
)

var _ = Suite(&testUtil{})

type testUtil struct{}

func (t *testUtil) TestG(c *C) {
tests := []struct {
name string
rows []uint64
id uint64
err bool
checkSpecifiedErr error
}{
{"no row", nil, 0, true, ErrNoCheckpointItem},
{"on row", []uint64{1}, 1, false, nil},
{"multi row", []uint64{1, 2}, 0, true, nil},
}

for _, test := range tests {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)

rows := sqlmock.NewRows([]string{"clusterID"})
for _, row := range test.rows {
rows.AddRow(row)
}

mock.ExpectQuery("select clusterID from .*").WillReturnRows(rows)

c.Log("test: ", test.name)
id, err := getClusterID(db, "schema", "table")
if test.err {
c.Assert(err, NotNil)
c.Assert(id, Equals, test.id)
if test.checkSpecifiedErr != nil {
c.Assert(errors.Cause(err), Equals, test.checkSpecifiedErr)
}
} else {
c.Assert(err, IsNil)
c.Assert(id, Equals, test.id)
}
}
}
Loading

0 comments on commit db9bb50

Please sign in to comment.