Skip to content

Commit 1f111aa

Browse files
authored
sync_diff_inspector: support check between clusters with different time_zone setting (#436)
1 parent beed8dd commit 1f111aa

File tree

8 files changed

+164
-15
lines changed

8 files changed

+164
-15
lines changed

pkg/dbutil/common.go

+48-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"database/sql"
1919
"encoding/json"
2020
"fmt"
21+
"net/url"
2122
"os"
2223
"strconv"
2324
"strings"
@@ -109,7 +110,7 @@ func GetDBConfigFromEnv(schema string) DBConfig {
109110
}
110111

111112
// OpenDB opens a mysql connection FD
112-
func OpenDB(cfg DBConfig) (*sql.DB, error) {
113+
func OpenDB(cfg DBConfig, vars map[string]string) (*sql.DB, error) {
113114
var dbDSN string
114115
if len(cfg.Snapshot) != 0 {
115116
log.Info("create connection with snapshot", zap.String("snapshot", cfg.Snapshot))
@@ -118,6 +119,11 @@ func OpenDB(cfg DBConfig) (*sql.DB, error) {
118119
dbDSN = fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4", cfg.User, cfg.Password, cfg.Host, cfg.Port)
119120
}
120121

122+
for key, val := range vars {
123+
// key='val'. add single quote for better compatibility.
124+
dbDSN += fmt.Sprintf("&%s=%%27%s%%27", key, url.QueryEscape(val))
125+
}
126+
121127
dbConn, err := sql.Open("mysql", dbDSN)
122128
if err != nil {
123129
return nil, errors.Trace(err)
@@ -286,6 +292,47 @@ func GetMinMaxValue(ctx context.Context, db *sql.DB, schema, table, column strin
286292
return min.String, max.String, errors.Trace(rows.Err())
287293
}
288294

295+
func GetTimeZoneOffset(ctx context.Context, db *sql.DB) (time.Duration, error) {
296+
var timeStr string
297+
err := db.QueryRowContext(ctx, "SELECT cast(TIMEDIFF(NOW(6), UTC_TIMESTAMP(6)) as time);").Scan(&timeStr)
298+
if err != nil {
299+
return 0, errors.Trace(err)
300+
}
301+
factor := time.Duration(1)
302+
if timeStr[0] == '-' || timeStr[0] == '+' {
303+
if timeStr[0] == '-' {
304+
factor *= -1
305+
}
306+
timeStr = timeStr[1:]
307+
}
308+
t, err := time.Parse("15:04:05", timeStr)
309+
if err != nil {
310+
return 0, errors.Trace(err)
311+
}
312+
313+
if t.IsZero() {
314+
return 0, nil
315+
}
316+
317+
hour, minute, second := t.Clock()
318+
319+
d := time.Duration(hour*3600+minute*60+second) * time.Second * factor
320+
return d, nil
321+
}
322+
323+
func FormatTimeZoneOffset(offset time.Duration) string {
324+
prefix := "+"
325+
if offset < 0 {
326+
prefix = "-"
327+
offset *= -1
328+
}
329+
hours := offset / time.Hour
330+
minutes := (offset % time.Hour) / time.Minute
331+
332+
return fmt.Sprintf("%s%02d:%02d", prefix, hours, minutes)
333+
334+
}
335+
289336
func queryTables(ctx context.Context, db *sql.DB, q string) (tables []string, err error) {
290337
log.Debug("query tables", zap.String("query", q))
291338
rows, err := db.QueryContext(ctx, q)

pkg/dbutil/common_test.go

+16
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package dbutil
1515

1616
import (
1717
"context"
18+
"time"
1819

1920
sqlmock "github.com/DATA-DOG/go-sqlmock"
2021
"github.com/go-sql-driver/mysql"
@@ -227,3 +228,18 @@ func (s *testDBSuite) TestAnalyzeValuesFromBuckets(c *C) {
227228
c.Assert(val[0], Equals, ca.expect)
228229
}
229230
}
231+
232+
func (s *testDBSuite) TestFormatTimeZoneOffset(c *C) {
233+
cases := map[string]time.Duration{
234+
"+00:00": 0,
235+
"+01:00": time.Hour,
236+
"-08:03": -1 * (8*time.Hour + 3*time.Minute),
237+
"-12:59": -1 * (12*time.Hour + 59*time.Minute),
238+
"+12:59": 12*time.Hour + 59*time.Minute,
239+
}
240+
241+
for k, v := range cases {
242+
offset := FormatTimeZoneOffset(v)
243+
c.Assert(k, Equals, offset)
244+
}
245+
}

pkg/ddl-checker/ddl_syncer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type DDLSyncer struct {
2929

3030
// NewDDLSyncer create a new DDLSyncer
3131
func NewDDLSyncer(cfg *dbutil.DBConfig, executableChecker *ExecutableChecker) (*DDLSyncer, error) {
32-
db, err := dbutil.OpenDB(*cfg)
32+
db, err := dbutil.OpenDB(*cfg, nil)
3333
if err != nil {
3434
return nil, errors.Trace(err)
3535
}

pkg/diff/conn.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ import (
2222
)
2323

2424
// CreateDB creates sql.DB used for select data
25-
func CreateDB(ctx context.Context, dbConfig dbutil.DBConfig, num int) (db *sql.DB, err error) {
26-
db, err = dbutil.OpenDB(dbConfig)
25+
func CreateDB(ctx context.Context, dbConfig dbutil.DBConfig, vars map[string]string, num int) (db *sql.DB, err error) {
26+
db, err = dbutil.OpenDB(dbConfig, vars)
2727
if err != nil {
2828
return nil, errors.Errorf("create db connections %s error %v", dbConfig.String(), err)
2929
}
@@ -40,7 +40,7 @@ func CreateDB(ctx context.Context, dbConfig dbutil.DBConfig, num int) (db *sql.D
4040
func CreateDBForCP(ctx context.Context, dbConfig dbutil.DBConfig) (cpDB *sql.DB, err error) {
4141
// set snapshot to empty, this DB used for write checkpoint data
4242
dbConfig.Snapshot = ""
43-
cpDB, err = dbutil.OpenDB(dbConfig)
43+
cpDB, err = dbutil.OpenDB(dbConfig, nil)
4444
if err != nil {
4545
return nil, errors.Errorf("create db connections %+v error %v", dbConfig, err)
4646
}

pkg/diff/diff_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ func createTableDiff(conn *sql.DB, schema string, sourceTableNames []string, tar
289289
}
290290

291291
func createConn() (*sql.DB, error) {
292-
return dbutil.OpenDB(dbutil.GetDBConfigFromEnv(""))
292+
return dbutil.OpenDB(dbutil.GetDBConfigFromEnv(""), nil)
293293
}
294294

295295
func generateData(ctx context.Context, db *sql.DB, dbCfg dbutil.DBConfig, sourceTables []string, targetTable string, hasEmptyTable bool) error {

sync_diff_inspector/diff.go

+18-9
Original file line numberDiff line numberDiff line change
@@ -122,21 +122,30 @@ func (df *Diff) init(cfg *Config) (err error) {
122122

123123
// CreateDBConn creates db connections for source and target.
124124
func (df *Diff) CreateDBConn(cfg *Config) (err error) {
125-
for _, source := range cfg.SourceDBCfg {
126-
source.Conn, err = diff.CreateDB(df.ctx, source.DBConfig, cfg.CheckThreadCount)
127-
if err != nil {
128-
return errors.Errorf("create source db %s error %v", source.DBConfig.String(), err)
129-
}
130-
df.sourceDBs[source.InstanceID] = source
131-
}
132-
133125
// create connection for target.
134-
cfg.TargetDBCfg.Conn, err = diff.CreateDB(df.ctx, cfg.TargetDBCfg.DBConfig, cfg.CheckThreadCount)
126+
cfg.TargetDBCfg.Conn, err = diff.CreateDB(df.ctx, cfg.TargetDBCfg.DBConfig, nil, cfg.CheckThreadCount)
135127
if err != nil {
136128
return errors.Errorf("create target db %s error %v", cfg.TargetDBCfg.DBConfig.String(), err)
137129
}
138130
df.targetDB = cfg.TargetDBCfg
139131

132+
targetTZOffset, err := dbutil.GetTimeZoneOffset(df.ctx, cfg.TargetDBCfg.Conn)
133+
if err != nil {
134+
return errors.Annotatef(err, "fetch target db %s time zone offset failed", cfg.TargetDBCfg.DBConfig.String())
135+
}
136+
vars := map[string]string{
137+
"time_zone": dbutil.FormatTimeZoneOffset(targetTZOffset),
138+
}
139+
140+
for _, source := range cfg.SourceDBCfg {
141+
// connect source db with target db time_zone
142+
source.Conn, err = diff.CreateDB(df.ctx, source.DBConfig, vars, cfg.CheckThreadCount)
143+
if err != nil {
144+
return errors.Annotatef(err, "create source db %s failed", source.DBConfig.String())
145+
}
146+
df.sourceDBs[source.InstanceID] = source
147+
}
148+
140149
df.cpDB, err = diff.CreateDBForCP(df.ctx, cfg.TargetDBCfg.DBConfig)
141150
if err != nil {
142151
return errors.Errorf("create checkpoint db %s error %v", cfg.TargetDBCfg.DBConfig.String(), err)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# diff Configuration.
2+
3+
log-level = "debug"
4+
5+
# for example, the whole data is [1...100]
6+
# we can split these data to [1...10], [11...20], ..., [91...100]
7+
# the [1...10] is a chunk, and it's chunk size is 10
8+
# size of the split chunk
9+
chunk-size = 1000
10+
11+
# how many goroutines are created to check data
12+
check-thread-count = 4
13+
14+
# sampling check percent, for example 10 means only check 10% data
15+
sample-percent = 100
16+
17+
# calculate the data's checksum, and compare data by checksum.
18+
# set false if want to comapre the data directly
19+
use-checksum = true
20+
21+
# set true will continue check from the latest checkpoint
22+
use-checkpoint = false
23+
24+
# the name of the file which saves sqls used to fix different data.
25+
fix-sql-file = "/tmp/tidb_tools_test/sync_diff_inspector/fix.sql"
26+
27+
# tables need to check.
28+
[[check-tables]]
29+
# schema name in target database.
30+
schema = "tz_test"
31+
32+
# table list which need check in target database.
33+
tables = ["diff"]
34+
35+
[[source-db]]
36+
host = "127.0.0.1"
37+
port = 4001
38+
user = "root"
39+
password = ""
40+
instance-id = "source-1"
41+
42+
[target-db]
43+
host = "127.0.0.1"
44+
port = 4000
45+
user = "root"
46+
password = ""
47+
instance-id = "target-1"
+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#!/bin/sh
2+
3+
set -e
4+
5+
cd "$(dirname "$0")"
6+
OUT_DIR=/tmp/tidb_tools_test/sync_diff_inspector
7+
8+
mysql -uroot -h 127.0.0.1 -P 4000 -e "create database if not exists tz_test"
9+
mysql -uroot -h 127.0.0.1 -P 4000 -e "create table tz_test.diff(id int, dt datetime, ts timestamp);"
10+
mysql -uroot -h 127.0.0.1 -P 4000 -e "insert into table tz_test.diff values (1, now(), now());"
11+
12+
echo "dump data and then load to tidb"
13+
rm -rf $OUT_DIR/dump_tz_diff
14+
mydumper --host 127.0.0.1 --port 4000 --user root --outputdir $OUT_DIR/dump_tz_diff -B tz_test -T diff
15+
loader -h 127.0.0.1 -P 4001 -u root -d $OUT_DIR/dump_tz_diff
16+
17+
echo "check with the same time_zone, check result should be pass"
18+
sync_diff_inspector --config=./config.toml > $OUT_DIR/time_zone_diff.log
19+
check_contains "check pass!!!" $OUT_DIR/time_zone_diff.log
20+
21+
# check upstream and downstream time_zone
22+
mysql -uroot -h 127.0.0.1 -P 4000 -e "SET @@global.time_zone = '+08:00'";
23+
mysql -uroot -h 127.0.0.1 -P 4001 -e "SET @@global.time_zone = '+00:00'";
24+
echo "check with different time_zone, check result should be pass again"
25+
sync_diff_inspector --config=./config.toml > $OUT_DIR/time_zone_diff.log
26+
check_contains "check pass!!!" $OUT_DIR/time_zone_diff.log
27+
28+
# reset time_zone
29+
mysql -uroot -h 127.0.0.1 -P 4000 -e "SET @@global.time_zone = 'SYSTEM'";
30+
mysql -uroot -h 127.0.0.1 -P 4001 -e "SET @@global.time_zone = 'SYSTEM'";

0 commit comments

Comments
 (0)