Skip to content

Commit

Permalink
sync-diff-inspector: Clean up some code and optimize GetRandomValues (#547
Browse files Browse the repository at this point in the history
)

ref #670
  • Loading branch information
Leavrth authored Aug 12, 2022
1 parent f036de0 commit d3dcedd
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 101 deletions.
17 changes: 9 additions & 8 deletions sync_diff_inspector/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,6 @@ func (t *TaskConfig) Init(
return errors.Trace(err)
}

// Set default value when output is empty
if t.OutputDir == "" {
t.OutputDir = timestampOutputDir()
if err := os.RemoveAll(t.OutputDir); err != nil && !os.IsNotExist(err) {
log.Fatal("fail to remove the temp directory", zap.String("path", t.OutputDir), zap.String("error", err.Error()))
}
}

ok, err = pathExists(t.OutputDir)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -376,7 +368,16 @@ func (c *Config) Parse(arguments []string) error {
return errors.Errorf("'%s' is an invalid flag", c.FlagSet.Arg(0))
}

// Set default value when output is empty
if c.Task.OutputDir == "" {
c.Task.OutputDir = timestampOutputDir()
if err := os.RemoveAll(c.Task.OutputDir); err != nil && !os.IsNotExist(err) {
log.Fatal("fail to remove the temp directory", zap.String("path", c.Task.OutputDir), zap.String("error", err.Error()))
}
}

c.SplitThreadCount = baseSplitThreadCount + c.CheckThreadCount/2

return nil
}

