Skip to content

Commit

Permalink
adaptation to oracle checkpoint (#1097)
Browse files Browse the repository at this point in the history
  • Loading branch information
cartersz authored Dec 23, 2021
1 parent a7ae643 commit 6c33762
Show file tree
Hide file tree
Showing 12 changed files with 764 additions and 18 deletions.
2 changes: 2 additions & 0 deletions drainer/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func NewCheckPoint(cfg *Config) (CheckPoint, error) {
switch cfg.CheckpointType {
case "mysql", "tidb":
cp, err = newMysql(cfg)
case "oracle":
cp, err = newOracle(cfg)
case "file":
cp, err = NewFile(cfg.InitialCommitTS, cfg.CheckPointFile)
default:
Expand Down
58 changes: 58 additions & 0 deletions drainer/checkpoint/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,61 @@ func (s *newMysqlSuite) TestCreationErrors(c *C) {
c.Assert(err, NotNil)
c.Assert(err, ErrorMatches, ".*fail table.*")
}

func (s *newMysqlSuite) TestDefaultCheckpointTable(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)

origOpen := sqlOpenDB
defer func() { sqlOpenDB = origOpen }()
sqlOpenDB = func(user, password string, host string, port int, tls *tls.Config) (*sql.DB, error) {
return db, nil
}

mock.ExpectExec("create schema.*").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectExec("create table if not exists.*").WillReturnResult(sqlmock.NewResult(0, 0))
clusterIDRow := sqlmock.NewRows([]string{"clusterID"}).AddRow("12345")
mock.ExpectQuery("select clusterID from.*").WillReturnRows(clusterIDRow)
checkPointRow := sqlmock.NewRows([]string{"CHECKPOINT"}).
AddRow(`{"commitTS": 1024, "consistent": true, "ts-map": {"primary-ts": 2000, "secondary-ts": 1999}}`)
mock.ExpectQuery("select checkPoint from.*").WillReturnRows(checkPointRow)
cp, err := newMysql(&Config{
CheckpointType: "tidb",
})
c.Assert(err, IsNil)
c.Assert(cp, NotNil)
pcp := cp.(*MysqlCheckPoint)
c.Assert(err, IsNil)
c.Assert(pcp.table, Equals, "checkpoint")
c.Assert(pcp.schema, Equals, "tidb_binlog")
}

func (s *newMysqlSuite) TestConfigCheckpointTable(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)

origOpen := sqlOpenDB
defer func() { sqlOpenDB = origOpen }()
sqlOpenDB = func(user, password string, host string, port int, tls *tls.Config) (*sql.DB, error) {
return db, nil
}

mock.ExpectExec("create schema.*").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectExec("create table if not exists.*").WillReturnResult(sqlmock.NewResult(0, 0))
clusterIDRow := sqlmock.NewRows([]string{"clusterID"}).AddRow("12345")
mock.ExpectQuery("select clusterID from.*").WillReturnRows(clusterIDRow)
checkPointRow := sqlmock.NewRows([]string{"CHECKPOINT"}).
AddRow(`{"commitTS": 1024, "consistent": true, "ts-map": {"primary-ts": 2000, "secondary-ts": 1999}}`)
mock.ExpectQuery("select checkPoint from.*").WillReturnRows(checkPointRow)
cp, err := newMysql(&Config{
CheckpointType: "tidb",
Table: "table-1",
Schema: "new_schema",
})
c.Assert(err, IsNil)
c.Assert(cp, NotNil)
pcp := cp.(*MysqlCheckPoint)
c.Assert(err, IsNil)
c.Assert(pcp.table, Equals, "table-1")
c.Assert(pcp.schema, Equals, "new_schema")
}
209 changes: 209 additions & 0 deletions drainer/checkpoint/oracle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package checkpoint

import (
"context"
dbsql "database/sql"
"encoding/json"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"

// mysql driver
_ "github.com/go-sql-driver/mysql"
"github.com/pingcap/tidb-binlog/pkg/loader"
"github.com/pingcap/tidb-binlog/pkg/util"
)

// OracleCheckPoint is a local savepoint struct for Oracle
type OracleCheckPoint struct {
sync.RWMutex
closed bool
clusterID uint64
initialCommitTS int64

db *dbsql.DB
schema string
table string

ConsistentSaved bool `toml:"consistent" json:"consistent"`
CommitTS int64 `toml:"commitTS" json:"commitTS"`
TsMap map[string]int64 `toml:"ts-map" json:"ts-map"`
Version int64 `toml:"schema-version" json:"schema-version"`
}

var _ CheckPoint = &OracleCheckPoint{}

var sqlOpenOracleDB = loader.CreateOracleDB

func newOracle(cfg *Config) (CheckPoint, error) {
setDefaultConfig(cfg)

if cfg.Db.TLS != nil {
log.Info("enable TLS for saving checkpoint")
}

db, err := sqlOpenOracleDB(cfg.Db.User, cfg.Db.Password, cfg.Db.Host, cfg.Db.Port, cfg.Db.OracleServiceName, cfg.Db.OracleConnectString)
if err != nil {
return nil, errors.Annotate(err, "open db failed")
}

sp := &OracleCheckPoint{
db: db,
clusterID: cfg.ClusterID,
initialCommitTS: cfg.InitialCommitTS,
schema: cfg.Schema,
table: cfg.Table,
TsMap: make(map[string]int64),
}

sql := genCheckTableIsExist2o(sp)

var tableName string
err = db.QueryRow(sql).Scan(&tableName)
switch {
case err == dbsql.ErrNoRows:
sql = genCreateTable2o(sp)
if _, err = db.Exec(sql); err != nil {
return nil, errors.Annotatef(err, "create checkpoint table failed, sql: %s", sql)
}
case err != nil:
return nil, errors.Annotatef(err, "exec failed, sql: %s", sql)
}

if sp.clusterID == 0 {
id, err := getClusterID(db, sp.schema, sp.table)
if err != nil {
return nil, errors.Trace(err)
}

log.Info("set cluster id", zap.Uint64("id", id))
sp.clusterID = id
}

err = sp.Load()
return sp, errors.Trace(err)
}

// Load implements CheckPoint.Load interface
func (sp *OracleCheckPoint) Load() error {
sp.Lock()
defer sp.Unlock()

if sp.closed {
return errors.Trace(ErrCheckPointClosed)
}

defer func() {
if sp.CommitTS == 0 {
sp.CommitTS = sp.initialCommitTS
}
}()

var str string
selectSQL := genSelectSQL2o(sp)
err := sp.db.QueryRow(selectSQL).Scan(&str)
switch {
case err == dbsql.ErrNoRows:
sp.CommitTS = sp.initialCommitTS
return nil
case err != nil:
return errors.Annotatef(err, "QueryRow failed, sql: %s", selectSQL)
}

if err := json.Unmarshal([]byte(str), sp); err != nil {
return errors.Trace(err)
}

return nil
}

// Save implements checkpoint.Save interface
func (sp *OracleCheckPoint) Save(ts, secondaryTS int64, consistent bool, version int64) error {
sp.Lock()
defer sp.Unlock()

if sp.closed {
return errors.Trace(ErrCheckPointClosed)
}

sp.CommitTS = ts
sp.ConsistentSaved = consistent
if version > sp.Version {
sp.Version = version
}

if secondaryTS > 0 {
sp.TsMap["primary-ts"] = ts
sp.TsMap["secondary-ts"] = secondaryTS
}

b, err := json.Marshal(sp)
if err != nil {
return errors.Annotate(err, "json marshal failed")
}

sql := genReplaceSQL2o(sp, string(b))
return util.RetryContext(context.TODO(), 5, time.Second, 1, func(context.Context) error {
_, err = sp.db.Exec(sql)
if err != nil {
return errors.Annotatef(err, "query sql failed: %s", sql)
}
return nil
})
}

// IsConsistent implements CheckPoint interface
func (sp *OracleCheckPoint) IsConsistent() bool {
sp.RLock()
defer sp.RUnlock()

return sp.ConsistentSaved
}

// TS implements CheckPoint.TS interface
func (sp *OracleCheckPoint) TS() int64 {
sp.RLock()
defer sp.RUnlock()

return sp.CommitTS
}

// SchemaVersion implements CheckPoint.SchemaVersion interface.
func (sp *OracleCheckPoint) SchemaVersion() int64 {
sp.RLock()
defer sp.RUnlock()

return sp.Version
}

// Close implements CheckPoint.Close interface
func (sp *OracleCheckPoint) Close() error {
sp.Lock()
defer sp.Unlock()

if sp.closed {
return errors.Trace(ErrCheckPointClosed)
}

err := sp.db.Close()
if err == nil {
sp.closed = true
}
return errors.Trace(err)
}
Loading

0 comments on commit 6c33762

Please sign in to comment.