Skip to content

Commit

Permalink
sync-diff: make sure user setting can cover the default same table ro…
Browse files Browse the repository at this point in the history
…ute rules (#611)

close #558
  • Loading branch information
3pointer authored Apr 26, 2022
1 parent 3f5c34e commit 748edcb
Show file tree
Hide file tree
Showing 11 changed files with 546 additions and 10 deletions.
11 changes: 9 additions & 2 deletions sync_diff_inspector/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ type DataSource struct {
SqlMode string `toml:"sql-mode" json:"sql-mode"`
Snapshot string `toml:"snapshot" json:"snapshot"`

RouteRules []string `toml:"route-rules" json:"route-rules"`
Router *router.Table
RouteRules []string `toml:"route-rules" json:"route-rules"`
Router *router.Table
RouteTargetSet map[string]struct{} `json:"-"`

Conn *sql.DB
// SourceType string `toml:"source-type" json:"source-type"`
Expand Down Expand Up @@ -402,6 +403,7 @@ func (c *Config) adjustConfigByDMSubTasks() (err error) {
}
for _, subTaskCfg := range subTaskCfgs {
tableRouter, err := router.NewTableRouter(subTaskCfg.CaseSensitive, []*router.TableRule{})
routeTargetSet := make(map[string]struct{})
if err != nil {
return errors.Trace(err)
}
Expand All @@ -410,6 +412,7 @@ func (c *Config) adjustConfigByDMSubTasks() (err error) {
if err != nil {
return errors.Trace(err)
}
routeTargetSet[dbutil.TableName(rule.TargetSchema, rule.TargetTable)] = struct{}{}
}
dataSources[subTaskCfg.SourceID] = &DataSource{
Host: subTaskCfg.From.Host,
Expand All @@ -418,6 +421,8 @@ func (c *Config) adjustConfigByDMSubTasks() (err error) {
Password: subTaskCfg.From.Password,
SqlMode: sqlMode,
Router: tableRouter,

RouteTargetSet: routeTargetSet,
}
}
c.DataSources = dataSources
Expand Down Expand Up @@ -445,12 +450,14 @@ func (c *Config) Init() (err error) {
}
for _, d := range c.DataSources {
routeRuleList := make([]*router.TableRule, 0, len(c.Routes))
d.RouteTargetSet = make(map[string]struct{})
// if we had rules
for _, r := range d.RouteRules {
rr, ok := c.Routes[r]
if !ok {
return errors.Errorf("not found source routes for rule %s, please correct the config", r)
}
d.RouteTargetSet[dbutil.TableName(rr.TargetSchema, rr.TargetTable)] = struct{}{}
routeRuleList = append(routeRuleList, rr)
}
// t.SourceRoute can be nil, the caller should check it.
Expand Down
54 changes: 47 additions & 7 deletions sync_diff_inspector/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ const (

const UnifiedTimeZone string = "+0:00"

// Placeholder route targets used by "shield" rules: NewSources routes a
// source table to this schema/table pair when a user rule already targets
// the table's own name but no user rule matches the table itself.
const (
	// ShieldDBName is the target schema of a shield rule.
	ShieldDBName = "_no__exists__db_"
	// ShieldTableName is the target table of a shield rule.
	ShieldTableName = "_no__exists__table_"
)

type ChecksumInfo struct {
Checksum int64
Count int64
Expand Down Expand Up @@ -134,13 +139,48 @@ func NewSources(ctx context.Context, cfg *config.Config) (downstream Source, ups
// When the router set case-sensitive false,
// that add rule match itself will make table case unsensitive.
for _, d := range cfg.Task.SourceInstances {
if d.Router.AddRule(&router.TableRule{
SchemaPattern: tableConfig.Schema,
TablePattern: tableConfig.Table,
TargetSchema: tableConfig.Schema,
TargetTable: tableConfig.Table,
}) != nil {
return nil, nil, errors.Errorf("set case unsensitive failed. The schema/table name cannot be parttern. [schema = %s] [table = %s]", tableConfig.Schema, tableConfig.Table)
if _, ok := d.RouteTargetSet[dbutil.TableName(tableConfig.Schema, tableConfig.Table)]; ok {
// There is a user rule routing to `tableConfig.Schema`.`tableConfig.Table`
rules := d.Router.Match(tableConfig.Schema, tableConfig.Table)

if len(rules) == 0 {
// There is no self match in these user rules.
// Need to shield the table for this source.
if d.Router.AddRule(&router.TableRule{
SchemaPattern: tableConfig.Schema,
TablePattern: tableConfig.Table,
TargetSchema: ShieldDBName,
TargetTable: ShieldTableName,
}) != nil {
return nil, nil, errors.Errorf("add shield rule failed [schema = %s] [table = %s]", tableConfig.Schema, tableConfig.Table)
}
}
} else if _, ok := d.RouteTargetSet[dbutil.TableName(tableConfig.Schema, "")]; ok {
// There is a user rule routing to `tableConfig.Schema`
rules := d.Router.Match(tableConfig.Schema, tableConfig.Table)

if len(rules) == 0 {
// There is no self match in these user rules.
// Need to shield the table for this source.
if d.Router.AddRule(&router.TableRule{
SchemaPattern: tableConfig.Schema,
TablePattern: tableConfig.Table,
TargetSchema: ShieldDBName,
TargetTable: ShieldTableName,
}) != nil {
return nil, nil, errors.Errorf("add shield rule failed [schema = %s] [table = %s]", tableConfig.Schema, tableConfig.Table)
}
}
} else {
// Add the default rule to match upper/lower case
if d.Router.AddRule(&router.TableRule{
SchemaPattern: tableConfig.Schema,
TablePattern: tableConfig.Table,
TargetSchema: tableConfig.Schema,
TargetTable: tableConfig.Table,
}) != nil {
return nil, nil, errors.Errorf("add rule failed [schema = %s] [table = %s]", tableConfig.Schema, tableConfig.Table)
}
}
}
}
Expand Down
107 changes: 107 additions & 0 deletions sync_diff_inspector/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,113 @@ func TestSource(t *testing.T) {
require.NoError(t, err)
}

// TestRouterRules checks how NewSources combines a user-supplied route rule
// with the rules it adds itself for every checked table:
//   - the user rule schema1.tbl -> schema2.tbl keeps working,
//   - schema2.tbl, which is the target of that user rule but is not matched
//     by it, is routed to the shield schema/table,
//   - schema_test.tbl, which no user rule targets, gets the default rule that
//     routes the table to itself.
//
// The test needs a MySQL instance addressed by the MYSQL_HOST and MYSQL_PORT
// environment variables and is skipped when either one is missing.
func TestRouterRules(t *testing.T) {
	host, isExist := os.LookupEnv("MYSQL_HOST")
	if host == "" || !isExist {
		t.Skip("MYSQL_HOST is not set")
	}
	portStr, isExist := os.LookupEnv("MYSQL_PORT")
	if portStr == "" || !isExist {
		// Without this guard an unset port reaches strconv.Atoi("") and the
		// test fails instead of being skipped.
		t.Skip("MYSQL_PORT is not set")
	}
	port, err := strconv.Atoi(portStr)
	require.NoError(t, err)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	r, err := router.NewTableRouter(false, []*router.TableRule{
		// make sure this rule works
		{
			SchemaPattern: "schema1",
			TablePattern:  "tbl",
			TargetSchema:  "schema2",
			TargetTable:   "tbl",
		},
	})
	require.NoError(t, err)

	cfg := &config.Config{
		LogLevel:         "debug",
		CheckThreadCount: 4,
		ExportFixSQL:     true,
		CheckStructOnly:  false,
		DataSources: map[string]*config.DataSource{
			"mysql1": {
				Host: host,
				Port: port,
				User: "root",
			},
			"tidb": {
				Host: host,
				Port: port,
				User: "root",
			},
		},
		Routes: nil,
		Task: config.TaskConfig{
			Source:      []string{"mysql1"},
			Routes:      nil,
			Target:      "tidb",
			CheckTables: []string{"schema2.tbl"},
			OutputDir:   "./output",
			SourceInstances: []*config.DataSource{
				{
					Host:           host,
					Port:           port,
					User:           "root",
					Router:         r,
					RouteTargetSet: make(map[string]struct{}),
				},
			},
			TargetInstance: &config.DataSource{
				Host: host,
				Port: port,
				User: "root",
			},
			TargetCheckTables: nil,
			FixDir:            "output/fix-on-tidb0",
			CheckpointDir:     "output/checkpoint",
			HashFile:          "",
		},
		ConfigFile:   "config.toml",
		PrintVersion: false,
	}
	cfg.Task.TargetCheckTables, err = filter.Parse([]string{"schema2.tbl", "schema_test.tbl"})
	require.NoError(t, err)
	// Record the user rule's target so NewSources sees schema2.tbl as routed-to.
	cfg.Task.SourceInstances[0].RouteTargetSet[dbutil.TableName("schema2", "tbl")] = struct{}{}

	// create table
	conn, err := sql.Open("mysql", fmt.Sprintf("root:@tcp(%s:%d)/?charset=utf8mb4", host, port))
	require.NoError(t, err)
	defer conn.Close()

	// Fail fast when the fixture cannot be created; otherwise the assertions
	// below would run against whatever state the server happens to be in.
	setupStmts := []string{
		"CREATE DATABASE IF NOT EXISTS schema1",
		"CREATE TABLE IF NOT EXISTS `schema1`.`tbl` (`a` int, `b` varchar(24), `c` float, `d` datetime, primary key(`a`, `b`))",
		"CREATE DATABASE IF NOT EXISTS schema2",
		"CREATE TABLE IF NOT EXISTS `schema2`.`tbl` (`a` int, `b` varchar(24), `c` float, `d` datetime, primary key(`a`, `b`))",
		"CREATE DATABASE IF NOT EXISTS schema_test",
		"CREATE TABLE IF NOT EXISTS `schema_test`.`tbl` (`a` int, `b` varchar(24), `c` float, `d` datetime, primary key(`a`, `b`))",
	}
	for _, stmt := range setupStmts {
		_, err = conn.Exec(stmt)
		require.NoError(t, err, stmt)
	}

	_, _, err = NewSources(ctx, cfg)
	require.NoError(t, err)

	require.Equal(t, 1, len(cfg.Task.SourceInstances))
	sourceRouter := cfg.Task.SourceInstances[0].Router

	// The user rule is still in effect.
	targetSchema, targetTable, err := sourceRouter.Route("schema1", "tbl")
	require.NoError(t, err)
	require.Equal(t, "schema2", targetSchema)
	require.Equal(t, "tbl", targetTable)

	// schema2.tbl is shielded instead of being routed to itself.
	targetSchema, targetTable, err = sourceRouter.Route("schema2", "tbl")
	require.NoError(t, err)
	require.Equal(t, ShieldDBName, targetSchema)
	require.Equal(t, ShieldTableName, targetTable)

	// schema_test.tbl gets the default self-routing rule.
	targetSchema, targetTable, err = sourceRouter.Route("schema_test", "tbl")
	require.NoError(t, err)
	require.Equal(t, "schema_test", targetSchema)
	require.Equal(t, "tbl", targetTable)

	// Exactly one table rule per schema: the user rule, the shield rule and
	// the default rule respectively.
	_, tableRules := sourceRouter.AllRules()
	require.Equal(t, 1, len(tableRules["schema1"]))
	require.Equal(t, 1, len(tableRules["schema2"]))
	require.Equal(t, 1, len(tableRules["schema_test"]))
}

func TestInitTables(t *testing.T) {
ctx := context.Background()
cfg := config.NewConfig()
Expand Down
3 changes: 2 additions & 1 deletion sync_diff_inspector/source/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ func getSourceTableMap(ctx context.Context, tableDiffs []*common.TableDiff, ds *
}
if _, ok := targetUniqueTableMap[uniqueId]; ok {
if _, ok := sourceTableMap[uniqueId]; ok {
log.Fatal("TiDB source don't merge multiple tables into one table")
log.Error("TiDB source don't support compare multiple source tables with one downstream table," +
" if this happening when diff on same instance is fine. otherwise we are not guarantee this diff result is right")
}
sourceTableMap[uniqueId] = &common.TableSource{
OriginSchema: schema,
Expand Down
18 changes: 18 additions & 0 deletions tests/_utils/check_contains_count
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/sh

# Assert that a file contains a fixed string on an exact number of lines.
#
# argument 1 is the string to grep for (matched literally via grep -F)
# argument 2 is the filename
# argument 3 is the expected number of matching lines
#
# On a mismatch the file is printed and the script exits with status 1.

set -eu

# A non-numeric count makes the `[ ... -ne ... ]` test below fail with an
# error, which `if` treats as false, so the check would silently pass.
# Reject such input up front.
case "$3" in
    ''|*[!0-9]*)
        echo "TEST FAILED: expected match count '$3' is not a non-negative integer"
        exit 1
        ;;
esac

# -e keeps a pattern that starts with '-' from being parsed as a grep option.
# The pipeline's status is that of wc, so zero matches does not trip `set -e`.
count=$(grep -F -e "$1" "$2" | wc -l)

if [ "$count" -ne "$3" ]; then
    echo "TEST FAILED: '$2' DOES NOT CONTAIN '$1' $3 times"
    echo "____________________________________"
    cat "$2"
    echo "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
    exit 1
fi
55 changes: 55 additions & 0 deletions tests/sync_diff_inspector/shard/config_router_1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Diff Configuration.

######################### Global config #########################


# how many goroutines are created to check data
check-thread-count = 4

# set to false to compare data by checksum only; rows are not selected when the checksums differ.
# set to true to compare all differing rows; this slows down the total comparison time.
export-fix-sql = true

# set to true to check only the table structure and skip checking the table's data
check-struct-only = false


######################### Databases config #########################
[data-sources.mysql1]
host = "127.0.0.1"#MYSQL_HOST
port = 3306#MYSQL_PORT
user = "root"
password = ""

route-rules = ["rule1"]
# uncomment to use TiDB's snapshot data
# snapshot = "2016-10-08 16:45:26"

[data-sources.tidb]
host = "127.0.0.1"#MYSQL_HOST
port = 3306#MYSQL_HOST
user = "root"
password = ""
# uncomment to use TiDB's snapshot data
# snapshot = "2016-10-08 16:45:26"

[routes.rule1]
schema-pattern = "router_test_0"
table-pattern = "tbl"
target-schema = "router_test_1"
target-table = "tbl"

######################### Task config #########################
[task]
# 1 fix sql: fix-target-TIDB1.sql
# 2 log: sync-diff.log
# 3 summary: summary.txt
# 4 checkpoint: a dir
output-dir = "/tmp/tidb_tools_test/sync_diff_inspector/output"

source-instances = ["mysql1"]

target-instance = "tidb"

# tables that need to be checked.
target-check-tables = ["router_test_1.tbl"]
53 changes: 53 additions & 0 deletions tests/sync_diff_inspector/shard/config_router_2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Diff Configuration.

######################### Global config #########################


# how many goroutines are created to check data
check-thread-count = 4

# set to false to compare data by checksum only; rows are not selected when the checksums differ.
# set to true to compare all differing rows; this slows down the total comparison time.
export-fix-sql = true

# set to true to check only the table structure and skip checking the table's data
check-struct-only = false


######################### Databases config #########################
[data-sources.mysql1]
host = "127.0.0.1"#MYSQL_HOST
port = 3306#MYSQL_PORT
user = "root"
password = ""

route-rules = ["rule1"]
# uncomment to use TiDB's snapshot data
# snapshot = "2016-10-08 16:45:26"

[data-sources.tidb]
host = "127.0.0.1"#MYSQL_HOST
port = 3306#MYSQL_HOST
user = "root"
password = ""
# uncomment to use TiDB's snapshot data
# snapshot = "2016-10-08 16:45:26"

[routes.rule1]
schema-pattern = "router_test_0"
target-schema = "router_test_1"

######################### Task config #########################
[task]
# 1 fix sql: fix-target-TIDB1.sql
# 2 log: sync-diff.log
# 3 summary: summary.txt
# 4 checkpoint: a dir
output-dir = "/tmp/tidb_tools_test/sync_diff_inspector/output"

source-instances = ["mysql1"]

target-instance = "tidb"

# tables that need to be checked.
target-check-tables = ["router_test_1.tbl"]
Loading

0 comments on commit 748edcb

Please sign in to comment.