Expand Down
2 changes: 0 additions & 2 deletions sync_diff_inspector/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,9 @@ type Diff struct {
// workSource is one of upstream/downstream by some policy in #pickSource.
workSource source.Source

sample int
checkThreadCount int
splitThreadCount int
exportFixSQL bool
useCheckpoint bool
ignoreDataCheck bool
sqlWg sync.WaitGroup
checkpointWg sync.WaitGroup
Expand Down
2 changes: 1 addition & 1 deletion sync_diff_inspector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func main() {
}

if cfg.PrintVersion {
fmt.Printf(utils.GetRawInfo("sync_diff_inspector v2.0"))
fmt.Print(utils.GetRawInfo("sync_diff_inspector v2.0"))
return
}

Expand Down
5 changes: 0 additions & 5 deletions sync_diff_inspector/source/chunks_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type ChunksIterator struct {
nextTableIndex int
chunksCh chan *splitter.RangeInfo
errCh chan error
limit int
splitThreadCount int

cancel context.CancelFunc
Expand Down Expand Up @@ -181,10 +180,6 @@ func (t *ChunksIterator) Close() {
t.cancel()
}

// getCurTableIndex reports the index of the table the iterator is
// currently positioned on; nextTableIndex always points one table ahead.
func (t *ChunksIterator) getCurTableIndex() int {
	current := t.nextTableIndex - 1
	return current
}

// TODO: getCurTableIndexID only used for binary search, should be optimized later.
func getCurTableIndexID(tableIter splitter.ChunkIterator) int64 {
if bt, ok := tableIter.(*splitter.BucketIterator); ok {
Expand Down
24 changes: 1 addition & 23 deletions sync_diff_inspector/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,29 +62,6 @@ type MockChunkIterator struct {
const CHUNKS = 5
const BUCKETS = 1

// equal reports whether two chunk IDs identify the same chunk: same
// table, same left bucket boundary, and same chunk index.
func equal(a, b *chunk.ChunkID) bool {
	if a.TableIndex != b.TableIndex {
		return false
	}
	if a.BucketIndexLeft != b.BucketIndexLeft {
		return false
	}
	return a.ChunkIndex == b.ChunkIndex
}

// next advances the chunk ID in place: step to the following chunk
// within the current table, or roll over to chunk 0 of the next table
// once the last chunk (ChunkCnt-1) has been reached.
func next(a *chunk.ChunkID) {
	if a.ChunkIndex != a.ChunkCnt-1 {
		a.ChunkIndex++
		return
	}
	a.TableIndex++
	a.ChunkIndex = 0
}

// newIndex builds a chunk ID positioned at the very first chunk of the
// first table/bucket, with the test-wide CHUNKS constant as its count.
// All index fields start at their zero values.
func newIndex() *chunk.ChunkID {
	id := &chunk.ChunkID{ChunkCnt: CHUNKS}
	return id
}

func (m *MockChunkIterator) Next() (*chunk.Range, error) {
if m.index.ChunkIndex == m.index.ChunkCnt-1 {
return nil, nil
Expand Down Expand Up @@ -652,6 +629,7 @@ func TestSource(t *testing.T) {
defer cancel()

router, err := router.NewTableRouter(false, nil)
require.NoError(t, err)
cfg := &config.Config{
LogLevel: "debug",
CheckThreadCount: 4,
Expand Down
2 changes: 1 addition & 1 deletion sync_diff_inspector/splitter/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (s *BucketIterator) splitChunkForBucket(ctx context.Context, firstBucketID,
s.wg.Add(1)
s.chunkPool.Apply(func() {
defer s.wg.Done()
chunks, err := splitRangeByRandom(s.dbConn, chunkRange, splitChunkCnt, s.table.Schema, s.table.Table, s.indexColumns, s.table.Range, s.table.Collation)
chunks, err := splitRangeByRandom(ctx, s.dbConn, chunkRange, splitChunkCnt, s.table.Schema, s.table.Table, s.indexColumns, s.table.Range, s.table.Collation)
if err != nil {
select {
case <-ctx.Done():
Expand Down
2 changes: 2 additions & 0 deletions sync_diff_inspector/splitter/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,5 +253,7 @@ func generateLimitQueryTemplate(indexColumns []*model.ColumnInfo, table *common.
}
columns := strings.Join(fields, ", ")

// TODO: the limit splitter has not been used yet.
// once it is used, need to add `collation` after `ORDER BY`.
return fmt.Sprintf("SELECT %s FROM %s WHERE %%s ORDER BY %s LIMIT %d,1", columns, dbutil.TableName(table.Schema, table.Table), columns, chunkSize)
}
35 changes: 13 additions & 22 deletions sync_diff_inspector/splitter/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func NewRandomIteratorWithCheckpoint(ctx context.Context, progressID string, tab
bucketChunkCnt = chunkCnt
}

chunks, err := splitRangeByRandom(dbConn, chunkRange, chunkCnt, table.Schema, table.Table, fields, table.Range, table.Collation)
chunks, err := splitRangeByRandom(ctx, dbConn, chunkRange, chunkCnt, table.Schema, table.Table, fields, table.Range, table.Collation)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -199,12 +199,7 @@ func GetSplitFields(table *model.TableInfo, splitFields []string) ([]*model.Colu

// splitRangeByRandom splits a chunk to multiple chunks by random
// Notice: If the `count <= 1`, it will skip splitting and return `chunk` as a slice directly.
// TODO: This function will get random row for each cols individually.
// For example, for a table' schema which is `create table tbl(a int, b int, primary key(a, b));`,
// there are 3 rows(`[a: 2, b: 2]`, `[a: 3, b: 5]`, `[a: 4, b: 4]`) in the table.
// and finally this function might generate `[a:2,b:2]` and `[a:3,b:4]` (from `a` get random value 2,4, `b` get random value 2,4) as split points, which means
// chunk whose range is (`a:2,b:2`, `a:3,b:4`], so we get a empty chunk.
func splitRangeByRandom(db *sql.DB, chunk *chunk.Range, count int, schema string, table string, columns []*model.ColumnInfo, limits, collation string) (chunks []*chunk.Range, err error) {
func splitRangeByRandom(ctx context.Context, db *sql.DB, chunk *chunk.Range, count int, schema string, table string, columns []*model.ColumnInfo, limits, collation string) (chunks []*chunk.Range, err error) {
if count <= 1 {
chunks = append(chunks, chunk)
return chunks, nil
Expand All @@ -213,29 +208,25 @@ func splitRangeByRandom(db *sql.DB, chunk *chunk.Range, count int, schema string
chunkLimits, args := chunk.ToString(collation)
limitRange := fmt.Sprintf("(%s) AND (%s)", chunkLimits, limits)

randomValues := make([][]string, len(columns))
for i, column := range columns {
randomValues[i], err = dbutil.GetRandomValues(context.Background(), db, schema, table, column.Name.O, count-1, limitRange, args, collation)
if err != nil {
return nil, errors.Trace(err)
}

log.Debug("get split values by random", zap.Stringer("chunk", chunk), zap.String("column", column.Name.O), zap.Int("random values num", len(randomValues[i])))
randomValues, err := utils.GetRandomValues(ctx, db, schema, table, columns, count-1, limitRange, args, collation)
if err != nil {
return nil, errors.Trace(err)
}

for i := 0; i <= utils.MinLenInSlices(randomValues); i++ {
log.Debug("get split values by random", zap.Stringer("chunk", chunk), zap.Int("random values num", len(randomValues)))
for i := 0; i <= len(randomValues); i++ {
newChunk := chunk.Copy()

for j, column := range columns {
if i == 0 {
if len(randomValues[j]) == 0 {
if len(randomValues) == 0 {
// randomValues is empty, so chunks will append chunk itself.
break
}
newChunk.Update(column.Name.O, "", randomValues[j][i], false, true)
} else if i == len(randomValues[j]) {
newChunk.Update(column.Name.O, randomValues[j][i-1], "", true, false)
newChunk.Update(column.Name.O, "", randomValues[i][j], false, true)
} else if i == len(randomValues) {
newChunk.Update(column.Name.O, randomValues[i-1][j], "", true, false)
} else {
newChunk.Update(column.Name.O, randomValues[j][i-1], randomValues[j][i], true, true)
newChunk.Update(column.Name.O, randomValues[i-1][j], randomValues[i][j], true, true)
}
}
chunks = append(chunks, newChunk)
Expand Down
Loading

0 comments on commit d3dcedd

Please sign in to comment.