Skip to content

Commit

Permalink
sync_diff_inspector: add config item: chunk-thread-count (#630)
Browse files Browse the repository at this point in the history
close #629
  • Loading branch information
Leavrth authored Aug 12, 2022
1 parent cd52c1e commit f036de0
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 81 deletions.
27 changes: 9 additions & 18 deletions sync_diff_inspector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,15 @@ sync-diff-inspector is a tool for comparing two database's data.
## How to use

```shell
Usage of sync_diff_inspector:
-L string
log level: debug, info, warn, error, fatal (default "info")
-V print version of sync_diff_inspector
-check-thread-count int
how many goroutines are created to check data (default 1)
-chunk-size int
diff check chunk size (default 1000)
-config string
Config file
-fix-sql-file string
the name of the file which saves sqls used to fix different data (default "fix.sql")
-sample int
the percent of sampling check (default 100)
-source-snapshot string
source database's snapshot config
-target-snapshot string
target database's snapshot config
Usage of diff:
-V, --version print version of sync_diff_inspector
-L, --log-level string log level: debug, info, warn, error, fatal (default "info")
-C, --config string Config file
-T, --template string <dm|norm> export a template config file in the current directory
--dm-addr string the address of DM
--dm-task string identifier of dm task
--check-thread-count int how many goroutines are created to check data (default 4)
--export-fix-sql set true if want to compare rows or set to false will only compare checksum (default true)
```
Expand Down
7 changes: 6 additions & 1 deletion sync_diff_inspector/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
LocalFilePerm os.FileMode = 0o644

LogFileName = "sync_diff.log"

baseSplitThreadCount = 3
)

