diff --git a/sync_diff_inspector/diff.go b/sync_diff_inspector/diff.go index 1420d1ba3..90606de55 100644 --- a/sync_diff_inspector/diff.go +++ b/sync_diff_inspector/diff.go @@ -421,7 +421,7 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool schema, table := tableDiff.Schema, tableDiff.Table var state string = checkpoints.SuccessState - isEqual, count, err := df.compareChecksumAndGetCount(ctx, rangeInfo) + isEqual, upCount, downCount, err := df.compareChecksumAndGetCount(ctx, rangeInfo) if err != nil { // If an error occurs during the checksum phase, skip the data compare phase. state = checkpoints.FailedState @@ -430,9 +430,9 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool state = checkpoints.FailedState // if the chunk's checksum differ, try to do binary check info := rangeInfo - if count > splitter.SplitThreshold { - log.Debug("count greater than threshold, start do bingenerate", zap.Any("chunk id", rangeInfo.ChunkRange.Index), zap.Int64("chunk size", count)) - info, err = df.BinGenerate(ctx, df.workSource, rangeInfo, count) + if upCount > splitter.SplitThreshold { + log.Debug("count greater than threshold, start do bingenerate", zap.Any("chunk id", rangeInfo.ChunkRange.Index), zap.Int64("upstream chunk size", upCount)) + info, err = df.BinGenerate(ctx, df.workSource, rangeInfo, upCount) if err != nil { log.Error("fail to do binary search.", zap.Error(err)) df.report.SetTableMeetError(schema, table, err) @@ -450,7 +450,7 @@ func (df *Diff) consume(ctx context.Context, rangeInfo *splitter.RangeInfo) bool } dml.node.State = state id := rangeInfo.ChunkRange.Index - df.report.SetTableDataCheckResult(schema, table, isEqual, dml.rowAdd, dml.rowDelete, id) + df.report.SetTableDataCheckResult(schema, table, isEqual, dml.rowAdd, dml.rowDelete, upCount, downCount, id) return isEqual } @@ -510,10 +510,15 @@ func (df *Diff) binSearch(ctx context.Context, targetSource source.Source, table chunkLimits, args := tableRange.ChunkRange.ToString(tableDiff.Collation) limitRange := fmt.Sprintf("(%s) AND (%s)", chunkLimits, tableDiff.Range) midValues, err := utils.GetApproximateMidBySize(ctx, targetSource.GetDB(), tableDiff.Schema, tableDiff.Table, indexColumns, limitRange, args, count) - log.Debug("mid values", zap.Reflect("mid values", midValues), zap.Reflect("indices", indexColumns), zap.Reflect("bounds", tableRange.ChunkRange.Bounds)) if err != nil { return nil, errors.Trace(err) } + if midValues == nil { + // TODO Since the count is from upstream, + // the midValues may be empty when downstream has much less rows in this chunk. + return tableRange, nil + } + log.Debug("mid values", zap.Reflect("mid values", midValues), zap.Reflect("indices", indexColumns), zap.Reflect("bounds", tableRange.ChunkRange.Bounds)) log.Debug("table ranges", zap.Reflect("original range", tableRange)) for i := range indexColumns { log.Debug("update tableRange", zap.String("field", indexColumns[i].Name.O), zap.String("value", midValues[indexColumns[i].Name.O])) @@ -521,11 +526,11 @@ func (df *Diff) binSearch(ctx context.Context, targetSource source.Source, table tableRange2.Update(indexColumns[i].Name.O, midValues[indexColumns[i].Name.O], "", true, false, tableDiff.Collation, tableDiff.Range) } log.Debug("table ranges", zap.Reflect("tableRange 1", tableRange1), zap.Reflect("tableRange 2", tableRange2)) - isEqual1, count1, err = df.compareChecksumAndGetCount(ctx, tableRange1) + isEqual1, count1, _, err = df.compareChecksumAndGetCount(ctx, tableRange1) if err != nil { return nil, errors.Trace(err) } - isEqual2, count2, err = df.compareChecksumAndGetCount(ctx, tableRange2) + isEqual2, count2, _, err = df.compareChecksumAndGetCount(ctx, tableRange2) if err != nil { return nil, errors.Trace(err) } @@ -561,7 +566,7 @@ func (df *Diff) binSearch(ctx context.Context, targetSource source.Source, table } } -func (df *Diff) compareChecksumAndGetCount(ctx context.Context, tableRange *splitter.RangeInfo) (bool, int64, error) { +func (df *Diff) compareChecksumAndGetCount(ctx context.Context, tableRange *splitter.RangeInfo) (bool, int64, int64, error) { var wg sync.WaitGroup var upstreamInfo, downstreamInfo *source.ChecksumInfo wg.Add(1) @@ -574,19 +579,19 @@ func (df *Diff) compareChecksumAndGetCount(ctx context.Context, tableRange *spli if upstreamInfo.Err != nil { log.Warn("failed to compare upstream checksum") - return false, -1, errors.Trace(upstreamInfo.Err) + return false, -1, -1, errors.Trace(upstreamInfo.Err) } if downstreamInfo.Err != nil { log.Warn("failed to compare downstream checksum") - return false, -1, errors.Trace(downstreamInfo.Err) + return false, -1, -1, errors.Trace(downstreamInfo.Err) } - // TODO two counts are not necessary equal + if upstreamInfo.Count == downstreamInfo.Count && upstreamInfo.Checksum == downstreamInfo.Checksum { - return true, upstreamInfo.Count, nil + return true, upstreamInfo.Count, downstreamInfo.Count, nil } log.Debug("checksum failed", zap.Any("chunk id", tableRange.ChunkRange.Index), zap.String("table", df.workSource.GetTables()[tableRange.GetTableIndex()].Table), zap.Int64("upstream chunk size", upstreamInfo.Count), zap.Int64("downstream chunk size", downstreamInfo.Count), zap.Int64("upstream checksum", upstreamInfo.Checksum), zap.Int64("downstream checksum", downstreamInfo.Checksum)) - return false, upstreamInfo.Count, nil + return false, upstreamInfo.Count, downstreamInfo.Count, nil } func (df *Diff) compareRows(ctx context.Context, rangeInfo *splitter.RangeInfo, dml *ChunkDML) (bool, error) { diff --git a/sync_diff_inspector/report/report.go b/sync_diff_inspector/report/report.go index 793e005b1..684e581c1 100644 --- a/sync_diff_inspector/report/report.go +++ b/sync_diff_inspector/report/report.go @@ -21,6 +21,7 @@ import ( "os" "path/filepath" "sort" + "strconv" "strings" "sync" "time" @@ -62,12 +63,15 @@ type TableResult struct { DataEqual bool `json:"data-equal"` MeetError error `json:"-"` ChunkMap map[string]*ChunkResult `json:"chunk-result"` // `ChunkMap` stores the `ChunkResult` of each chunk of the table + UpCount int64 `json:"up-count"` // `UpCount` is the number of rows in the table from upstream + DownCount int64 `json:"down-count"` // `DownCount` is the number of rows in the table from downstream + } // ChunkResult save the necessarily information to provide summary information type ChunkResult struct { - RowsAdd int `json:"rows-add"` // `RowAdd` is the number of rows needed to add - RowsDelete int `json:"rows-delete"` // `RowDelete` is the number of rows needed to delete + RowsAdd int `json:"rows-add"` // `RowsAdd` is the number of rows needed to add + RowsDelete int `json:"rows-delete"` // `RowsDelete` is the number of rows needed to delete } // Report saves the check results. @@ -101,16 +105,20 @@ func (r *Report) LoadReport(reportInfo *Report) { } } -func (r *Report) getSortedTables() []string { - equalTables := make([]string, 0) +func (r *Report) getSortedTables() [][]string { + equalTables := make([][]string, 0) for schema, tableMap := range r.TableResults { for table, result := range tableMap { if result.StructEqual && result.DataEqual { - equalTables = append(equalTables, dbutil.TableName(schema, table)) + equalRow := make([]string, 0, 3) + equalRow = append(equalRow, dbutil.TableName(schema, table)) + equalRow = append(equalRow, strconv.FormatInt(result.UpCount, 10)) + equalRow = append(equalRow, strconv.FormatInt(result.DownCount, 10)) + equalTables = append(equalTables, equalRow) } } } - sort.Slice(equalTables, func(i, j int) bool { return equalTables[i] < equalTables[j] }) + sort.Slice(equalTables, func(i, j int) bool { return equalTables[i][0] < equalTables[j][0] }) return equalTables } @@ -128,12 +136,12 @@ func (r *Report) getDiffRows() [][]string { } else { diffRow = append(diffRow, "true") } - rowAdd, rowDelete := 0, 0 + rowsAdd, rowsDelete := 0, 0 for _, chunkResult := range result.ChunkMap { - rowAdd += chunkResult.RowsAdd - rowDelete += chunkResult.RowsDelete + rowsAdd += chunkResult.RowsAdd + rowsDelete += chunkResult.RowsDelete } - diffRow = append(diffRow, fmt.Sprintf("+%d/-%d", rowAdd, rowDelete)) + diffRow = append(diffRow, fmt.Sprintf("+%d/-%d", rowsAdd, rowsDelete), strconv.FormatInt(result.UpCount, 10), strconv.FormatInt(result.DownCount, 10)) diffRows = append(diffRows, diffRow) } } @@ -189,14 +197,22 @@ func (r *Report) CommitSummary() error { summaryFile.WriteString("Comparison Result\n\n\n\n") summaryFile.WriteString("The table structure and data in following tables are equivalent\n\n") equalTables := r.getSortedTables() - for _, table := range equalTables { - summaryFile.WriteString(table + "\n") + if len(equalTables) > 0 { + tableString := &strings.Builder{} + table := tablewriter.NewWriter(tableString) + table.SetHeader([]string{"Table", "UpCount", "DownCount"}) + for _, v := range equalTables { + table.Append(v) + } + table.Render() + summaryFile.WriteString(tableString.String()) + summaryFile.WriteString("\n\n") } if r.Result == Fail { - summaryFile.WriteString("\nThe following tables contains inconsistent data\n\n") + summaryFile.WriteString("The following tables contains inconsistent data\n\n") tableString := &strings.Builder{} table := tablewriter.NewWriter(tableString) - table.SetHeader([]string{"Table", "Structure equality", "Data diff rows"}) + table.SetHeader([]string{"Table", "Structure equality", "Data diff rows", "UpCount", "DownCount"}) diffRows := r.getDiffRows() for _, v := range diffRows { table.Append(v) @@ -205,7 +221,7 @@ func (r *Report) CommitSummary() error { summaryFile.WriteString(tableString.String()) } duration := r.Duration + time.Since(r.StartTime) - summaryFile.WriteString(fmt.Sprintf("Time Cost: %s\n", duration)) + summaryFile.WriteString(fmt.Sprintf("\nTime Cost: %s\n", duration)) summaryFile.WriteString(fmt.Sprintf("Average Speed: %fMB/s\n", float64(r.TotalSize)/(1024.0*1024.0*duration.Seconds()))) return nil } @@ -291,11 +307,13 @@ func (r *Report) SetTableStructCheckResult(schema, table string, equal bool, ski } // SetTableDataCheckResult sets the data check result for table. -func (r *Report) SetTableDataCheckResult(schema, table string, equal bool, rowsAdd, rowsDelete int, id *chunk.ChunkID) { +func (r *Report) SetTableDataCheckResult(schema, table string, equal bool, rowsAdd, rowsDelete int, upCount, downCount int64, id *chunk.ChunkID) { r.Lock() defer r.Unlock() + result := r.TableResults[schema][table] + result.UpCount += upCount + result.DownCount += downCount if !equal { - result := r.TableResults[schema][table] result.DataEqual = equal if _, ok := result.ChunkMap[id.ToString()]; !ok { result.ChunkMap[id.ToString()] = &ChunkResult{ diff --git a/sync_diff_inspector/report/report_test.go b/sync_diff_inspector/report/report_test.go index 85ebaabde..9bfc324ab 100644 --- a/sync_diff_inspector/report/report_test.go +++ b/sync_diff_inspector/report/report_test.go @@ -105,7 +105,7 @@ func TestReport(t *testing.T) { // Test Table Report report.SetTableStructCheckResult("test", "tbl", true, false) - report.SetTableDataCheckResult("test", "tbl", true, 100, 200, &chunk.ChunkID{1, 1, 1, 1, 2}) + report.SetTableDataCheckResult("test", "tbl", true, 100, 200, 222, 222, &chunk.ChunkID{1, 1, 1, 1, 2}) report.SetTableMeetError("test", "tbl", errors.New("eeee")) new_report := NewReport(task) @@ -118,17 +118,17 @@ func TestReport(t *testing.T) { require.True(t, result.DataEqual) require.True(t, result.StructEqual) - require.Equal(t, new_report.getSortedTables(), []string{"`atest`.`atbl`", "`ctest`.`atbl`", "`test`.`tbl`"}) + require.Equal(t, new_report.getSortedTables(), [][]string{{"`atest`.`atbl`", "0", "0"}, {"`ctest`.`atbl`", "0", "0"}, {"`test`.`tbl`", "222", "222"}}) require.Equal(t, new_report.getDiffRows(), [][]string{}) new_report.SetTableStructCheckResult("atest", "atbl", true, false) - new_report.SetTableDataCheckResult("atest", "atbl", false, 111, 222, &chunk.ChunkID{1, 1, 1, 1, 2}) - require.Equal(t, new_report.getSortedTables(), []string{"`ctest`.`atbl`", "`test`.`tbl`"}) - require.Equal(t, new_report.getDiffRows(), [][]string{{"`atest`.`atbl`", "true", "+111/-222"}}) + new_report.SetTableDataCheckResult("atest", "atbl", false, 111, 222, 333, 333, &chunk.ChunkID{1, 1, 1, 1, 2}) + require.Equal(t, new_report.getSortedTables(), [][]string{{"`ctest`.`atbl`", "0", "0"}, {"`test`.`tbl`", "222", "222"}}) + require.Equal(t, new_report.getDiffRows(), [][]string{{"`atest`.`atbl`", "true", "+111/-222", "333", "333"}}) new_report.SetTableStructCheckResult("atest", "atbl", false, false) - require.Equal(t, new_report.getSortedTables(), []string{"`ctest`.`atbl`", "`test`.`tbl`"}) - require.Equal(t, new_report.getDiffRows(), [][]string{{"`atest`.`atbl`", "false", "+111/-222"}}) + require.Equal(t, new_report.getSortedTables(), [][]string{{"`ctest`.`atbl`", "0", "0"}, {"`test`.`tbl`", "222", "222"}}) + require.Equal(t, new_report.getDiffRows(), [][]string{{"`atest`.`atbl`", "false", "+111/-222", "333", "333"}}) new_report.SetTableStructCheckResult("ctest", "atbl", false, true) @@ -246,7 +246,7 @@ func TestPrint(t *testing.T) { var buf *bytes.Buffer // All Pass report.SetTableStructCheckResult("test", "tbl", true, false) - report.SetTableDataCheckResult("test", "tbl", true, 0, 0, &chunk.ChunkID{0, 0, 0, 0, 1}) + report.SetTableDataCheckResult("test", "tbl", true, 0, 0, 22, 22, &chunk.ChunkID{0, 0, 0, 0, 1}) buf = new(bytes.Buffer) report.Print(buf) require.Equal(t, buf.String(), "A total of 0 table have been compared and all are equal.\n"+ @@ -320,19 +320,19 @@ func TestGetSnapshot(t *testing.T) { report.Init(tableDiffs, configsBytes[:2], configsBytes[2]) report.SetTableStructCheckResult("test", "tbl", true, false) - report.SetTableDataCheckResult("test", "tbl", false, 100, 100, &chunk.ChunkID{0, 0, 0, 1, 10}) - report.SetTableDataCheckResult("test", "tbl", true, 0, 0, &chunk.ChunkID{0, 0, 0, 3, 10}) - report.SetTableDataCheckResult("test", "tbl", false, 200, 200, &chunk.ChunkID{0, 0, 0, 3, 10}) + report.SetTableDataCheckResult("test", "tbl", false, 100, 100, 200, 300, &chunk.ChunkID{0, 0, 0, 1, 10}) + report.SetTableDataCheckResult("test", "tbl", true, 0, 0, 300, 300, &chunk.ChunkID{0, 0, 0, 3, 10}) + report.SetTableDataCheckResult("test", "tbl", false, 200, 200, 400, 500, &chunk.ChunkID{0, 0, 0, 3, 10}) report.SetTableStructCheckResult("atest", "tbl", true, false) - report.SetTableDataCheckResult("atest", "tbl", false, 100, 100, &chunk.ChunkID{0, 0, 0, 0, 10}) - report.SetTableDataCheckResult("atest", "tbl", true, 0, 0, &chunk.ChunkID{0, 0, 0, 3, 10}) - report.SetTableDataCheckResult("atest", "tbl", false, 200, 200, &chunk.ChunkID{0, 0, 0, 3, 10}) + report.SetTableDataCheckResult("atest", "tbl", false, 100, 100, 500, 600, &chunk.ChunkID{0, 0, 0, 0, 10}) + report.SetTableDataCheckResult("atest", "tbl", true, 0, 0, 600, 600, &chunk.ChunkID{0, 0, 0, 3, 10}) + report.SetTableDataCheckResult("atest", "tbl", false, 200, 200, 700, 800, &chunk.ChunkID{0, 0, 0, 3, 10}) report.SetTableStructCheckResult("xtest", "tbl", true, false) - report.SetTableDataCheckResult("xtest", "tbl", false, 100, 100, &chunk.ChunkID{0, 0, 0, 0, 10}) - report.SetTableDataCheckResult("xtest", "tbl", true, 0, 0, &chunk.ChunkID{0, 0, 0, 1, 10}) - report.SetTableDataCheckResult("xtest", "tbl", false, 200, 200, &chunk.ChunkID{0, 0, 0, 3, 10}) + report.SetTableDataCheckResult("xtest", "tbl", false, 100, 100, 800, 900, &chunk.ChunkID{0, 0, 0, 0, 10}) + report.SetTableDataCheckResult("xtest", "tbl", true, 0, 0, 900, 900, &chunk.ChunkID{0, 0, 0, 1, 10}) + report.SetTableDataCheckResult("xtest", "tbl", false, 200, 200, 1000, 1100, &chunk.ChunkID{0, 0, 0, 3, 10}) report_snap, err := report.GetSnapshot(&chunk.ChunkID{0, 0, 0, 1, 10}, "test", "tbl") require.NoError(t, err) @@ -442,13 +442,13 @@ func TestCommitSummary(t *testing.T) { report.Init(tableDiffs, configsBytes[:2], configsBytes[2]) report.SetTableStructCheckResult("test", "tbl", true, false) - report.SetTableDataCheckResult("test", "tbl", true, 100, 200, &chunk.ChunkID{0, 0, 0, 1, 10}) + report.SetTableDataCheckResult("test", "tbl", true, 100, 200, 400, 400, &chunk.ChunkID{0, 0, 0, 1, 10}) report.SetTableStructCheckResult("atest", "tbl", true, false) - report.SetTableDataCheckResult("atest", "tbl", false, 100, 200, &chunk.ChunkID{0, 0, 0, 2, 10}) + report.SetTableDataCheckResult("atest", "tbl", false, 100, 200, 500, 600, &chunk.ChunkID{0, 0, 0, 2, 10}) report.SetTableStructCheckResult("xtest", "tbl", false, false) - report.SetTableDataCheckResult("xtest", "tbl", false, 100, 200, &chunk.ChunkID{0, 0, 0, 3, 10}) + report.SetTableDataCheckResult("xtest", "tbl", false, 100, 200, 600, 700, &chunk.ChunkID{0, 0, 0, 3, 10}) err = report.CommitSummary() require.NoError(t, err) @@ -473,16 +473,20 @@ func TestCommitSummary(t *testing.T) { "user = \"root\"\n\n"+ "Comparison Result\n\n\n\n"+ "The table structure and data in following tables are equivalent\n\n"+ - "`test`.`tbl`\n"+ - "`ytest`.`tbl`\n\n"+ + "+---------------+---------+-----------+\n"+ + "| TABLE | UPCOUNT | DOWNCOUNT |\n"+ + "+---------------+---------+-----------+\n"+ + "| `test`.`tbl` | 400 | 400 |\n"+ + "| `ytest`.`tbl` | 0 | 0 |\n"+ + "+---------------+---------+-----------+\n\n\n"+ "The following tables contains inconsistent data\n\n"+ - "+---------------+--------------------+----------------+\n"+ - "| TABLE | STRUCTURE EQUALITY | DATA DIFF ROWS |\n"+ - "+---------------+--------------------+----------------+\n") + "+---------------+--------------------+----------------+---------+-----------+\n"+ + "| TABLE | STRUCTURE EQUALITY | DATA DIFF ROWS | UPCOUNT | DOWNCOUNT |\n"+ + "+---------------+--------------------+----------------+---------+-----------+\n") require.Contains(t, str, - "| `atest`.`tbl` | true | +100/-200 |") + "| `atest`.`tbl` | true | +100/-200 | 500 | 600 |\n") require.Contains(t, str, - "| `xtest`.`tbl` | false | +100/-200 |") + "| `xtest`.`tbl` | false | +100/-200 | 600 | 700 |\n") file.Close() err = os.Remove(filename) diff --git a/sync_diff_inspector/utils/utils.go b/sync_diff_inspector/utils/utils.go index 30170c086..269c5576c 100644 --- a/sync_diff_inspector/utils/utils.go +++ b/sync_diff_inspector/utils/utils.go @@ -679,6 +679,7 @@ func GetApproximateMidBySize(ctx context.Context, db *sql.DB, schema, table stri return nil, errors.Trace(err) } log.Error("there is no row in result set") + return nil, nil } err = rows.Scan(columns...) if err != nil { diff --git a/sync_diff_inspector/utils/utils_test.go b/sync_diff_inspector/utils/utils_test.go index c20fbfee6..583e32a99 100644 --- a/sync_diff_inspector/utils/utils_test.go +++ b/sync_diff_inspector/utils/utils_test.go @@ -268,6 +268,14 @@ func TestGetApproximateMid(t *testing.T) { require.NoError(t, err) require.Equal(t, data["a"], "5") require.Equal(t, data["b"], "10") + + // no data + rows = sqlmock.NewRows([]string{"a", "b"}) + mock.ExpectQuery("SELECT `a`, `b` FROM `test`\\.`test_utils` WHERE 2222.* LIMIT 1 OFFSET 10*").WithArgs("aaaa").WillReturnRows(rows) + + data, err = GetApproximateMidBySize(ctx, conn, "test", "test_utils", tableInfo.Columns, "2222", []interface{}{"aaaa"}, 20) + require.NoError(t, err) + require.Nil(t, data) } func TestGenerateSQLs(t *testing.T) {