
feed downstream by relay log at startup if need #883

Merged 16 commits on Feb 7, 2020
7 changes: 3 additions & 4 deletions cmd/drainer/drainer.toml
@@ -76,13 +76,12 @@ ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql"
#
#replicate-do-db = ["~^b.*","s1"]

[syncer.relay]
# directory of relay logs. Empty string indicates disabling relay log.
# relay log works only if the downstream is TiDB/MySQL.
-relay-log-dir = ""
+# log-dir = ""
# max file size of each relay log
-relay-log-size = 10485760
-# read buffer size of relay log
-relay-read-buf-size = 8
+# max-file-size = 10485760
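For operators updating existing configuration files, a minimal sketch of what enabling the relay log looks like under the renamed options (the directory path is illustrative):

[syncer.relay]
# a non-empty directory enables the relay log; it works only if the
# downstream is TiDB/MySQL
log-dir = "/data/drainer/relay"
# rotate relay log files once they reach 10 MB
max-file-size = 10485760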

#[[syncer.replicate-do-table]]
#db-name ="test"
17 changes: 14 additions & 3 deletions drainer/checkpoint/checkpoint.go
@@ -19,6 +19,14 @@ import (
"go.uber.org/zap"
)

const (
// StatusNormal means the server quit normally; all data with commit ts <= saved ts has been synced to the downstream
StatusNormal int = 0
@IANTHEREAL (Collaborator, Feb 4, 2020) commented:
Worth a better name, like StatusConsistent or StatusDrained.
In fact, I'm not sure what it means: does it mean the binlog is drained, or that a consistent replication state has been reached? Which one is right?

A Collaborator commented:
Good change!

The Contributor (author) replied:
Renamed to StatusConsistent in 002d449.


// StatusRunning means the server is running or quit abnormally; some of the data before the saved ts may or may not have been synced to the downstream
StatusRunning int = 1
)

var (
// ErrCheckPointClosed indicates the CheckPoint already closed.
ErrCheckPointClosed = errors.New("CheckPoint already closed")
@@ -31,11 +39,14 @@ type CheckPoint interface {
Load() error

// Save saves checkpoint information.
-Save(int64, int64) error
+Save(commitTS int64, slaveTS int64, status int) error

-// Pos gets position information.
+// TS gets checkpoint commit timestamp.
TS() int64

// Status returns the saved status.
Status() int

// Close closes the CheckPoint and release resources, after closed other methods should not be called again.
Close() error
}
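To make the intent of the two statuses concrete, here is a reader's sketch (not part of the diff) of the startup check this PR enables; the helper name is hypothetical, and the real wiring lives in the drainer server's startup path:

// shouldFeedByRelayLog is a hypothetical helper: if the previous drainer
// process saved StatusRunning, it quit abnormally, so the downstream may
// lag behind the checkpoint and should first be fed from the relay log
// when one is configured.
func shouldFeedByRelayLog(cp CheckPoint, relayLogDir string) bool {
	return relayLogDir != "" && cp.Status() == StatusRunning
}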
@@ -50,7 +61,7 @@ func NewCheckPoint(cfg *Config) (CheckPoint, error) {
case "mysql", "tidb":
cp, err = newMysql(cfg)
case "file":
-cp, err = NewFile(cfg)
+cp, err = NewFile(cfg.InitialCommitTS, cfg.CheckPointFile)
default:
err = errors.Errorf("unsupported checkpoint type %s", cfg.CheckpointType)
}
20 changes: 15 additions & 5 deletions drainer/checkpoint/file.go
@@ -31,14 +31,15 @@ type FileCheckPoint struct {

name string

-CommitTS int64 `toml:"commitTS" json:"commitTS"`
+StatusSaved int `toml:"status" json:"status"`
+CommitTS int64 `toml:"commitTS" json:"commitTS"`
}

// NewFile creates a new FileCheckpoint.
-func NewFile(cfg *Config) (CheckPoint, error) {
+func NewFile(initialCommitTS int64, filePath string) (CheckPoint, error) {
pb := &FileCheckPoint{
-initialCommitTS: cfg.InitialCommitTS,
-name: cfg.CheckPointFile,
+initialCommitTS: initialCommitTS,
+name: filePath,
}
err := pb.Load()
if err != nil {
@@ -81,7 +82,7 @@ func (sp *FileCheckPoint) Load() error {
}

// Save implements CheckPoint.Save interface
-func (sp *FileCheckPoint) Save(ts, slaveTS int64) error {
+func (sp *FileCheckPoint) Save(ts, slaveTS int64, status int) error {
sp.Lock()
defer sp.Unlock()

@@ -90,6 +91,7 @@ func (sp *FileCheckPoint) Save(ts, slaveTS int64) error {
}

sp.CommitTS = ts
sp.StatusSaved = status

var buf bytes.Buffer
e := toml.NewEncoder(&buf)
@@ -114,6 +116,14 @@ func (sp *FileCheckPoint) TS() int64 {
return sp.CommitTS
}

// Status implements CheckPoint.Status interface
func (sp *FileCheckPoint) Status() int {
sp.RLock()
defer sp.RUnlock()

return sp.StatusSaved
}

// Close implements CheckPoint.Close interface
func (sp *FileCheckPoint) Close() error {
sp.Lock()
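Putting the file checkpoint changes together, a minimal sketch of the new API in use (the path and the commit ts are illustrative):

package main

import (
	"fmt"
	"log"

	"github.com/pingcap/tidb-binlog/drainer/checkpoint"
)

func main() {
	// open (or create) a file checkpoint with a zero initial commit ts
	cp, err := checkpoint.NewFile(0, "/tmp/drainer-checkpoint")
	if err != nil {
		log.Fatal(err)
	}
	defer cp.Close()

	// record progress as still running; a later clean shutdown would save
	// StatusNormal instead
	if err := cp.Save(1024, 0, checkpoint.StatusRunning); err != nil {
		log.Fatal(err)
	}
	fmt.Println(cp.TS(), cp.Status()) // prints: 1024 1
}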
19 changes: 9 additions & 10 deletions drainer/checkpoint/file_test.go
@@ -23,22 +23,22 @@ import (
func (t *testCheckPointSuite) TestFile(c *C) {
fileName := "/tmp/test"
notExistFileName := "test_not_exist"
-cfg := new(Config)
-cfg.CheckPointFile = fileName
-meta, err := NewFile(cfg)
+meta, err := NewFile(0, fileName)
c.Assert(err, IsNil)
defer os.RemoveAll(fileName)

// zero (initial) CommitTs
c.Assert(meta.TS(), Equals, int64(0))
c.Assert(meta.Status(), Equals, StatusNormal)

testTs := int64(1)
// save ts
-err = meta.Save(testTs, 0)
+err = meta.Save(testTs, 0, StatusRunning)
c.Assert(err, IsNil)
// check ts
ts := meta.TS()
c.Assert(ts, Equals, testTs)
c.Assert(meta.Status(), Equals, StatusRunning)

// check load ts
err = meta.Load()
@@ -47,23 +47,22 @@ func (t *testCheckPointSuite) TestFile(c *C) {
c.Assert(ts, Equals, testTs)

// check not exist meta file
-cfg.CheckPointFile = notExistFileName
-meta, err = NewFile(cfg)
+meta, err = NewFile(0, notExistFileName)
c.Assert(err, IsNil)
err = meta.Load()
c.Assert(err, IsNil)
c.Assert(meta.TS(), Equals, int64(0))

// check not exist meta file, but with initialCommitTs
-cfg.InitialCommitTS = 123
-meta, err = NewFile(cfg)
+var initialCommitTS int64 = 123
+meta, err = NewFile(initialCommitTS, notExistFileName)
c.Assert(err, IsNil)
-c.Assert(meta.TS(), Equals, cfg.InitialCommitTS)
+c.Assert(meta.TS(), Equals, initialCommitTS)

// close the checkpoint
err = meta.Close()
c.Assert(err, IsNil)
c.Assert(errors.Cause(meta.Load()), Equals, ErrCheckPointClosed)
-c.Assert(errors.Cause(meta.Save(0, 0)), Equals, ErrCheckPointClosed)
+c.Assert(errors.Cause(meta.Save(0, 0, StatusNormal)), Equals, ErrCheckPointClosed)
c.Assert(errors.Cause(meta.Close()), Equals, ErrCheckPointClosed)
}
31 changes: 28 additions & 3 deletions drainer/checkpoint/mysql.go
@@ -19,6 +19,9 @@ import (
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"

// mysql driver
_ "github.com/go-sql-driver/mysql"
pkgsql "github.com/pingcap/tidb-binlog/pkg/sql"
@@ -35,10 +38,13 @@ type MysqlCheckPoint struct {
schema string
table string

-CommitTS int64 `toml:"commitTS" json:"commitTS"`
-TsMap map[string]int64 `toml:"ts-map" json:"ts-map"`
+StatusSaved int `toml:"status" json:"status"`
+CommitTS int64 `toml:"commitTS" json:"commitTS"`
+TsMap map[string]int64 `toml:"ts-map" json:"ts-map"`
}

var _ CheckPoint = &MysqlCheckPoint{}

var sqlOpenDB = pkgsql.OpenDB

func newMysql(cfg *Config) (CheckPoint, error) {
@@ -68,6 +74,16 @@ func newMysql(cfg *Config) (CheckPoint, error) {
return nil, errors.Annotatef(err, "exec failed, sql: %s", sql)
}

if sp.clusterID == 0 {
id, err := getClusterID(db, sp.schema, sp.table)
@IANTHEREAL (Collaborator, Feb 4, 2020) commented:
This logic is very weird. To handle the situation where the upstream cluster is completely down, drainer needs a correct way to fetch the cluster ID, but the current implementation is not satisfying; for example, it doesn't allow storing multiple entries in the checkpoint table.

A Collaborator replied:
I filed an issue about it: #889.

if err != nil {
return nil, errors.Trace(err)
}

log.Info("set cluster id", zap.Uint64("id", id))
sp.clusterID = id
}

err = sp.Load()
return sp, errors.Trace(err)
}
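A sketch of how the new fallback surfaces to callers, assuming the drainer leaves the cluster ID at zero in the checkpoint Config when the upstream PD is unreachable (the wrapper and its package are hypothetical):

package startup

import (
	"github.com/pingcap/errors"

	"github.com/pingcap/tidb-binlog/drainer/checkpoint"
)

// openCheckpoint illustrates the error path added above: with a zero
// cluster ID and an empty checkpoint table there is nothing to recover
// the ID from, so newMysql fails with ErrNoCheckpointItem.
func openCheckpoint(cfg *checkpoint.Config) (checkpoint.CheckPoint, error) {
	cp, err := checkpoint.NewCheckPoint(cfg)
	if errors.Cause(err) == checkpoint.ErrNoCheckpointItem {
		return nil, errors.Annotate(err, "empty checkpoint table, specify the cluster ID explicitly")
	}
	return cp, err
}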
@@ -106,7 +122,7 @@ func (sp *MysqlCheckPoint) Load() error {
}

// Save implements checkpoint.Save interface
-func (sp *MysqlCheckPoint) Save(ts, slaveTS int64) error {
+func (sp *MysqlCheckPoint) Save(ts, slaveTS int64, status int) error {
sp.Lock()
defer sp.Unlock()

@@ -115,6 +131,7 @@ func (sp *MysqlCheckPoint) Save(ts, slaveTS int64) error {
}

sp.CommitTS = ts
sp.StatusSaved = status

if slaveTS > 0 {
sp.TsMap["master-ts"] = ts
@@ -135,6 +152,14 @@ func (sp *MysqlCheckPoint) Save(ts, slaveTS int64) error {
return nil
}
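With the new field, the JSON payload written by genReplaceSQL carries the status alongside the commit ts. The stored row then looks roughly like this (schema, table, cluster ID, and timestamps are all illustrative):

replace into tidb_binlog.checkpoint values(
    6571358739047390821,
    '{"commitTS": 1024, "status": 1, "ts-map": {"master-ts": 2000, "slave-ts": 1999}}'
);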

// Status implements CheckPoint.Status interface
func (sp *MysqlCheckPoint) Status() int {
sp.RLock()
defer sp.RUnlock()

return sp.StatusSaved
}

// TS implements CheckPoint.TS interface
func (sp *MysqlCheckPoint) TS() int64 {
sp.RLock()
7 changes: 4 additions & 3 deletions drainer/checkpoint/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *saveSuite) TestShouldSaveCheckpoint(c *C) {
c.Assert(err, IsNil)
mock.ExpectExec("replace into db.tbl.*").WillReturnResult(sqlmock.NewResult(0, 0))
cp := MysqlCheckPoint{db: db, schema: "db", table: "tbl"}
-err = cp.Save(1111, 0)
+err = cp.Save(1111, 0, StatusRunning)
c.Assert(err, IsNil)
}

@@ -63,7 +63,7 @@ func (s *saveSuite) TestShouldUpdateTsMap(c *C) {
table: "tbl",
TsMap: make(map[string]int64),
}
-err = cp.Save(65536, 3333)
+err = cp.Save(65536, 3333, StatusRunning)
c.Assert(err, IsNil)
c.Assert(cp.TsMap["master-ts"], Equals, int64(65536))
c.Assert(cp.TsMap["slave-ts"], Equals, int64(3333))
@@ -83,12 +83,13 @@ func (s *loadSuite) TestShouldLoadFromDB(c *C) {
TsMap: make(map[string]int64),
}
rows := sqlmock.NewRows([]string{"checkPoint"}).
AddRow(`{"commitTS": 1024, "ts-map": {"master-ts": 2000, "slave-ts": 1999}}`)
AddRow(`{"commitTS": 1024, "status": 1, "ts-map": {"master-ts": 2000, "slave-ts": 1999}}`)
mock.ExpectQuery("select checkPoint from db.tbl.*").WillReturnRows(rows)

err = cp.Load()
c.Assert(err, IsNil)
c.Assert(cp.CommitTS, Equals, int64(1024))
c.Assert(cp.StatusSaved, Equals, 1)
c.Assert(cp.TsMap["master-ts"], Equals, int64(2000))
c.Assert(cp.TsMap["slave-ts"], Equals, int64(1999))
}
36 changes: 36 additions & 0 deletions drainer/checkpoint/util.go
@@ -14,12 +14,18 @@
package checkpoint

import (
"database/sql"
"fmt"

// mysql driver
_ "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
)

// ErrNoCheckpointItem indicates that the checkpoint table contains no item,
// so the cluster ID must be specified explicitly for the mysql checkpoint type.
var ErrNoCheckpointItem = errors.New("no checkpoint item found")

// DBConfig is the DB configuration.
type DBConfig struct {
Host string `toml:"host" json:"host"`
@@ -74,6 +80,36 @@ func genReplaceSQL(sp *MysqlCheckPoint, str string) string {
return fmt.Sprintf("replace into %s.%s values(%d, '%s')", sp.schema, sp.table, sp.clusterID, str)
}

// getClusterID returns the cluster ID iff the checkpoint table contains exactly one row.
func getClusterID(db *sql.DB, schema string, table string) (id uint64, err error) {
sqlQuery := fmt.Sprintf("select clusterID from %s.%s", schema, table)
rows, err := db.Query(sqlQuery)
if err != nil {
return 0, errors.Trace(err)
}

for rows.Next() {
if id > 0 {
return 0, errors.New("there are multi row in checkpoint table")
}

err = rows.Scan(&id)
if err != nil {
return 0, errors.Trace(err)
}
}

if rows.Err() != nil {
return 0, errors.Trace(rows.Err())
}

if id == 0 {
return 0, ErrNoCheckpointItem
}

return
}
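For context, this query together with genReplaceSQL implies a two-column checkpoint table; a reader's sketch of the layout (the authoritative DDL is created in newMysql, and the exact types may differ):

CREATE TABLE IF NOT EXISTS tidb_binlog.checkpoint (
    clusterID  BIGINT UNSIGNED NOT NULL PRIMARY KEY,
    checkPoint TEXT
);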

func genSelectSQL(sp *MysqlCheckPoint) string {
return fmt.Sprintf("select checkPoint from %s.%s where clusterID = %d", sp.schema, sp.table, sp.clusterID)
}
63 changes: 63 additions & 0 deletions drainer/checkpoint/util_test.go
@@ -0,0 +1,63 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package checkpoint

import (
"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
"github.com/pingcap/errors"
)

var _ = Suite(&testUtil{})

type testUtil struct{}

func (t *testUtil) TestG(c *C) {
tests := []struct {
name string
rows []uint64
id uint64
err bool
checkSpecifiedErr error
}{
{"no row", nil, 0, true, ErrNoCheckpointItem},
{"on row", []uint64{1}, 1, false, nil},
{"multi row", []uint64{1, 2}, 0, true, nil},
A Collaborator commented:
Should the checkSpecifiedErr be nil here?

The Contributor (author) replied:
Yes; "true, nil" means the result will be some error, but we don't check what kind of error it is.

}

for _, test := range tests {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)

rows := sqlmock.NewRows([]string{"clusterID"})
for _, row := range test.rows {
rows.AddRow(row)
}

mock.ExpectQuery("select clusterID from .*").WillReturnRows(rows)

c.Log("test: ", test.name)
id, err := getClusterID(db, "schema", "table")
if test.err {
c.Assert(err, NotNil)
c.Assert(id, Equals, test.id)
if test.checkSpecifiedErr != nil {
c.Assert(errors.Cause(err), Equals, test.checkSpecifiedErr)
}
} else {
c.Assert(err, IsNil)
c.Assert(id, Equals, test.id)
}
}
}