// TableConfig is the config of table.
Expand Down Expand Up @@ -290,6 +292,8 @@ type Config struct {
LogLevel string `toml:"-" json:"-"`
// how many goroutines are created to check data
CheckThreadCount int `toml:"check-thread-count" json:"check-thread-count"`
// how many goroutines are created to split chunk. A goroutine splits one table at a time.
SplitThreadCount int `toml:"-" json:"split-thread-count"`
// set true if want to compare rows
// set false won't compare rows.
ExportFixSQL bool `toml:"export-fix-sql" json:"export-fix-sql"`
Expand Down Expand Up @@ -329,7 +333,7 @@ func NewConfig() *Config {
fs.StringVarP(&cfg.Template, "template", "T", "", "<dm|norm> export a template config file in the current directory")
fs.StringVar(&cfg.DMAddr, "dm-addr", "", "the address of DM")
fs.StringVar(&cfg.DMTask, "dm-task", "", "identifier of dm task")
fs.IntVar(&cfg.CheckThreadCount, "check-thread-count", 1, "how many goroutines are created to check data")
fs.IntVar(&cfg.CheckThreadCount, "check-thread-count", 4, "how many goroutines are created to check data")
fs.BoolVar(&cfg.ExportFixSQL, "export-fix-sql", true, "set true if want to compare rows or set to false will only compare checksum")
fs.BoolVar(&cfg.CheckStructOnly, "check-struct-only", false, "ignore check table's data")

Expand Down Expand Up @@ -372,6 +376,7 @@ func (c *Config) Parse(arguments []string) error {
return errors.Errorf("'%s' is an invalid flag", c.FlagSet.Arg(0))
}

c.SplitThreadCount = baseSplitThreadCount + c.CheckThreadCount/2
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion sync_diff_inspector/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestParseConfig(t *testing.T) {
require.True(t, cfg.CheckConfig())

// we might not use the same config to run this test. e.g. MYSQL_PORT can be 4000
require.Equal(t, cfg.String(), "{\"check-thread-count\":4,\"export-fix-sql\":true,\"check-struct-only\":false,\"dm-addr\":\"\",\"dm-task\":\"\",\"data-sources\":{\"mysql1\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"\",\"sql-mode\":\"\",\"snapshot\":\"\",\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql2\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"\",\"sql-mode\":\"\",\"snapshot\":\"\",\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql3\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"\",\"sql-mode\":\"\",\"snapshot\":\"\",\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"tidb0\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"\",\"sql-mode\":\"\",\"snapshot\":\"\",\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null}},\"routes\":{\"rule1\":{\"schema-pattern\":\"test_*\",\"table-pattern\":\"t_*\",\"target-schema\":\"test\",\"target-table\":\"t\"},\"rule2\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test2\",\"target-table\":\"t2\"},\"rule3\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test\",\"target-table\":\"t\"}},\"table-configs\":{\"config1\":{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}},\"task\":{\"source-instances\":[\"mysql1\",\"mysql2\",\"mysql3\"],\"source-routes\":null,\"target-instance\":\"tidb0\",\"target-check-tables\":[\"schema*.table*\",\"!c.*\",\"test2.t2\"],\"target-configs\":[\"config1\"],\"output-dir\":\"/tmp/output/config\",\"SourceInstances\":[{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"\",\"sql-mode\":\"\",\"snapshot\":\"\",\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"\",\"sql-mode\":\"\",\"snapshot\":\"\",\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"\",\"sql-mode\":\"\",\"snapshot\":\"\",\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null}],\"TargetInstance\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"\",\"sql-mode\":\"\",\"snapshot\":\"\",\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null},\"TargetTableConfigs\":[{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}],\"TargetCheckTables\":[{},{},{}],\"FixDir\":\"/tmp/output/config/fix-on-tidb0\",\"CheckpointDir\":\"/tmp/output/config/checkpoint\",\"HashFile\":\"\"},\"ConfigFile\":\"config_sharding.toml\",\"PrintVersion\":false}")
require.Equal(t, cfg.String(), "{\"check-thread-count\":4,\"split-thread-count\":5,\"export-fix-sql\":true,\"check-struct-only\":false,\"dm-addr\":\"\",\"dm-task\":\"\",\"data-sources\":{\"mysql1\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"\",\"sql-mode\":\"\",\"snapshot\":\"\",\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql2\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"\",\"sql-mode\":\"\",\"snapshot\":\"\",\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql3\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"\",\"sql-mode\":\"\",\"snapshot\":\"\",\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"tidb0\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"\",\"sql-mode\":\"\",\"snapshot\":\"\",\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null}},\"routes\":{\"rule1\":{\"schema-pattern\":\"test_*\",\"table-pattern\":\"t_*\",\"target-schema\":\"test\",\"target-table\":\"t\"},\"rule2\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test2\",\"target-table\":\"t2\"},\"rule3\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test\",\"target-table\":\"t\"}},\"table-configs\":{\"config1\":{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}},\"task\":{\"source-instances\":[\"mysql1\",\"mysql2\",\"mysql3\"],\"source-routes\":null,\"target-instance\":\"tidb0\",\"target-check-tables\":[\"schema*.table*\",\"!c.*\",\"test2.t2\"],\"target-configs\":[\"config1\"],\"output-dir\":\"/tmp/output/config\",\"SourceInstances\":[{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"\",\"sql-mode\":\"\",\"snapshot\":\"\",\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"\",\"sql-mode\":\"\",\"snapshot\":\"\",\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"\",\"sql-mode\":\"\",\"snapshot\":\"\",\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null}],\"TargetInstance\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"\",\"sql-mode\":\"\",\"snapshot\":\"\",\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null},\"TargetTableConfigs\":[{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}],\"TargetCheckTables\":[{},{},{}],\"FixDir\":\"/tmp/output/config/fix-on-tidb0\",\"CheckpointDir\":\"/tmp/output/config/checkpoint\",\"HashFile\":\"\"},\"ConfigFile\":\"config_sharding.toml\",\"PrintVersion\":false}")
hash, err := cfg.Task.ComputeConfigHash()
require.NoError(t, err)
require.Equal(t, hash, "e03a88f9270c3906739d3f51b54d5011d7f04d55f8e14f4a3add59c93b3e877f")
Expand Down
4 changes: 3 additions & 1 deletion sync_diff_inspector/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Diff struct {

sample int
checkThreadCount int
splitThreadCount int
exportFixSQL bool
useCheckpoint bool
ignoreDataCheck bool
Expand All @@ -88,6 +89,7 @@ type Diff struct {
func NewDiff(ctx context.Context, cfg *config.Config) (diff *Diff, err error) {
diff = &Diff{
checkThreadCount: cfg.CheckThreadCount,
splitThreadCount: cfg.SplitThreadCount,
exportFixSQL: cfg.ExportFixSQL,
ignoreDataCheck: cfg.CheckStructOnly,
sqlCh: make(chan *ChunkDML, splitter.DefaultChannelBuffer),
Expand Down Expand Up @@ -366,7 +368,7 @@ func (df *Diff) pickSource(ctx context.Context) source.Source {
}

func (df *Diff) generateChunksIterator(ctx context.Context) (source.RangeIterator, error) {
return df.workSource.GetRangeIterator(ctx, df.startRange, df.workSource.GetTableAnalyzer())
return df.workSource.GetRangeIterator(ctx, df.startRange, df.workSource.GetTableAnalyzer(), df.splitThreadCount)
}

func (df *Diff) handleCheckpoints(ctx context.Context, stopCh chan struct{}) {
Expand Down
28 changes: 16 additions & 12 deletions sync_diff_inspector/source/chunks_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,35 @@ type ChunksIterator struct {
ID *chunk.ChunkID
tableAnalyzer TableAnalyzer

TableDiffs []*common.TableDiff
nextTableIndex int
chunksCh chan *splitter.RangeInfo
errCh chan error
limit int
TableDiffs []*common.TableDiff
nextTableIndex int
chunksCh chan *splitter.RangeInfo
errCh chan error
limit int
splitThreadCount int

cancel context.CancelFunc
}

func NewChunksIterator(ctx context.Context, analyzer TableAnalyzer, tableDiffs []*common.TableDiff, startRange *splitter.RangeInfo) (*ChunksIterator, error) {
func NewChunksIterator(ctx context.Context, analyzer TableAnalyzer, tableDiffs []*common.TableDiff, startRange *splitter.RangeInfo, splitThreadCount int) (*ChunksIterator, error) {
ctxx, cancel := context.WithCancel(ctx)
iter := &ChunksIterator{
tableAnalyzer: analyzer,
TableDiffs: tableDiffs,
chunksCh: make(chan *splitter.RangeInfo, 64),
errCh: make(chan error, len(tableDiffs)),
cancel: cancel,
splitThreadCount: splitThreadCount,
tableAnalyzer: analyzer,
TableDiffs: tableDiffs,

// reserve 30 capacity for each goroutine on average
chunksCh: make(chan *splitter.RangeInfo, 30*splitThreadCount),
errCh: make(chan error, len(tableDiffs)),
cancel: cancel,
}
go iter.produceChunks(ctxx, startRange)
return iter, nil
}

func (t *ChunksIterator) produceChunks(ctx context.Context, startRange *splitter.RangeInfo) {
defer close(t.chunksCh)
pool := utils.NewWorkerPool(3, "chunks producer")
pool := utils.NewWorkerPool(uint(t.splitThreadCount), "chunks producer")
t.nextTableIndex = 0

// If chunkRange
Expand Down
7 changes: 4 additions & 3 deletions sync_diff_inspector/source/mysql_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
"context"
"database/sql"
"fmt"
tableFilter "github.com/pingcap/tidb-tools/pkg/table-filter"
"time"

tableFilter "github.com/pingcap/tidb-tools/pkg/table-filter"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-tools/pkg/dbutil"
Expand Down Expand Up @@ -80,8 +81,8 @@ func (s *MySQLSources) GetTableAnalyzer() TableAnalyzer {
}
}

func (s *MySQLSources) GetRangeIterator(ctx context.Context, r *splitter.RangeInfo, analyzer TableAnalyzer) (RangeIterator, error) {
return NewChunksIterator(ctx, analyzer, s.tableDiffs, r)
func (s *MySQLSources) GetRangeIterator(ctx context.Context, r *splitter.RangeInfo, analyzer TableAnalyzer, splitThreadCount int) (RangeIterator, error) {
return NewChunksIterator(ctx, analyzer, s.tableDiffs, r, splitThreadCount)
}

func (s *MySQLSources) Close() {
Expand Down
29 changes: 19 additions & 10 deletions sync_diff_inspector/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type Source interface {
// this is the mainly iterator across the whole sync diff.
// One source has one range iterator to produce the range to channel.
// there are many workers consume the range from the channel to compare.
GetRangeIterator(context.Context, *splitter.RangeInfo, TableAnalyzer) (RangeIterator, error)
GetRangeIterator(context.Context, *splitter.RangeInfo, TableAnalyzer, int) (RangeIterator, error)

// GetCountAndCrc32 gets the crc32 result and the count from given range.
GetCountAndCrc32(context.Context, *splitter.RangeInfo) *ChecksumInfo
Expand Down Expand Up @@ -197,18 +197,26 @@ func NewSources(ctx context.Context, cfg *config.Config) (downstream Source, ups
tj := utils.UniqueID(tableDiffs[j].Schema, tableDiffs[j].Table)
return strings.Compare(ti, tj) > 0
})
upstream, err = buildSourceFromCfg(ctx, tableDiffs, cfg.CheckThreadCount, cfg.Task.TargetCheckTables, cfg.Task.SourceInstances...)

// If `bucket size` is much larger than `chunk size`,
// we need to split the bucket into some chunks, which wastes much time.
// So we use WorkPool to split buckets in parallel.
// Besides, bucketSpliters of each table use shared WorkPool
bucketSpliterPool := utils.NewWorkerPool(uint(cfg.CheckThreadCount), "bucketIter")
// for mysql_shard, it needs `cfg.CheckThreadCount` + `cfg.SplitThreadCount` at most, because it cannot use bucket.
mysqlConnCount := cfg.CheckThreadCount + cfg.SplitThreadCount
upstream, err = buildSourceFromCfg(ctx, tableDiffs, mysqlConnCount, bucketSpliterPool, cfg.Task.TargetCheckTables, cfg.Task.SourceInstances...)
if err != nil {
return nil, nil, errors.Annotate(err, "from upstream")
}
downstream, err = buildSourceFromCfg(ctx, tableDiffs, cfg.CheckThreadCount, cfg.Task.TargetCheckTables, cfg.Task.TargetInstance)
downstream, err = buildSourceFromCfg(ctx, tableDiffs, mysqlConnCount, bucketSpliterPool, cfg.Task.TargetCheckTables, cfg.Task.TargetInstance)
if err != nil {
return nil, nil, errors.Annotate(err, "from downstream")
}
return downstream, upstream, nil
}

func buildSourceFromCfg(ctx context.Context, tableDiffs []*common.TableDiff, checkThreadCount int, f tableFilter.Filter, dbs ...*config.DataSource) (Source, error) {
func buildSourceFromCfg(ctx context.Context, tableDiffs []*common.TableDiff, connCount int, bucketSpliterPool *utils.WorkerPool, f tableFilter.Filter, dbs ...*config.DataSource) (Source, error) {
if len(dbs) < 1 {
return nil, errors.Errorf("no db config detected")
}
Expand All @@ -219,12 +227,12 @@ func buildSourceFromCfg(ctx context.Context, tableDiffs []*common.TableDiff, che

if ok {
if len(dbs) == 1 {
return NewTiDBSource(ctx, tableDiffs, dbs[0], checkThreadCount, f)
return NewTiDBSource(ctx, tableDiffs, dbs[0], bucketSpliterPool, f)
} else {
log.Fatal("Don't support check table in multiple tidb instance, please specify one tidb instance.")
}
}
return NewMySQLSources(ctx, tableDiffs, dbs, checkThreadCount, f)
return NewMySQLSources(ctx, tableDiffs, dbs, connCount, f)
}

func getAutoSnapshotPosition(dbConfig *dbutil.DBConfig, vars map[string]string) (string, string, error) {
Expand All @@ -246,6 +254,7 @@ func initDBConn(ctx context.Context, cfg *config.Config) error {
vars := map[string]string{
"time_zone": UnifiedTimeZone,
}

// Fill in tidb_snapshot if it is set to AUTO
// This is only supported when set to auto on both target/source.
if cfg.Task.TargetInstance.IsAutoSnapshot() {
Expand All @@ -264,9 +273,9 @@ func initDBConn(ctx context.Context, cfg *config.Config) error {
cfg.Task.TargetInstance.SetSnapshot(secondaryTs)
cfg.Task.SourceInstances[0].SetSnapshot(primaryTs)
}
// we had 3 producers and `cfg.CheckThreadCount` consumer to use db connections.
// so the connection count need to be cfg.CheckThreadCount + 3.
targetConn, err := common.CreateDB(ctx, cfg.Task.TargetInstance.ToDBConfig(), vars, cfg.CheckThreadCount+3)
// we had `cfg.SplitThreadCount` producers and `cfg.CheckThreadCount` consumer to use db connections maybe and `cfg.CheckThreadCount` splitter to split buckets.
// so the connection count need to be cfg.SplitThreadCount + cfg.CheckThreadCount + cfg.CheckThreadCount.
targetConn, err := common.CreateDB(ctx, cfg.Task.TargetInstance.ToDBConfig(), vars, cfg.SplitThreadCount+2*cfg.CheckThreadCount)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -280,7 +289,7 @@ func initDBConn(ctx context.Context, cfg *config.Config) error {
return errors.Errorf("'auto' snapshot should be set on both target and source")
}
// connect source db with target db time_zone
conn, err := common.CreateDB(ctx, source.ToDBConfig(), vars, cfg.CheckThreadCount+1)
conn, err := common.CreateDB(ctx, source.ToDBConfig(), vars, cfg.SplitThreadCount+2*cfg.CheckThreadCount)
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading

0 comments on commit f036de0

Please sign in to comment.