@@ -45,8 +45,9 @@ const (
45
45
const UnifiedTimeZone string = "+0:00"
46
46
47
47
const (
48
- ShieldDBName = "_no__exists__db_"
49
- ShieldTableName = "_no__exists__table_"
48
+ ShieldDBName = "_no__exists__db_"
49
+ ShieldTableName = "_no__exists__table_"
50
+ GetSyncPointQuery = "SELECT primary_ts, secondary_ts FROM tidb_cdc.syncpoint_v1 ORDER BY primary_ts DESC LIMIT 1"
50
51
)
51
52
52
53
type ChecksumInfo struct {
@@ -226,11 +227,43 @@ func buildSourceFromCfg(ctx context.Context, tableDiffs []*common.TableDiff, che
226
227
return NewMySQLSources (ctx , tableDiffs , dbs , checkThreadCount , f )
227
228
}
228
229
230
+ func getAutoSnapshotPosition (dbConfig * dbutil.DBConfig , vars map [string ]string ) (string , string , error ) {
231
+ tmpConn , err := dbutil .OpenDB (* dbConfig , vars )
232
+ if err != nil {
233
+ return "" , "" , errors .Annotatef (err , "connecting to auto-position tidb_snapshot failed" )
234
+ }
235
+ defer tmpConn .Close ()
236
+ var primaryTs , secondaryTs string
237
+ err = tmpConn .QueryRow (GetSyncPointQuery ).Scan (& primaryTs , & secondaryTs )
238
+ if err != nil {
239
+ return "" , "" , errors .Annotatef (err , "fetching auto-position tidb_snapshot failed" )
240
+ }
241
+ return primaryTs , secondaryTs , nil
242
+ }
243
+
229
244
func initDBConn (ctx context.Context , cfg * config.Config ) error {
230
245
// Unified time zone
231
246
vars := map [string ]string {
232
247
"time_zone" : UnifiedTimeZone ,
233
248
}
249
+ // Fill in tidb_snapshot if it is set to AUTO
250
+ // This is only supported when set to auto on both target/source.
251
+ if cfg .Task .TargetInstance .IsAutoSnapshot () {
252
+ if len (cfg .Task .SourceInstances ) > 1 {
253
+ return errors .Errorf ("'auto' snapshot only supports one tidb source" )
254
+ }
255
+ if ! cfg .Task .SourceInstances [0 ].IsAutoSnapshot () {
256
+ return errors .Errorf ("'auto' snapshot should be set on both target and source" )
257
+ }
258
+ tmpDBConfig := cfg .Task .TargetInstance .ToDBConfig ()
259
+ tmpDBConfig .Snapshot = ""
260
+ primaryTs , secondaryTs , err := getAutoSnapshotPosition (tmpDBConfig , vars )
261
+ if err != nil {
262
+ return err
263
+ }
264
+ cfg .Task .TargetInstance .SetSnapshot (secondaryTs )
265
+ cfg .Task .SourceInstances [0 ].SetSnapshot (primaryTs )
266
+ }
234
267
// we had 3 producers and `cfg.CheckThreadCount` consumer to use db connections.
235
268
// so the connection count need to be cfg.CheckThreadCount + 3.
236
269
targetConn , err := common .CreateDB (ctx , cfg .Task .TargetInstance .ToDBConfig (), vars , cfg .CheckThreadCount + 3 )
@@ -241,6 +274,11 @@ func initDBConn(ctx context.Context, cfg *config.Config) error {
241
274
cfg .Task .TargetInstance .Conn = targetConn
242
275
243
276
for _ , source := range cfg .Task .SourceInstances {
277
+ // If it is still set to AUTO it means it was not set on the target.
278
+ // We require it to be set to AUTO on both.
279
+ if source .IsAutoSnapshot () {
280
+ return errors .Errorf ("'auto' snapshot should be set on both target and source" )
281
+ }
244
282
// connect source db with target db time_zone
245
283
conn , err := common .CreateDB (ctx , source .ToDBConfig (), vars , cfg .CheckThreadCount + 1 )
246
284
if err != nil {
0 commit comments