From 9ab2794b96d3f775fd5a100de5f163ec165c06f4 Mon Sep 17 00:00:00 2001
From: Chunzhu Li
Date: Mon, 29 May 2023 15:10:40 +0800
Subject: [PATCH] drainer: support load table infos to save memory and add gc
 safepoint update (#1233)

ref pingcap/tidb-binlog#1137
---
 drainer/config.go      |  2 ++
 drainer/safepoint.go   | 49 +++++++++++++++++++++++++++++++++++++++++++++
 drainer/schema.go      |  5 ++---
 drainer/schema_test.go |  6 ------
 drainer/server.go      | 22 +++++++++++++++++++++-
 drainer/util.go        | 56 ++++++++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 130 insertions(+), 10 deletions(-)
 create mode 100644 drainer/safepoint.go

diff --git a/drainer/config.go b/drainer/config.go
index 8ad0a3cb3..e086a3f7a 100644
--- a/drainer/config.go
+++ b/drainer/config.go
@@ -122,6 +122,7 @@ type SyncerConfig struct {
 	EnableCausalityFlag  *bool `toml:"-" json:"enable-detect-flag"`
 	DisableCausalityFile *bool `toml:"disable-detect" json:"disable-detect"`
 	EnableCausalityFile  *bool `toml:"enable-detect" json:"enable-detect"`
+	LoadTableInfos       bool  `toml:"load-table-infos" json:"load-table-infos"`
 
 	// v2 filter rules
 	CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"`
@@ -252,6 +253,7 @@ func NewConfig() *Config {
 	fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
 	fs.BoolVar(cfg.SyncerCfg.DisableCausalityFlag, "disable-detect", false, "DEPRECATED, use enable-detect")
 	fs.BoolVar(cfg.SyncerCfg.EnableCausalityFlag, "enable-detect", true, "enable detect causality")
+	fs.BoolVar(&cfg.SyncerCfg.LoadTableInfos, "load-table-infos", false, "load table infos from a snapshot at the checkpoint ts instead of replaying history DDL jobs, to save memory")
 	fs.IntVar(&maxBinlogItemCount, "cache-binlog-count", defaultBinlogItemCount, "blurry count of binlogs in cache, limit cache size")
 	fs.IntVar(&cfg.SyncedCheckTime, "synced-check-time", defaultSyncedCheckTime, "if we can't detect new binlog after many minute, we think the all binlog is all synced")
 	fs.StringVar(new(string), "log-rotate", "", "DEPRECATED")
diff --git a/drainer/safepoint.go b/drainer/safepoint.go
new file mode 100644
index 000000000..be7f1b682
--- /dev/null
+++ b/drainer/safepoint.go
@@ -0,0 +1,49 @@
+package drainer
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/pingcap/log"
+	"github.com/pingcap/tidb-binlog/drainer/checkpoint"
+	pd "github.com/tikv/pd/client"
+	"go.uber.org/zap"
+)
+
+const (
+	drainerServiceSafePointPrefix = "drainer"
+	defaultDrainerGCSafePointTTL  = 5 * 60 // seconds
+)
+
+func updateServiceSafePoint(ctx context.Context, pdClient pd.Client, cpt checkpoint.CheckPoint, ttl int64) {
+	updateInterval := time.Duration(ttl/2) * time.Second
+	tick := time.NewTicker(updateInterval)
+	defer tick.Stop()
+	drainerServiceSafePointID := fmt.Sprintf("%s_%d", drainerServiceSafePointPrefix, time.Now().UnixNano())
+	log.Info("generate drainer gc safePoint id", zap.String("id", drainerServiceSafePointID))
+
+	for {
+		snapshotTS := uint64(cpt.TS())
+		log.Debug("update PD safePoint limit with ttl",
+			zap.Uint64("safePoint", snapshotTS),
+			zap.Int64("ttl", ttl))
+		for retryCnt := 0; retryCnt <= 10; retryCnt++ {
+			_, err := pdClient.UpdateServiceGCSafePoint(ctx, drainerServiceSafePointID, ttl, snapshotTS)
+			if err == nil {
+				break
+			}
+			log.Debug("update PD safePoint failed", zap.Error(err), zap.Int("retryTime", retryCnt))
+			select {
+			case <-ctx.Done():
+				return
+			case <-time.After(time.Second):
+			}
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case <-tick.C:
+		}
+	}
+}
diff --git a/drainer/schema.go b/drainer/schema.go
index 839739c3a..d278905f4 100644
--- a/drainer/schema.go
+++ b/drainer/schema.go
@@ -364,9 +364,8 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
 
 	log.Debug("Handle job", zap.Stringer("job", job))
 
-	sql = job.Query
-	if sql == "" {
-		return "", "", "", errors.Errorf("[ddl job sql miss]%+v", job)
+	if job.Query == "" {
+		log.Warn("job query is empty", zap.Stringer("job", job))
 	}
 
 	switch job.Type {
diff --git a/drainer/schema_test.go b/drainer/schema_test.go
index 56a4bc040..4a5e724a5 100644
--- a/drainer/schema_test.go
+++ b/drainer/schema_test.go
@@ -286,12 +286,6 @@ func (t *schemaSuite) TestHandleDDL(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(sql, Equals, "")
 
-	// check job.Query is empty
-	job = &model.Job{ID: 1, State: model.JobStateDone}
-	_, _, sql, err = schema.handleDDL(job)
-	c.Assert(sql, Equals, "")
-	c.Assert(err, NotNil, Commentf("should return not found job.Query"))
-
 	// db info
 	dbInfo := &model.DBInfo{
 		ID:   2,
diff --git a/drainer/server.go b/drainer/server.go
index 36233d291..fc73f5b95 100644
--- a/drainer/server.go
+++ b/drainer/server.go
@@ -27,6 +27,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/store"
 	"github.com/pingcap/tidb/store/driver"
 	"github.com/pingcap/tipb/go-binlog"
@@ -200,7 +201,12 @@ func createSyncer(etcdURLs string, cp checkpoint.CheckPoint, cfg *SyncerConfig)
 	}
 	defer tiStore.Close()
 
-	jobs, err := loadHistoryDDLJobs(tiStore)
+	var jobs []*model.Job
+	if cfg.LoadTableInfos {
+		jobs, err = loadTableInfos(tiStore, cp.TS())
+	} else {
+		jobs, err = loadHistoryDDLJobs(tiStore)
+	}
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -275,6 +281,20 @@ func (s *Server) Start() error {
 		}
 	})
 
+	if s.cfg.SyncerCfg != nil && s.cfg.SyncerCfg.LoadTableInfos {
+		s.tg.GoNoPanic("gc_safepoint", func() {
+			defer func() { go s.Close() }()
+			pdCli, err := getPdClient(s.cfg.EtcdURLs, s.cfg.Security)
+			if err != nil {
+				log.Error("fail to create pdCli", zap.Error(err))
+				errCh <- err
+				return
+			}
+			updateServiceSafePoint(s.ctx, pdCli, s.cp, defaultDrainerGCSafePointTTL)
+			pdCli.Close()
+		})
+	}
+
 	s.tg.GoNoPanic("collect", func() {
 		defer func() { go s.Close() }()
 		s.collector.Start(s.ctx)
diff --git a/drainer/util.go b/drainer/util.go
index ec9d85a61..b9d2e4bb8 100644
--- a/drainer/util.go
+++ b/drainer/util.go
@@ -195,6 +195,34 @@ func loadHistoryDDLJobs(tiStore kv.Storage) ([]*model.Job, error) {
 	return jobs, nil
 }
 
+// loadTableInfos loads all db and table infos from the snapshot at startTs, wrapped as mocked create jobs
+func loadTableInfos(tiStore kv.Storage, startTs int64) ([]*model.Job, error) {
+	meta := getSnapshotMetaFromTs(tiStore, startTs)
+	dbinfos, err := meta.ListDatabases()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	jobs := make([]*model.Job, 0, len(dbinfos))
+	version := int64(1)
+	for _, dbinfo := range dbinfos {
+		log.L().Info("load db info", zap.Stringer("db", dbinfo.Name), zap.Int64("version", version))
+		jobs = append(jobs, mockCreateSchemaJob(dbinfo, version))
+		version++
+	}
+	for _, dbinfo := range dbinfos {
+		tableInfos, err := meta.ListTables(dbinfo.ID)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		for _, tableInfo := range tableInfos {
+			log.L().Debug("load table info", zap.Stringer("db", dbinfo.Name), zap.Stringer("table", tableInfo.Name), zap.Int64("version", version))
+			jobs = append(jobs, mockCreateTableJob(tableInfo, dbinfo.ID, version))
+			version++
+		}
+	}
+	return jobs, nil
+}
+
 func getSnapshotMeta(tiStore kv.Storage) (*meta.Meta, error) {
 	version, err := tiStore.CurrentVersion(oracle.GlobalTxnScope)
 	if err != nil {
@@ -204,6 +232,11 @@ func getSnapshotMeta(tiStore kv.Storage) (*meta.Meta, error) {
 	return meta.NewSnapshotMeta(snapshot), nil
 }
 
+func getSnapshotMetaFromTs(tiStore kv.Storage, ts int64) *meta.Meta {
+	snapshot := tiStore.GetSnapshot(kv.NewVersion(uint64(ts)))
+	return meta.NewSnapshotMeta(snapshot)
+}
+
 func genDrainerID(listenAddr string) (string, error) {
 	urllis, err := url.Parse(listenAddr)
 	if err != nil {
@@ -335,3 +368,26 @@ func combineFilterRules(filterRules []*bf.BinlogEventRule) []*bf.BinlogEventRule
 	}
 	return rules
 }
+
+func mockCreateSchemaJob(dbInfo *model.DBInfo, schemaVersion int64) *model.Job {
+	return &model.Job{
+		Type:  model.ActionCreateSchema,
+		State: model.JobStateDone,
+		BinlogInfo: &model.HistoryInfo{
+			SchemaVersion: schemaVersion,
+			DBInfo:        dbInfo,
+		},
+	}
+}
+
+func mockCreateTableJob(tableInfo *model.TableInfo, schemaID, schemaVersion int64) *model.Job {
+	return &model.Job{
+		Type:     model.ActionCreateTable,
+		State:    model.JobStateDone,
+		SchemaID: schemaID,
+		BinlogInfo: &model.HistoryInfo{
+			SchemaVersion: schemaVersion,
+			TableInfo:     tableInfo,
+		},
+	}
+}
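
Usage note: with `load-table-infos = true` in drainer's `[syncer]` config section (or the `-load-table-infos` flag), drainer builds its initial schema from a meta snapshot at the checkpoint ts instead of replaying every history DDL job, and the new `gc_safepoint` goroutine keeps that snapshot readable by registering a PD service GC safepoint refreshed every ttl/2 seconds. Below is a minimal, self-contained sketch of that keepalive pattern; the PD address and the zero checkpoint ts are placeholders, not drainer's actual wiring, which lives in `updateServiceSafePoint` above.

```go
package main

import (
	"context"
	"fmt"
	"time"

	pd "github.com/tikv/pd/client"
)

func main() {
	ctx := context.Background()
	// Placeholder PD endpoint; point this at a real PD to try it.
	cli, err := pd.NewClient([]string{"127.0.0.1:2379"}, pd.SecurityOption{})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	const ttl = int64(5 * 60) // seconds, mirroring defaultDrainerGCSafePointTTL
	serviceID := fmt.Sprintf("drainer_%d", time.Now().UnixNano())
	checkpointTS := uint64(0) // placeholder; drainer passes checkpoint.TS() each round

	// Refresh at ttl/2 so the registration cannot lapse while the service is alive.
	tick := time.NewTicker(time.Duration(ttl/2) * time.Second)
	defer tick.Stop()
	for {
		// PD holds GC at the minimum of all live service safepoints;
		// the returned value is that minimum.
		minSafePoint, err := cli.UpdateServiceGCSafePoint(ctx, serviceID, ttl, checkpointTS)
		if err != nil {
			fmt.Println("update service safepoint failed:", err)
		} else {
			fmt.Println("current min service safepoint:", minSafePoint)
		}
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
		}
	}
}
```

Registering under a unique id (`drainer_<nanos>`) lets multiple drainer instances hold independent safepoints; if a drainer exits without unregistering, its registration simply lapses once the TTL expires, so GC is never blocked indefinitely.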