Skip to content

Commit

Permalink
sync-diff-inspector: record downstream count in the report (#554)
Browse files Browse the repository at this point in the history
close #588
  • Loading branch information
Leavrth authored Jan 18, 2022
1 parent 961d2c0 commit d9fdfa2
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 58 deletions.
33 changes: 19 additions & 14 deletions sync_diff_inspector/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool
schema, table := tableDiff.Schema, tableDiff.Table
var state string = checkpoints.SuccessState

isEqual, count, err := df.compareChecksumAndGetCount(ctx, rangeInfo)
isEqual, upCount, downCount, err := df.compareChecksumAndGetCount(ctx, rangeInfo)
if err != nil {
// If an error occurs during the checksum phase, skip the data compare phase.
state = checkpoints.FailedState
Expand All @@ -430,9 +430,9 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool
state = checkpoints.FailedState
// if the chunk's checksum differ, try to do binary check
info := rangeInfo
if count > splitter.SplitThreshold {
log.Debug("count greater than threshold, start do bingenerate", zap.Any("chunk id", rangeInfo.ChunkRange.Index), zap.Int64("chunk size", count))
info, err = df.BinGenerate(ctx, df.workSource, rangeInfo, count)
if upCount > splitter.SplitThreshold {
log.Debug("count greater than threshold, start do bingenerate", zap.Any("chunk id", rangeInfo.ChunkRange.Index), zap.Int64("upstream chunk size", upCount))
info, err = df.BinGenerate(ctx, df.workSource, rangeInfo, upCount)
if err != nil {
log.Error("fail to do binary search.", zap.Error(err))
df.report.SetTableMeetError(schema, table, err)
Expand All @@ -450,7 +450,7 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool
}
dml.node.State = state
id := rangeInfo.ChunkRange.Index
df.report.SetTableDataCheckResult(schema, table, isEqual, dml.rowAdd, dml.rowDelete, id)
df.report.SetTableDataCheckResult(schema, table, isEqual, dml.rowAdd, dml.rowDelete, upCount, downCount, id)
return isEqual
}

Expand Down Expand Up @@ -510,22 +510,27 @@ func (df *Diff) binSearch(ctx context.Context, targetSource source.Source, table
chunkLimits, args := tableRange.ChunkRange.ToString(tableDiff.Collation)
limitRange := fmt.Sprintf("(%s) AND (%s)", chunkLimits, tableDiff.Range)
midValues, err := utils.GetApproximateMidBySize(ctx, targetSource.GetDB(), tableDiff.Schema, tableDiff.Table, indexColumns, limitRange, args, count)
log.Debug("mid values", zap.Reflect("mid values", midValues), zap.Reflect("indices", indexColumns), zap.Reflect("bounds", tableRange.ChunkRange.Bounds))
if err != nil {
return nil, errors.Trace(err)
}
if midValues == nil {
// TODO: since the count comes from the upstream,
// midValues may be empty when the downstream has far fewer rows in this chunk.
return tableRange, nil
return tableRange, nil
}
log.Debug("mid values", zap.Reflect("mid values", midValues), zap.Reflect("indices", indexColumns), zap.Reflect("bounds", tableRange.ChunkRange.Bounds))
log.Debug("table ranges", zap.Reflect("original range", tableRange))
for i := range indexColumns {
log.Debug("update tableRange", zap.String("field", indexColumns[i].Name.O), zap.String("value", midValues[indexColumns[i].Name.O]))
tableRange1.Update(indexColumns[i].Name.O, "", midValues[indexColumns[i].Name.O], false, true, tableDiff.Collation, tableDiff.Range)
tableRange2.Update(indexColumns[i].Name.O, midValues[indexColumns[i].Name.O], "", true, false, tableDiff.Collation, tableDiff.Range)
}
log.Debug("table ranges", zap.Reflect("tableRange 1", tableRange1), zap.Reflect("tableRange 2", tableRange2))
isEqual1, count1, err = df.compareChecksumAndGetCount(ctx, tableRange1)
isEqual1, count1, _, err = df.compareChecksumAndGetCount(ctx, tableRange1)
if err != nil {
return nil, errors.Trace(err)
}
isEqual2, count2, err = df.compareChecksumAndGetCount(ctx, tableRange2)
isEqual2, count2, _, err = df.compareChecksumAndGetCount(ctx, tableRange2)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -561,7 +566,7 @@ func (df *Diff) binSearch(ctx context.Context, targetSource source.Source, table
}
}

func (df *Diff) compareChecksumAndGetCount(ctx context.Context, tableRange *splitter.RangeInfo) (bool, int64, error) {
func (df *Diff) compareChecksumAndGetCount(ctx context.Context, tableRange *splitter.RangeInfo) (bool, int64, int64, error) {
var wg sync.WaitGroup
var upstreamInfo, downstreamInfo *source.ChecksumInfo
wg.Add(1)
Expand All @@ -574,19 +579,19 @@ func (df *Diff) compareChecksumAndGetCount(ctx context.Context, tableRange *spli

if upstreamInfo.Err != nil {
log.Warn("failed to compare upstream checksum")
return false, -1, errors.Trace(upstreamInfo.Err)
return false, -1, -1, errors.Trace(upstreamInfo.Err)
}
if downstreamInfo.Err != nil {
log.Warn("failed to compare downstream checksum")
return false, -1, errors.Trace(downstreamInfo.Err)
return false, -1, -1, errors.Trace(downstreamInfo.Err)

}
// TODO: the two counts are not necessarily equal

if upstreamInfo.Count == downstreamInfo.Count && upstreamInfo.Checksum == downstreamInfo.Checksum {
return true, upstreamInfo.Count, nil
return true, upstreamInfo.Count, downstreamInfo.Count, nil
}
log.Debug("checksum failed", zap.Any("chunk id", tableRange.ChunkRange.Index), zap.String("table", df.workSource.GetTables()[tableRange.GetTableIndex()].Table), zap.Int64("upstream chunk size", upstreamInfo.Count), zap.Int64("downstream chunk size", downstreamInfo.Count), zap.Int64("upstream checksum", upstreamInfo.Checksum), zap.Int64("downstream checksum", downstreamInfo.Checksum))
return false, upstreamInfo.Count, nil
return false, upstreamInfo.Count, downstreamInfo.Count, nil
}

func (df *Diff) compareRows(ctx context.Context, rangeInfo *splitter.RangeInfo, dml *ChunkDML) (bool, error) {
Expand Down
52 changes: 35 additions & 17 deletions sync_diff_inspector/report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -62,12 +63,15 @@ type TableResult struct {
DataEqual bool `json:"data-equal"`
MeetError error `json:"-"`
ChunkMap map[string]*ChunkResult `json:"chunk-result"` // `ChunkMap` stores the `ChunkResult` of each chunk of the table
UpCount int64 `json:"up-count"` // `UpCount` is the number of rows in the table from upstream
DownCount int64 `json:"down-count"` // `DownCount` is the number of rows in the table from downstream

}

// ChunkResult save the necessarily information to provide summary information
type ChunkResult struct {
RowsAdd int `json:"rows-add"` // `RowAdd` is the number of rows needed to add
RowsDelete int `json:"rows-delete"` // `RowDelete` is the number of rows needed to delete
RowsAdd int `json:"rows-add"` // `RowsAdd` is the number of rows needed to add
RowsDelete int `json:"rows-delete"` // `RowsDelete` is the number of rows needed to delete
}

// Report saves the check results.
Expand Down Expand Up @@ -101,16 +105,20 @@ func (r *Report) LoadReport(reportInfo *Report) {
}
}

func (r *Report) getSortedTables() []string {
equalTables := make([]string, 0)
func (r *Report) getSortedTables() [][]string {
equalTables := make([][]string, 0)
for schema, tableMap := range r.TableResults {
for table, result := range tableMap {
if result.StructEqual && result.DataEqual {
equalTables = append(equalTables, dbutil.TableName(schema, table))
equalRow := make([]string, 0, 3)
equalRow = append(equalRow, dbutil.TableName(schema, table))
equalRow = append(equalRow, strconv.FormatInt(result.UpCount, 10))
equalRow = append(equalRow, strconv.FormatInt(result.DownCount, 10))
equalTables = append(equalTables, equalRow)
}
}
}
sort.Slice(equalTables, func(i, j int) bool { return equalTables[i] < equalTables[j] })
sort.Slice(equalTables, func(i, j int) bool { return equalTables[i][0] < equalTables[j][0] })
return equalTables
}

Expand All @@ -128,12 +136,12 @@ func (r *Report) getDiffRows() [][]string {
} else {
diffRow = append(diffRow, "true")
}
rowAdd, rowDelete := 0, 0
rowsAdd, rowsDelete := 0, 0
for _, chunkResult := range result.ChunkMap {
rowAdd += chunkResult.RowsAdd
rowDelete += chunkResult.RowsDelete
rowsAdd += chunkResult.RowsAdd
rowsDelete += chunkResult.RowsDelete
}
diffRow = append(diffRow, fmt.Sprintf("+%d/-%d", rowAdd, rowDelete))
diffRow = append(diffRow, fmt.Sprintf("+%d/-%d", rowsAdd, rowsDelete), strconv.FormatInt(result.UpCount, 10), strconv.FormatInt(result.DownCount, 10))
diffRows = append(diffRows, diffRow)
}
}
Expand Down Expand Up @@ -189,14 +197,22 @@ func (r *Report) CommitSummary() error {
summaryFile.WriteString("Comparison Result\n\n\n\n")
summaryFile.WriteString("The table structure and data in following tables are equivalent\n\n")
equalTables := r.getSortedTables()
for _, table := range equalTables {
summaryFile.WriteString(table + "\n")
if len(equalTables) > 0 {
tableString := &strings.Builder{}
table := tablewriter.NewWriter(tableString)
table.SetHeader([]string{"Table", "UpCount", "DownCount"})
for _, v := range equalTables {
table.Append(v)
}
table.Render()
summaryFile.WriteString(tableString.String())
summaryFile.WriteString("\n\n")
}
if r.Result == Fail {
summaryFile.WriteString("\nThe following tables contains inconsistent data\n\n")
summaryFile.WriteString("The following tables contains inconsistent data\n\n")
tableString := &strings.Builder{}
table := tablewriter.NewWriter(tableString)
table.SetHeader([]string{"Table", "Structure equality", "Data diff rows"})
table.SetHeader([]string{"Table", "Structure equality", "Data diff rows", "UpCount", "DownCount"})
diffRows := r.getDiffRows()
for _, v := range diffRows {
table.Append(v)
Expand All @@ -205,7 +221,7 @@ func (r *Report) CommitSummary() error {
summaryFile.WriteString(tableString.String())
}
duration := r.Duration + time.Since(r.StartTime)
summaryFile.WriteString(fmt.Sprintf("Time Cost: %s\n", duration))
summaryFile.WriteString(fmt.Sprintf("\nTime Cost: %s\n", duration))
summaryFile.WriteString(fmt.Sprintf("Average Speed: %fMB/s\n", float64(r.TotalSize)/(1024.0*1024.0*duration.Seconds())))
return nil
}
Expand Down Expand Up @@ -291,11 +307,13 @@ func (r *Report) SetTableStructCheckResult(schema, table string, equal bool, ski
}

// SetTableDataCheckResult sets the data check result for table.
func (r *Report) SetTableDataCheckResult(schema, table string, equal bool, rowsAdd, rowsDelete int, id *chunk.ChunkID) {
func (r *Report) SetTableDataCheckResult(schema, table string, equal bool, rowsAdd, rowsDelete int, upCount, downCount int64, id *chunk.ChunkID) {
r.Lock()
defer r.Unlock()
result := r.TableResults[schema][table]
result.UpCount += upCount
result.DownCount += downCount
if !equal {
result := r.TableResults[schema][table]
result.DataEqual = equal
if _, ok := result.ChunkMap[id.ToString()]; !ok {
result.ChunkMap[id.ToString()] = &ChunkResult{
Expand Down
58 changes: 31 additions & 27 deletions sync_diff_inspector/report/report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestReport(t *testing.T) {

// Test Table Report
report.SetTableStructCheckResult("test", "tbl", true, false)
report.SetTableDataCheckResult("test", "tbl", true, 100, 200, &chunk.ChunkID{1, 1, 1, 1, 2})
report.SetTableDataCheckResult("test", "tbl", true, 100, 200, 222, 222, &chunk.ChunkID{1, 1, 1, 1, 2})
report.SetTableMeetError("test", "tbl", errors.New("eeee"))

new_report := NewReport(task)
Expand All @@ -118,17 +118,17 @@ func TestReport(t *testing.T) {
require.True(t, result.DataEqual)
require.True(t, result.StructEqual)

require.Equal(t, new_report.getSortedTables(), []string{"`atest`.`atbl`", "`ctest`.`atbl`", "`test`.`tbl`"})
require.Equal(t, new_report.getSortedTables(), [][]string{{"`atest`.`atbl`", "0", "0"}, {"`ctest`.`atbl`", "0", "0"}, {"`test`.`tbl`", "222", "222"}})
require.Equal(t, new_report.getDiffRows(), [][]string{})

new_report.SetTableStructCheckResult("atest", "atbl", true, false)
new_report.SetTableDataCheckResult("atest", "atbl", false, 111, 222, &chunk.ChunkID{1, 1, 1, 1, 2})
require.Equal(t, new_report.getSortedTables(), []string{"`ctest`.`atbl`", "`test`.`tbl`"})
require.Equal(t, new_report.getDiffRows(), [][]string{{"`atest`.`atbl`", "true", "+111/-222"}})
new_report.SetTableDataCheckResult("atest", "atbl", false, 111, 222, 333, 333, &chunk.ChunkID{1, 1, 1, 1, 2})
require.Equal(t, new_report.getSortedTables(), [][]string{{"`ctest`.`atbl`", "0", "0"}, {"`test`.`tbl`", "222", "222"}})
require.Equal(t, new_report.getDiffRows(), [][]string{{"`atest`.`atbl`", "true", "+111/-222", "333", "333"}})

new_report.SetTableStructCheckResult("atest", "atbl", false, false)
require.Equal(t, new_report.getSortedTables(), []string{"`ctest`.`atbl`", "`test`.`tbl`"})
require.Equal(t, new_report.getDiffRows(), [][]string{{"`atest`.`atbl`", "false", "+111/-222"}})
require.Equal(t, new_report.getSortedTables(), [][]string{{"`ctest`.`atbl`", "0", "0"}, {"`test`.`tbl`", "222", "222"}})
require.Equal(t, new_report.getDiffRows(), [][]string{{"`atest`.`atbl`", "false", "+111/-222", "333", "333"}})

new_report.SetTableStructCheckResult("ctest", "atbl", false, true)

Expand Down Expand Up @@ -246,7 +246,7 @@ func TestPrint(t *testing.T) {
var buf *bytes.Buffer
// All Pass
report.SetTableStructCheckResult("test", "tbl", true, false)
report.SetTableDataCheckResult("test", "tbl", true, 0, 0, &chunk.ChunkID{0, 0, 0, 0, 1})
report.SetTableDataCheckResult("test", "tbl", true, 0, 0, 22, 22, &chunk.ChunkID{0, 0, 0, 0, 1})
buf = new(bytes.Buffer)
report.Print(buf)
require.Equal(t, buf.String(), "A total of 0 table have been compared and all are equal.\n"+
Expand Down Expand Up @@ -320,19 +320,19 @@ func TestGetSnapshot(t *testing.T) {
report.Init(tableDiffs, configsBytes[:2], configsBytes[2])

report.SetTableStructCheckResult("test", "tbl", true, false)
report.SetTableDataCheckResult("test", "tbl", false, 100, 100, &chunk.ChunkID{0, 0, 0, 1, 10})
report.SetTableDataCheckResult("test", "tbl", true, 0, 0, &chunk.ChunkID{0, 0, 0, 3, 10})
report.SetTableDataCheckResult("test", "tbl", false, 200, 200, &chunk.ChunkID{0, 0, 0, 3, 10})
report.SetTableDataCheckResult("test", "tbl", false, 100, 100, 200, 300, &chunk.ChunkID{0, 0, 0, 1, 10})
report.SetTableDataCheckResult("test", "tbl", true, 0, 0, 300, 300, &chunk.ChunkID{0, 0, 0, 3, 10})
report.SetTableDataCheckResult("test", "tbl", false, 200, 200, 400, 500, &chunk.ChunkID{0, 0, 0, 3, 10})

report.SetTableStructCheckResult("atest", "tbl", true, false)
report.SetTableDataCheckResult("atest", "tbl", false, 100, 100, &chunk.ChunkID{0, 0, 0, 0, 10})
report.SetTableDataCheckResult("atest", "tbl", true, 0, 0, &chunk.ChunkID{0, 0, 0, 3, 10})
report.SetTableDataCheckResult("atest", "tbl", false, 200, 200, &chunk.ChunkID{0, 0, 0, 3, 10})
report.SetTableDataCheckResult("atest", "tbl", false, 100, 100, 500, 600, &chunk.ChunkID{0, 0, 0, 0, 10})
report.SetTableDataCheckResult("atest", "tbl", true, 0, 0, 600, 600, &chunk.ChunkID{0, 0, 0, 3, 10})
report.SetTableDataCheckResult("atest", "tbl", false, 200, 200, 700, 800, &chunk.ChunkID{0, 0, 0, 3, 10})

report.SetTableStructCheckResult("xtest", "tbl", true, false)
report.SetTableDataCheckResult("xtest", "tbl", false, 100, 100, &chunk.ChunkID{0, 0, 0, 0, 10})
report.SetTableDataCheckResult("xtest", "tbl", true, 0, 0, &chunk.ChunkID{0, 0, 0, 1, 10})
report.SetTableDataCheckResult("xtest", "tbl", false, 200, 200, &chunk.ChunkID{0, 0, 0, 3, 10})
report.SetTableDataCheckResult("xtest", "tbl", false, 100, 100, 800, 900, &chunk.ChunkID{0, 0, 0, 0, 10})
report.SetTableDataCheckResult("xtest", "tbl", true, 0, 0, 900, 900, &chunk.ChunkID{0, 0, 0, 1, 10})
report.SetTableDataCheckResult("xtest", "tbl", false, 200, 200, 1000, 1100, &chunk.ChunkID{0, 0, 0, 3, 10})

report_snap, err := report.GetSnapshot(&chunk.ChunkID{0, 0, 0, 1, 10}, "test", "tbl")
require.NoError(t, err)
Expand Down Expand Up @@ -442,13 +442,13 @@ func TestCommitSummary(t *testing.T) {
report.Init(tableDiffs, configsBytes[:2], configsBytes[2])

report.SetTableStructCheckResult("test", "tbl", true, false)
report.SetTableDataCheckResult("test", "tbl", true, 100, 200, &chunk.ChunkID{0, 0, 0, 1, 10})
report.SetTableDataCheckResult("test", "tbl", true, 100, 200, 400, 400, &chunk.ChunkID{0, 0, 0, 1, 10})

report.SetTableStructCheckResult("atest", "tbl", true, false)
report.SetTableDataCheckResult("atest", "tbl", false, 100, 200, &chunk.ChunkID{0, 0, 0, 2, 10})
report.SetTableDataCheckResult("atest", "tbl", false, 100, 200, 500, 600, &chunk.ChunkID{0, 0, 0, 2, 10})

report.SetTableStructCheckResult("xtest", "tbl", false, false)
report.SetTableDataCheckResult("xtest", "tbl", false, 100, 200, &chunk.ChunkID{0, 0, 0, 3, 10})
report.SetTableDataCheckResult("xtest", "tbl", false, 100, 200, 600, 700, &chunk.ChunkID{0, 0, 0, 3, 10})

err = report.CommitSummary()
require.NoError(t, err)
Expand All @@ -473,16 +473,20 @@ func TestCommitSummary(t *testing.T) {
"user = \"root\"\n\n"+
"Comparison Result\n\n\n\n"+
"The table structure and data in following tables are equivalent\n\n"+
"`test`.`tbl`\n"+
"`ytest`.`tbl`\n\n"+
"+---------------+---------+-----------+\n"+
"| TABLE | UPCOUNT | DOWNCOUNT |\n"+
"+---------------+---------+-----------+\n"+
"| `test`.`tbl` | 400 | 400 |\n"+
"| `ytest`.`tbl` | 0 | 0 |\n"+
"+---------------+---------+-----------+\n\n\n"+
"The following tables contains inconsistent data\n\n"+
"+---------------+--------------------+----------------+\n"+
"| TABLE | STRUCTURE EQUALITY | DATA DIFF ROWS |\n"+
"+---------------+--------------------+----------------+\n")
"+---------------+--------------------+----------------+---------+-----------+\n"+
"| TABLE | STRUCTURE EQUALITY | DATA DIFF ROWS | UPCOUNT | DOWNCOUNT |\n"+
"+---------------+--------------------+----------------+---------+-----------+\n")
require.Contains(t, str,
"| `atest`.`tbl` | true | +100/-200 |")
"| `atest`.`tbl` | true | +100/-200 | 500 | 600 |\n")
require.Contains(t, str,
"| `xtest`.`tbl` | false | +100/-200 |")
"| `xtest`.`tbl` | false | +100/-200 | 600 | 700 |\n")

file.Close()
err = os.Remove(filename)
Expand Down
1 change: 1 addition & 0 deletions sync_diff_inspector/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,7 @@ func GetApproximateMidBySize(ctx context.Context, db *sql.DB, schema, table stri
return nil, errors.Trace(err)
}
log.Error("there is no row in result set")
return nil, nil
}
err = rows.Scan(columns...)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions sync_diff_inspector/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,14 @@ func TestGetApproximateMid(t *testing.T) {
require.NoError(t, err)
require.Equal(t, data["a"], "5")
require.Equal(t, data["b"], "10")

// no data
rows = sqlmock.NewRows([]string{"a", "b"})
mock.ExpectQuery("SELECT `a`, `b` FROM `test`\\.`test_utils` WHERE 2222.* LIMIT 1 OFFSET 10*").WithArgs("aaaa").WillReturnRows(rows)

data, err = GetApproximateMidBySize(ctx, conn, "test", "test_utils", tableInfo.Columns, "2222", []interface{}{"aaaa"}, 20)
require.NoError(t, err)
require.Nil(t, data)
}

func TestGenerateSQLs(t *testing.T) {
Expand Down

0 comments on commit d9fdfa2

Please sign in to comment.