Skip to content

Commit

Permalink
sync-diff-inspector: add annotation for create table info (#679)
Browse files Browse the repository at this point in the history
close #678
  • Loading branch information
Leavrth authored Sep 23, 2022
1 parent cc87705 commit 4572996
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 7 deletions.
77 changes: 77 additions & 0 deletions pkg/dbutil/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package dbutil
import (
"context"
"fmt"
"regexp"
"strings"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/parser"
Expand All @@ -29,10 +31,85 @@ import (
"github.com/pingcap/tidb/util/collate"
)

const (
AnnotationClusteredReplaceString = "${1} /*T![clustered_index] CLUSTERED */${2}\n"
AnnotationNonClusteredReplaceString = "${1} /*T![clustered_index] NONCLUSTERED */${2}\n"
)

func init() {
collate.SetNewCollationEnabledForTest(false)
}

// addClusteredAnnotation add the `/*T![clustered_index] NONCLUSTERED */` for primary key of create table info
// In the older version, the create table info hasn't `/*T![clustered_index] NONCLUSTERED */`,
// which lead the issue https://github.com/pingcap/tidb-tools/issues/678
//
// Before Get Create Table Info:
// mysql> SHOW CREATE TABLE `test`.`itest`;
//
// +-------+--------------------------------------------------------------------+
// | Table | Create Table |
// +-------+--------------------------------------------------------------------+
// | itest | CREATE TABLE `itest` (
// `id` int(11) DEFAULT NULL,
// `name` varchar(24) DEFAULT NULL,
// PRIMARY KEY (`id`)
// ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin |
// +-------+--------------------------------------------------------------------+
//
// After Add the annotation:
//
// +-------+--------------------------------------------------------------------+
// | Table | Create Table |
// +-------+--------------------------------------------------------------------+
// | itest | CREATE TABLE `itest` (
// `id` int(11) DEFAULT NULL,
// `name` varchar(24) DEFAULT NULL,
// PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */
// ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin |
// +-------+--------------------------------------------------------------------+
func addClusteredAnnotationForPrimaryKey(raw string, replace string) (string, error) {
reg, regErr := regexp.Compile(`(PRIMARY\sKEY.*\))(\s*,?)\s*\n`)
if reg == nil || regErr != nil {
return raw, errors.Annotate(regErr, "failed to compile regex for add clustered annotation, err: %s")
}
return reg.ReplaceAllString(raw, replace), nil
}

func isPKISHandle(ctx context.Context, db QueryExecutor, schemaName string, tableName string) bool {
query := fmt.Sprintf("SELECT _tidb_rowid FROM %s LIMIT 0;", TableName(schemaName, tableName))
_, err := db.QueryContext(ctx, query)
if err != nil && strings.Contains(err.Error(), "Unknown column") {
return true
}
return false
}

func GetTableInfoWithVersion(ctx context.Context, db QueryExecutor, schemaName string, tableName string, version *semver.Version) (*model.TableInfo, error) {
createTableSQL, err := GetCreateTableSQL(ctx, db, schemaName, tableName)
if err != nil {
return nil, errors.Trace(err)
}

if version != nil && version.Major <= 4 {
var replaceString string
if isPKISHandle(ctx, db, schemaName, tableName) {
replaceString = AnnotationClusteredReplaceString
} else {
replaceString = AnnotationNonClusteredReplaceString
}
createTableSQL, err = addClusteredAnnotationForPrimaryKey(createTableSQL, replaceString)
if err != nil {
return nil, errors.Trace(err)
}
}
parser2, err := GetParserForDB(ctx, db)
if err != nil {
return nil, errors.Trace(err)
}
return GetTableInfoBySQL(createTableSQL, parser2)
}

// GetTableInfo returns table information.
func GetTableInfo(ctx context.Context, db QueryExecutor, schemaName string, tableName string) (*model.TableInfo, error) {
createTableSQL, err := GetCreateTableSQL(ctx, db, schemaName, tableName)
Expand Down
81 changes: 81 additions & 0 deletions pkg/dbutil/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,84 @@ func (*testDBSuite) TestSchemacmpEncode(c *C) {
table := schemacmp.Encode(tableInfo)
c.Assert(table.String(), Equals, "CREATE TABLE `tbl`(`id` INT(24) NOT NULL, PRIMARY KEY (`id`)) CHARSET UTF8MB4 COLLATE UTF8MB4_BIN")
}

func (s *testDBSuite) TestAddClusteredAnnotation(c *C) {
type testCase struct {
raw string
expect string
}

cases := []testCase{
{
// for version > 5, nothing changed
raw: `
CREATE TABLE t (
i int(11) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (i) /*T![clustered_index] NONCLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T! SHARD_ROW_ID_BITS=2 */
`,
expect: `
CREATE TABLE t (
i int(11) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (i) /*T![clustered_index] NONCLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T! SHARD_ROW_ID_BITS=2 */
`,
},
{
// for version < 5, add the annotation
raw: `
CREATE TABLE t (
i int(11) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (i)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T! SHARD_ROW_ID_BITS=2 */
`,
expect: `
CREATE TABLE t (
i int(11) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (i) /*T![clustered_index] NONCLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T! SHARD_ROW_ID_BITS=2 */
`,
},
{
// have comma need to be matched
raw: `
CREATE TABLE t (
i int(11) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (i),
KEY name (name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T! SHARD_ROW_ID_BITS=2 */
`,
expect: `
CREATE TABLE t (
i int(11) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (i) /*T![clustered_index] NONCLUSTERED */,
KEY name (name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T! SHARD_ROW_ID_BITS=2 */
`,
},
{
// somthing strange
raw: `
CREATE TABLE t (
i int(11) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (i, ` + "`" + `i)d` + "`" + `)) ,
KEY name (name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T! SHARD_ROW_ID_BITS=2 */
`,
expect: `
CREATE TABLE t (
i int(11) NOT NULL AUTO_INCREMENT,
PRIMARY KEY (i, ` + "`" + `i)d` + "`" + `)) /*T![clustered_index] NONCLUSTERED */ ,
KEY name (name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T! SHARD_ROW_ID_BITS=2 */
`,
},
}

for i, ca := range cases {
c.Log(i)
res, err := addClusteredAnnotationForPrimaryKey(ca.raw, AnnotationNonClusteredReplaceString)
c.Assert(err, IsNil)
c.Assert(res, Equals, ca.expect)
}
}
4 changes: 3 additions & 1 deletion sync_diff_inspector/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,12 @@ func initTables(ctx context.Context, cfg *config.Config) (cfgTables []*config.Ta
// will add default source information, don't worry, we will use table config's info replace this later.
// cfg.Tables.Schema => cfg.Tables.Tables => target/source Schema.Table
cfgTables = make([]*config.TableConfig, 0, len(TargetTablesList))
version := utils.TryToGetVersion(ctx, downStreamConn)
for _, tables := range TargetTablesList {
if cfg.Task.TargetCheckTables.MatchTable(tables.OriginSchema, tables.OriginTable) {
log.Debug("match target table", zap.String("table", dbutil.TableName(tables.OriginSchema, tables.OriginTable)))
tableInfo, err := dbutil.GetTableInfo(ctx, downStreamConn, tables.OriginSchema, tables.OriginTable)

tableInfo, err := dbutil.GetTableInfoWithVersion(ctx, downStreamConn, tables.OriginSchema, tables.OriginTable, version)
if err != nil {
return nil, errors.Errorf("get table %s.%s's information error %s", tables.OriginSchema, tables.OriginTable, errors.ErrorStack(err))
}
Expand Down
40 changes: 38 additions & 2 deletions sync_diff_inspector/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestTiDBSource(t *testing.T) {
{
schema: "source_test",
table: "test1",
createTableSQL: "CREATE TABLE `source_test`.`test1` (`a` int, `b` varchar(24), `c` float, primary key(`a`, `b`))",
createTableSQL: "CREATE TABLE `source_test`.`test1` (`a` int, `b` varchar(24), `c` float, PRIMARY KEY(`a`)\n)",
rangeColumns: []string{"a", "b"},
rangeLeft: []string{"3", "b"},
rangeRight: []string{"5", "f"},
Expand All @@ -129,7 +129,7 @@ func TestTiDBSource(t *testing.T) {
{
schema: "source_test",
table: "test2",
createTableSQL: "CREATE TABLE `source_test`.`test2` (`a` int, `b` varchar(24), `c` float, `d` datetime, primary key(`a`, `b`))",
createTableSQL: "CREATE TABLE `source_test`.`test2` (`a` int, `b` varchar(24), `c` float, `d` datetime, PRIMARY KEY(`a`)\n)",
rangeColumns: []string{"a", "b"},
rangeLeft: []string{"3", "b"},
rangeRight: []string{"5", "f"},
Expand All @@ -140,13 +140,47 @@ func TestTiDBSource(t *testing.T) {

mock.ExpectQuery("SHOW DATABASES").WillReturnRows(sqlmock.NewRows([]string{"Database"}).AddRow("mysql").AddRow("source_test"))
mock.ExpectQuery("SHOW FULL TABLES*").WillReturnRows(sqlmock.NewRows([]string{"Table", "type"}).AddRow("test1", "base").AddRow("test2", "base"))
mock.ExpectQuery("SELECT version()*").WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.12"))

f, err := filter.Parse([]string{"source_test.*"})
require.NoError(t, err)
tidb, err := NewTiDBSource(ctx, tableDiffs, &config.DataSource{Conn: conn}, utils.NewWorkerPool(1, "bucketIter"), f)
require.NoError(t, err)

caseFn := []struct {
check func(sqlmock.Sqlmock, Source) (bool, error)
}{
{
check: func(mock sqlmock.Sqlmock, source Source) (bool, error) {
mock.ExpectQuery("SHOW CREATE TABLE*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tableCases[0].table, tableCases[0].createTableSQL))
mock.ExpectQuery("SELECT _tidb_rowid FROM*").WillReturnRows(sqlmock.NewRows([]string{"_tidb_rowid"}))
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'*").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"))
tableInfo, err := source.GetSourceStructInfo(ctx, 0)
if err != nil {
return false, err
}
return !tableInfo[0].PKIsHandle, nil
},
},
{
check: func(mock sqlmock.Sqlmock, source Source) (bool, error) {
mock.ExpectQuery("SHOW CREATE TABLE*").WillReturnRows(sqlmock.NewRows([]string{"Table", "Create Table"}).AddRow(tableCases[1].table, tableCases[1].createTableSQL))
mock.ExpectQuery("SELECT _tidb_rowid FROM*").WillReturnError(fmt.Errorf("ERROR 1054 (42S22): Unknown column '_tidb_rowid' in 'field list'"))
mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'*").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"))
tableInfo, err := source.GetSourceStructInfo(ctx, 0)
if err != nil {
return false, err
}
return tableInfo[0].PKIsHandle, nil
},
},
}

for n, tableCase := range tableCases {
t.Log(n)
check, err := caseFn[n].check(mock, tidb)
require.NoError(t, err)
require.True(t, check)
require.Equal(t, n, tableCase.rangeInfo.GetTableIndex())
countRows := sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456)
mock.ExpectQuery("SELECT COUNT.*").WillReturnRows(countRows)
Expand Down Expand Up @@ -248,6 +282,7 @@ func TestFallbackToRandomIfRangeIsSet(t *testing.T) {
statsRows.AddRow("source_test", "test1", "PRIMARY", 1, (i+1)*64, (i+1)*64, 1,
fmt.Sprintf("(%d, %d)", i*64, i*12), fmt.Sprintf("(%d, %d)", (i+1)*64-1, (i+1)*12-1))
}
mock.ExpectQuery("SELECT version()*").WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.12"))
mock.ExpectQuery(regexp.QuoteMeta("SELECT COUNT(1) cnt")).WillReturnRows(sqlmock.NewRows([]string{"cnt"}).AddRow(100))

f, err := filter.Parse([]string{"source_test.*"})
Expand Down Expand Up @@ -571,6 +606,7 @@ func TestTiDBRouter(t *testing.T) {
mock.ExpectQuery("SHOW FULL TABLES IN.*").WillReturnRows(tablesRows)
tablesRows = sqlmock.NewRows([]string{"Tables_in_test", "Table_type"}).AddRow("test2", "BASE TABLE")
mock.ExpectQuery("SHOW FULL TABLES IN.*").WillReturnRows(tablesRows)
mock.ExpectQuery("SELECT version()*").WillReturnRows(sqlmock.NewRows([]string{"version()"}).AddRow("5.7.25-TiDB-v4.0.12"))

f, err := filter.Parse([]string{"*.*"})
require.NoError(t, err)
Expand Down
6 changes: 5 additions & 1 deletion sync_diff_inspector/source/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"time"

"github.com/coreos/go-semver/semver"
tableFilter "github.com/pingcap/tidb-tools/pkg/table-filter"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -88,6 +89,8 @@ type TiDBSource struct {
// bucketSpliterPool is the shared pool to produce chunks using bucket
bucketSpliterPool *utils.WorkerPool
dbConn *sql.DB

version *semver.Version
}

func (s *TiDBSource) GetTableAnalyzer() TableAnalyzer {
Expand Down Expand Up @@ -143,7 +146,7 @@ func (s *TiDBSource) GetSourceStructInfo(ctx context.Context, tableIndex int) ([
tableInfos := make([]*model.TableInfo, 1)
tableDiff := s.GetTables()[tableIndex]
source := getMatchSource(s.sourceTableMap, tableDiff)
tableInfos[0], err = dbutil.GetTableInfo(ctx, s.GetDB(), source.OriginSchema, source.OriginTable)
tableInfos[0], err = dbutil.GetTableInfoWithVersion(ctx, s.GetDB(), source.OriginSchema, source.OriginTable, s.version)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -267,6 +270,7 @@ func NewTiDBSource(ctx context.Context, tableDiffs []*common.TableDiff, ds *conf
snapshot: ds.Snapshot,
dbConn: ds.Conn,
bucketSpliterPool: bucketSpliterPool,
version: utils.TryToGetVersion(ctx, ds.Conn),
}
return ts, nil
}
28 changes: 25 additions & 3 deletions sync_diff_inspector/utils/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,16 +170,38 @@ func GetSpecifiedColumnValueAndClose(rows *sql.Rows, columnName string) ([]strin
return strs, errors.Trace(rows.Err())
}

// parse versino string to semver.Version
func parseVersion(versionStr string) (*semver.Version, error) {
versionStr = tidbVersionRegex.FindString(versionStr)[1:]
versionStr = strings.TrimPrefix(versionStr, "v")
return semver.NewVersion(versionStr)
}

// It's OK to failed to get db version
func TryToGetVersion(ctx context.Context, db *sql.DB) *semver.Version {
versionStr, err := dbutil.GetDBVersion(ctx, db)
if err != nil {
return nil
}
if !strings.Contains(strings.ToLower(versionStr), "tidb") {
return nil
}
version, err := parseVersion(versionStr)
if err != nil {
// It's OK when parse version failed
version = nil
}
return version
}

// StartGCSavepointUpdateService keeps GC safePoint stop moving forward.
func StartGCSavepointUpdateService(ctx context.Context, pdCli pd.Client, db *sql.DB, snapshot string) error {
versionStr, err := selectVersion(db)
if err != nil {
log.Info("detect version of tidb failed")
return nil
}
versionStr = tidbVersionRegex.FindString(versionStr)[1:]
versionStr = strings.TrimPrefix(versionStr, "v")
tidbVersion, err := semver.NewVersion(versionStr)
tidbVersion, err := parseVersion(versionStr)
if err != nil {
log.Info("parse version of tidb failed")
return nil
Expand Down

0 comments on commit 4572996

Please sign in to comment.