Skip to content

Commit

Permalink
sync-diff-inspector: fix index fields not taking effect (#624)
Browse files Browse the repository at this point in the history
close #623
  • Loading branch information
liuzix authored Jun 7, 2022
1 parent b3ea358 commit cf6b7a8
Show file tree
Hide file tree
Showing 5 changed files with 311 additions and 9 deletions.
49 changes: 49 additions & 0 deletions sync_diff_inspector/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"database/sql/driver"
"fmt"
"os"
"regexp"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -254,6 +255,54 @@ func TestTiDBSource(t *testing.T) {
tidb.Close()
}

// TestFallbackToRandomIfRangeIsSet checks that analyzing a table which carries
// a user-configured Range yields a RandomIterator instead of a BucketIterator.
func TestFallbackToRandomIfRangeIsSet(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	db, dbMock, err := sqlmock.New()
	require.NoError(t, err)
	defer db.Close()

	// Expectations are registered in the order the queries are expected.
	databaseRows := sqlmock.NewRows([]string{"Database"}).AddRow("mysql").AddRow("source_test")
	dbMock.ExpectQuery("SHOW DATABASES").WillReturnRows(databaseRows)

	tableRows := sqlmock.NewRows([]string{"Table", "type"}).AddRow("test1", "base")
	dbMock.ExpectQuery("SHOW FULL TABLES*").WillReturnRows(tableRows)

	// Bucket statistics rows; they are built here but not attached to any
	// expectation within this test.
	bucketRows := sqlmock.NewRows([]string{"Db_name", "Table_name", "Column_name", "Is_index", "Bucket_id", "Count", "Repeats", "Lower_Bound", "Upper_Bound"})
	for n := 1; n <= 5; n++ {
		lower := fmt.Sprintf("(%d, %d)", (n-1)*64, (n-1)*12)
		upper := fmt.Sprintf("(%d, %d)", n*64-1, n*12-1)
		bucketRows.AddRow("source_test", "test1", "PRIMARY", 1, n*64, n*64, 1, lower, upper)
	}

	countRows := sqlmock.NewRows([]string{"cnt"}).AddRow(100)
	dbMock.ExpectQuery(regexp.QuoteMeta("SELECT COUNT(1) cnt")).WillReturnRows(countRows)

	tableFilter, err := filter.Parse([]string{"source_test.*"})
	require.NoError(t, err)

	const ddl = "CREATE TABLE `test1` " +
		"(`id` int(11) NOT NULL AUTO_INCREMENT, " +
		" `k` int(11) NOT NULL DEFAULT '0', " +
		"`c` char(120) NOT NULL DEFAULT '', " +
		"PRIMARY KEY (`id`), KEY `k_1` (`k`))"

	info, err := dbutil.GetTableInfoBySQL(ddl, parser.New())
	require.NoError(t, err)

	diff := &common.TableDiff{
		Schema: "source_test",
		Table:  "test1",
		Info:   info,
		// A non-trivial Range should prevent using BucketIterator.
		Range: "id < 10",
	}

	src, err := NewTiDBSource(ctx, []*common.TableDiff{diff}, &config.DataSource{Conn: db}, 1, tableFilter)
	require.NoError(t, err)

	iter, err := src.GetTableAnalyzer().AnalyzeSplitter(ctx, diff, nil)
	require.NoError(t, err)
	require.IsType(t, &splitter.RandomIterator{}, iter)

	iter.Close()
	src.Close()
}

func TestMysqlShardSources(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand Down
46 changes: 37 additions & 9 deletions sync_diff_inspector/splitter/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package splitter
import (
"context"
"database/sql"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
Expand Down Expand Up @@ -55,6 +54,12 @@ func NewBucketIterator(ctx context.Context, progressID string, table *common.Tab
}

func NewBucketIteratorWithCheckpoint(ctx context.Context, progressID string, table *common.TableDiff, dbConn *sql.DB, startRange *RangeInfo, checkThreadCount int) (*BucketIterator, error) {
if !utils.IsRangeTrivial(table.Range) {
return nil, errors.Errorf(
"BucketIterator does not support user configured Range. Range: %s",
table.Range)
}

bctx, cancel := context.WithCancel(ctx)
bs := &BucketIterator{
table: table,
Expand Down Expand Up @@ -125,34 +130,57 @@ func (s *BucketIterator) Next() (*chunk.Range, error) {
}

func (s *BucketIterator) init(startRange *RangeInfo) error {
fields, err := indexFieldsFromConfigString(s.table.Fields, s.table.Info)
if err != nil {
return err
}

s.nextChunk = 0
buckets, err := dbutil.GetBucketsInfo(context.Background(), s.dbConn, s.table.Schema, s.table.Table, s.table.Info)
if err != nil {
return errors.Trace(err)
}
indices, err := utils.GetBetterIndex(context.Background(), s.dbConn, s.table.Schema, s.table.Table, s.table.Info)
if err != nil {
return errors.Trace(err)

var indices []*model.IndexInfo
if fields.IsEmpty() {
indices, err = utils.GetBetterIndex(context.Background(), s.dbConn, s.table.Schema, s.table.Table, s.table.Info)
if err != nil {
return errors.Trace(err)
}
} else {
// There are user configured "index-fields", so we will try to match from all indices.
indices = dbutil.FindAllIndex(s.table.Info)
}

for _, index := range indices {
if index == nil {
continue
}
if startRange != nil && startRange.IndexID != index.ID {
continue
}
bucket, ok := buckets[index.Name.O]
if !ok {
return errors.NotFoundf("index %s in buckets info", index.Name.O)
}
log.Debug("buckets for index", zap.String("index", index.Name.O), zap.Reflect("buckets", buckets))

indexColumns := utils.GetColumnsFromIndex(index, s.table.Info)

if len(indexColumns) < len(index.Columns) {
// some column in index is ignored.
continue
}

if !fields.MatchesIndex(index) {
// We are enforcing user configured "index-fields" settings.
continue
}

bucket, ok := buckets[index.Name.O]
if !ok {
// We found an index matching the "index-fields", but no bucket is found
// for that index. Returning an error here will make the caller retry with
// the random splitter.
return errors.NotFoundf("index %s in buckets info", index.Name.O)
}
log.Debug("buckets for index", zap.String("index", index.Name.O), zap.Reflect("buckets", buckets))

s.buckets = bucket
s.indexColumns = indexColumns
s.indexID = index.ID
Expand Down
111 changes: 111 additions & 0 deletions sync_diff_inspector/splitter/index_fields.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package splitter

import (
"sort"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-tools/sync_diff_inspector/utils"
"github.com/pingcap/tidb/parser/model"
"go.uber.org/zap"
)

// indexFields wraps the column info for the user config "index-fields".
type indexFields struct {
	// cols holds the configured columns, sorted by column ID when built by
	// indexFieldsFromConfigString. It is left unset for an empty option.
	cols []*model.ColumnInfo
	// tableInfo is the table the configured columns were resolved against;
	// MatchesIndex uses it to resolve an index's columns.
	tableInfo *model.TableInfo
	// empty is true when the "index-fields" option was an empty string.
	empty bool
}

// indexFieldsFromConfigString builds an indexFields from the comma-separated
// "index-fields" option string.
//
// An empty string yields an indexFields marked as empty. Otherwise each
// comma-separated name is trimmed of surrounding whitespace and resolved via
// GetSplitFields against tableInfo; the resulting columns are stored sorted
// by column ID. A non-empty option with a nil tableInfo triggers log.Panic.
// Errors from GetSplitFields are returned wrapped with errors.Trace.
func indexFieldsFromConfigString(strFields string, tableInfo *model.TableInfo) (*indexFields, error) {
	if strFields == "" {
		// Empty option
		return &indexFields{empty: true}, nil
	}

	if tableInfo == nil {
		log.Panic("parsing index fields with empty tableInfo",
			zap.String("index-fields", strFields))
	}

	names := strings.Split(strFields, ",")
	for idx, raw := range names {
		names[idx] = strings.TrimSpace(raw)
	}

	cols, err := GetSplitFields(tableInfo, names)
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Keep the columns ordered by ID so they can be compared position by
	// position against an index's (equally sorted) columns.
	sortColsInPlace(cols)

	parsed := &indexFields{
		cols:      cols,
		tableInfo: tableInfo,
	}
	return parsed, nil
}

// MatchesIndex reports whether index covers exactly the columns configured in
// the "index-fields" option, regardless of column order.
//
// An empty option matches every index. Otherwise the index matches only if
// all of its columns can be resolved against the table info and the resolved
// columns have the same IDs as the configured ones. It calls log.Panic when
// index is nil or when a non-empty option holds no columns, as both indicate
// a programming error.
func (f *indexFields) MatchesIndex(index *model.IndexInfo) bool {
	if f.empty {
		// Default config matches all.
		return true
	}

	// Sanity checks.
	if index == nil {
		log.Panic("matching with empty index")
	}
	if len(f.cols) == 0 {
		log.Panic("unexpected cols with length 0")
	}

	if len(index.Columns) != len(f.cols) {
		// We need an exact match.
		// Lengths not matching eliminates the possibility.
		return false
	}

	indexCols := utils.GetColumnsFromIndex(index, f.tableInfo)
	if len(indexCols) != len(f.cols) {
		// Not every index column could be resolved against the table info
		// (e.g. a column of the index is ignored). Comparing only the
		// resolved prefix would wrongly report a partial match as exact.
		return false
	}

	// Sort for comparison; f.cols is already sorted by column ID.
	sortColsInPlace(indexCols)

	for i, col := range indexCols {
		if f.cols[i].ID != col.ID {
			return false
		}
	}

	return true
}

// Cols returns the columns held for the "index-fields" option. The internal
// slice is returned directly (not copied).
func (f *indexFields) Cols() []*model.ColumnInfo {
	return f.cols
}

// IsEmpty returns true if the struct represents an empty
// user-configured "index-fields" option. An empty option places no
// restriction on indices: MatchesIndex returns true for every index.
func (f *indexFields) IsEmpty() bool {
	return f.empty
}

// sortColsInPlace stably reorders cols by ascending column ID, modifying the
// slice passed in.
func sortColsInPlace(cols []*model.ColumnInfo) {
	idLess := func(a, b int) bool {
		return cols[a].ID < cols[b].ID
	}
	sort.SliceStable(cols, idLess)
}
106 changes: 106 additions & 0 deletions sync_diff_inspector/splitter/index_fields_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package splitter

import (
"testing"

"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb/parser"
"github.com/stretchr/testify/require"
)

// TestIndexFieldsSimple checks that a single-column "index-fields" value
// ("k") matches only the index built on that column.
func TestIndexFieldsSimple(t *testing.T) {
	t.Parallel()

	const ddl = "CREATE TABLE `sbtest1` " +
		"(`id` int(11) NOT NULL AUTO_INCREMENT, " +
		" `k` int(11) NOT NULL DEFAULT '0', " +
		"`c` char(120) NOT NULL DEFAULT '', " +
		"PRIMARY KEY (`id`), KEY `k_1` (`k`))"

	info, err := dbutil.GetTableInfoBySQL(ddl, parser.New())
	require.NoError(t, err)

	fields, err := indexFieldsFromConfigString("k", info)
	require.NoError(t, err)
	require.False(t, fields.IsEmpty())
	require.Len(t, fields.Cols(), 1)

	// Expected MatchesIndex result per index name.
	wantMatch := map[string]bool{
		"PRIMARY": false,
		"k_1":     true,
	}
	for _, idx := range info.Indices {
		want, known := wantMatch[idx.Name.String()]
		if !known {
			require.FailNow(t, "unreachable")
		}
		if want {
			require.True(t, fields.MatchesIndex(idx))
		} else {
			require.False(t, fields.MatchesIndex(idx))
		}
	}
}

// TestIndexFieldsComposite checks that a two-column "index-fields" value
// ("id, k") matches only the composite primary key and none of the
// single-column indices.
func TestIndexFieldsComposite(t *testing.T) {
	t.Parallel()

	const ddl = "CREATE TABLE `sbtest1` " +
		"(`id` int(11) NOT NULL AUTO_INCREMENT, " +
		" `k` int(11) NOT NULL DEFAULT '0', " +
		"`c` char(120) NOT NULL DEFAULT '', " +
		"PRIMARY KEY (`id`, `k`)," +
		"KEY `k_1` (`k`)," +
		"UNIQUE INDEX `c_1` (`c`))"

	info, err := dbutil.GetTableInfoBySQL(ddl, parser.New())
	require.NoError(t, err)

	fields, err := indexFieldsFromConfigString("id, k", info)
	require.NoError(t, err)
	require.False(t, fields.IsEmpty())
	require.Len(t, fields.Cols(), 2)

	// Expected MatchesIndex result per index name.
	wantMatch := map[string]bool{
		"PRIMARY": true,
		"k_1":     false,
		"c_1":     false,
	}
	for _, idx := range info.Indices {
		want, known := wantMatch[idx.Name.String()]
		if !known {
			require.FailNow(t, "unreachable")
		}
		if want {
			require.True(t, fields.MatchesIndex(idx))
		} else {
			require.False(t, fields.MatchesIndex(idx))
		}
	}
}

// TestIndexFieldsEmpty checks that an empty "index-fields" value is reported
// as empty and matches every index of the table.
func TestIndexFieldsEmpty(t *testing.T) {
	t.Parallel()

	const ddl = "CREATE TABLE `sbtest1` " +
		"(`id` int(11) NOT NULL AUTO_INCREMENT, " +
		" `k` int(11) NOT NULL DEFAULT '0', " +
		"`c` char(120) NOT NULL DEFAULT '', " +
		"PRIMARY KEY (`id`), KEY `k_1` (`k`))"

	info, err := dbutil.GetTableInfoBySQL(ddl, parser.New())
	require.NoError(t, err)

	fields, err := indexFieldsFromConfigString("", info)
	require.NoError(t, err)
	require.True(t, fields.IsEmpty())

	// Expected to match all.
	for i := range info.Indices {
		require.True(t, fields.MatchesIndex(info.Indices[i]))
	}
}
8 changes: 8 additions & 0 deletions sync_diff_inspector/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,3 +905,11 @@ func GetChunkIDFromSQLFileName(fileIDStr string) (int, int, int, int, error) {
}
return tableIndex, bucketIndexLeft, bucketIndexRight, chunkIndex, nil
}

// IsRangeTrivial checks if a user configured Range is empty or `TRUE`.
//
// Surrounding whitespace is ignored and the comparison with "true" is
// case-insensitive, so "", "   ", "TRUE" and " true\n" are all trivial.
// Any other condition (e.g. "id < 10") is reported as non-trivial.
func IsRangeTrivial(rangeCond string) bool {
	trimmed := strings.TrimSpace(rangeCond)
	if trimmed == "" {
		return true
	}
	return strings.EqualFold(trimmed, "true")
}

0 comments on commit cf6b7a8

Please sign in to comment.