From 70ca906d64c6d23302009366b970b0dc66b6b87e Mon Sep 17 00:00:00 2001 From: shady Date: Tue, 28 Feb 2017 15:35:40 +0800 Subject: [PATCH] drainer: refactor executor & add self-description pb output (#121) *: add output self-description protobuf binlog --- cmd/drainer/drainer.toml | 12 +- drainer/collector.go | 10 +- drainer/config.go | 72 +- drainer/executor/executor.go | 25 + drainer/executor/mysql.go | 30 + drainer/executor/pb.go | 70 ++ drainer/executor/util.go | 93 ++ drainer/fliter.go | 48 +- drainer/pump.go | 3 +- drainer/server.go | 26 +- drainer/{executor.go => syncer.go} | 325 +++--- drainer/{executor_test.go => syncer_test.go} | 30 +- drainer/translator/mysql.go | 192 ++-- drainer/translator/mysql_test.go | 22 +- drainer/translator/pb.go | 270 +++++ .../{translater.go => translator.go} | 19 +- ...{translater_test.go => translator_test.go} | 146 +-- drainer/util.go | 59 +- generate-binlog.sh | 11 + proto/binlog.proto | 50 + proto/binlog/binlog.pb.go | 966 ++++++++++++++++++ 21 files changed, 1894 insertions(+), 585 deletions(-) create mode 100644 drainer/executor/executor.go create mode 100644 drainer/executor/mysql.go create mode 100644 drainer/executor/pb.go create mode 100644 drainer/executor/util.go rename drainer/{executor.go => syncer.go} (54%) rename drainer/{executor_test.go => syncer_test.go} (83%) create mode 100644 drainer/translator/pb.go rename drainer/translator/{translater.go => translator.go} (70%) rename drainer/translator/{translater_test.go => translator_test.go} (70%) create mode 100755 generate-binlog.sh create mode 100644 proto/binlog.proto create mode 100644 proto/binlog/binlog.pb.go diff --git a/cmd/drainer/drainer.toml b/cmd/drainer/drainer.toml index 23c1e6b0a..3b9272478 100644 --- a/cmd/drainer/drainer.toml +++ b/cmd/drainer/drainer.toml @@ -12,8 +12,8 @@ data-dir = "data.drainer" # a comma separated list of PD endpoints pd-urls = "http://127.0.0.1:2379" -# Executor Configuration. -[executor] +# syncer Configuration. +[syncer] # disable sync these schema ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql" @@ -24,8 +24,10 @@ txn-batch = 1 # work count to execute binlogs worker-count = 1 +disable-dispatch = true + # the downstream mysql protocol database -[executor.to] +[syncer.to] host = "127.0.0.1" user = "root" password = "" @@ -35,10 +37,10 @@ port = 3306 ##and we support regex expression , start with '~' declare use regex expression. 
# #replicate-do-db = ["~^b.*","s1"] -#[[executor.replicate-do-table]] +#[[syncer.replicate-do-table]] #db-name ="test" #tbl-name = "log" -#[[executor.replicate-do-table]] +#[[syncer.replicate-do-table]] #db-name ="test" #tbl-name = "~^a.*" diff --git a/drainer/collector.go b/drainer/collector.go index 9cdd2ca71..fa49952d8 100644 --- a/drainer/collector.go +++ b/drainer/collector.go @@ -38,7 +38,7 @@ type Collector struct { tiStore kv.Storage pumps map[string]*Pump bh *binlogHeap - executor *Executor + syncer *Syncer latestTS int64 meta Meta @@ -52,7 +52,7 @@ type Collector struct { } // NewCollector returns an instance of Collector -func NewCollector(cfg *Config, clusterID uint64, w *DepositWindow, e *Executor, m Meta) (*Collector, error) { +func NewCollector(cfg *Config, clusterID uint64, w *DepositWindow, s *Syncer, m Meta) (*Collector, error) { urlv, err := flags.NewURLsValue(cfg.EtcdURLs) if err != nil { return nil, errors.Trace(err) @@ -79,7 +79,7 @@ func NewCollector(cfg *Config, clusterID uint64, w *DepositWindow, e *Executor, pumps: make(map[string]*Pump), bh: newBinlogHeap(maxHeapSize), window: w, - executor: e, + syncer: s, meta: m, tiClient: tiClient, tiStore: tiStore, @@ -272,8 +272,8 @@ func (c *Collector) publishBinlogs(ctx context.Context, minTS, maxTS int64) { item := c.bh.pop() for item != nil { - c.executor.AddToExectorChan(item) - // if binlogOffsets[item.nodeID] == len(bss[item.nodeID]), all binlogs must be pushed into heap, then delete it from bss + c.syncer.Add(item) + // if binlogOffsets[item.nodeID] == len(bss[item.nodeID]), all binlogs must be pushed into heap, delete it from bss if binlogOffsets[item.nodeID] == len(bss[item.nodeID]) { delete(bss, item.nodeID) } else { diff --git a/drainer/config.go b/drainer/config.go index 8d9b75093..bea953c4a 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -3,6 +3,7 @@ package drainer import ( "flag" "fmt" + "math" "net" "net/url" "os" @@ -12,6 +13,7 @@ import ( "github.com/BurntSushi/toml" "github.com/juju/errors" + "github.com/pingcap/tidb-binlog/drainer/executor" "github.com/pingcap/tidb-binlog/pkg/flags" ) @@ -25,36 +27,29 @@ const ( defaultPumpTimeout = 5 * time.Second ) -// DBConfig is the DB configuration. -type DBConfig struct { - Host string `toml:"host" json:"host"` - User string `toml:"user" json:"user"` - Password string `toml:"password" json:"password"` - Port int `toml:"port" json:"port"` -} - -// ExecutorConfig is the Executor's configuration. -type ExecutorConfig struct { - IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"` - TxnBatch int `toml:"txn-batch" json:"txn-batch"` - WorkerCount int `toml:"worker-count" json:"worker-count"` - To DBConfig `toml:"to" json:"to"` - DoTables []TableName `toml:"replicate-do-table" json:"replicate-do-table"` - DoDBs []string `toml:"replicate-do-db" json:"replicate-do-db"` - DestDBType string `toml:"db-type" json:"db-type"` +// SyncerConfig is the Syncer's configuration. 
+type SyncerConfig struct { + IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"` + TxnBatch int `toml:"txn-batch" json:"txn-batch"` + WorkerCount int `toml:"worker-count" json:"worker-count"` + To *executor.DBConfig `toml:"to" json:"to"` + DoTables []TableName `toml:"replicate-do-table" json:"replicate-do-table"` + DoDBs []string `toml:"replicate-do-db" json:"replicate-do-db"` + DestDBType string `toml:"db-type" json:"db-type"` + DisableDispatch bool `toml:"disable-dispatch" json:"disable-dispatch"` } // Config holds the configuration of drainer type Config struct { *flag.FlagSet - LogLevel string `toml:"log-level" json:"log-level"` - ListenAddr string `toml:"addr" json:"addr"` - DataDir string `toml:"data-dir" json:"data-dir"` - DetectInterval int `toml:"detect-interval" json:"detect-interval"` - EtcdURLs string `toml:"pd-urls" json:"pd-urls"` - LogFile string `toml:"log-file" json:"log-file"` - LogRotate string `toml:"log-rotate" json:"log-rotate"` - ExecutorCfg *ExecutorConfig `toml:"executor" json:"executor"` + LogLevel string `toml:"log-level" json:"log-level"` + ListenAddr string `toml:"addr" json:"addr"` + DataDir string `toml:"data-dir" json:"data-dir"` + DetectInterval int `toml:"detect-interval" json:"detect-interval"` + EtcdURLs string `toml:"pd-urls" json:"pd-urls"` + LogFile string `toml:"log-file" json:"log-file"` + LogRotate string `toml:"log-rotate" json:"log-rotate"` + SyncerCfg *SyncerConfig `toml:"syncer" json:"sycner"` EtcdTimeout time.Duration PumpTimeout time.Duration MetricsAddr string @@ -68,7 +63,7 @@ func NewConfig() *Config { cfg := &Config{ EtcdTimeout: defaultEtcdTimeout, PumpTimeout: defaultPumpTimeout, - ExecutorCfg: new(ExecutorConfig), + SyncerCfg: new(SyncerConfig), } cfg.FlagSet = flag.NewFlagSet("drainer", flag.ContinueOnError) fs := cfg.FlagSet @@ -87,14 +82,11 @@ func NewConfig() *Config { fs.IntVar(&cfg.MetricsInterval, "metrics-interval", 15, "prometheus client push interval in second, set \"0\" to disable prometheus push") fs.StringVar(&cfg.LogFile, "log-file", "", "log file path") fs.StringVar(&cfg.LogRotate, "log-rotate", "", "log file rotate type, hour/day") - fs.IntVar(&cfg.ExecutorCfg.TxnBatch, "txn-batch", 1, "number of binlog events in a transaction batch") - fs.StringVar(&cfg.ExecutorCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync the meta schema") - fs.IntVar(&cfg.ExecutorCfg.WorkerCount, "c", 1, "parallel worker count") - fs.StringVar(&cfg.ExecutorCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql, postgresql") - fs.StringVar(&cfg.ExecutorCfg.To.Host, "db-host", "127.0.0.1", "host of target database") - fs.IntVar(&cfg.ExecutorCfg.To.Port, "db-port", 3306, "port of target database") - fs.StringVar(&cfg.ExecutorCfg.To.User, "db-username", "root", "username of target database") - fs.StringVar(&cfg.ExecutorCfg.To.Password, "db-password", "", "password of target database") + fs.IntVar(&cfg.SyncerCfg.TxnBatch, "txn-batch", 1, "number of binlog events in a transaction batch") + fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas") + fs.IntVar(&cfg.SyncerCfg.WorkerCount, "c", 1, "parallel worker count") + fs.StringVar(&cfg.SyncerCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql, postgresql") + fs.BoolVar(&cfg.SyncerCfg.DisableDispatch, "disable-dispatch", false, "disable dispatching sqls that in one same binlog; if set true, work-count and txn-batch would be useless") 
return cfg } @@ -137,11 +129,19 @@ func (cfg *Config) Parse(args []string) error { cfg.ListenAddr = "http://" + cfg.ListenAddr // add 'http:' scheme to facilitate parsing adjustString(&cfg.DataDir, defaultDataDir) adjustInt(&cfg.DetectInterval, defaultDetectInterval) - cfg.ExecutorCfg.adjustDoDBAndTable() + cfg.SyncerCfg.adjustWorkCount() + cfg.SyncerCfg.adjustDoDBAndTable() return cfg.validate() } -func (c *ExecutorConfig) adjustDoDBAndTable() { +func (c *SyncerConfig) adjustWorkCount() { + if c.DisableDispatch { + c.WorkerCount = 1 + c.TxnBatch = math.MaxInt32 + } +} + +func (c *SyncerConfig) adjustDoDBAndTable() { for i := 0; i < len(c.DoTables); i++ { c.DoTables[i].Table = strings.ToLower(c.DoTables[i].Table) c.DoTables[i].Schema = strings.ToLower(c.DoTables[i].Schema) diff --git a/drainer/executor/executor.go b/drainer/executor/executor.go new file mode 100644 index 000000000..e42177060 --- /dev/null +++ b/drainer/executor/executor.go @@ -0,0 +1,25 @@ +package executor + +import ( + "github.com/juju/errors" +) + +// Executor is the interface for execute TiDB binlog's sql +type Executor interface { + // Execute executes TiDB binlogs + Execute([]string, [][]interface{}, []int64, bool) error + // Close closes the executor + Close() error +} + +// New returns the an Executor instance by given name +func New(name string, cfg *DBConfig) (Executor, error) { + switch name { + case "mysql": + return newMysql(cfg) + case "pb": + return newPB(cfg) + default: + return nil, errors.Errorf("unsupport executor type %s", name) + } +} diff --git a/drainer/executor/mysql.go b/drainer/executor/mysql.go new file mode 100644 index 000000000..e64f9fbb0 --- /dev/null +++ b/drainer/executor/mysql.go @@ -0,0 +1,30 @@ +package executor + +import ( + "database/sql" + + "github.com/juju/errors" +) + +type mysqlExecutor struct { + db *sql.DB +} + +func newMysql(cfg *DBConfig) (Executor, error) { + db, err := openDB("mysql", cfg.Host, cfg.Port, cfg.User, cfg.Password) + if err != nil { + return nil, errors.Trace(err) + } + + return &mysqlExecutor{ + db: db, + }, nil +} + +func (m *mysqlExecutor) Execute(sqls []string, args [][]interface{}, commitTSs []int64, isDDL bool) error { + return executeSQLs(m.db, sqls, args, isDDL) +} + +func (m *mysqlExecutor) Close() error { + return m.db.Close() +} diff --git a/drainer/executor/pb.go b/drainer/executor/pb.go new file mode 100644 index 000000000..ce23e5aa9 --- /dev/null +++ b/drainer/executor/pb.go @@ -0,0 +1,70 @@ +package executor + +import ( + "github.com/juju/errors" + "github.com/pingcap/tidb-binlog/pkg/file" + pb "github.com/pingcap/tidb-binlog/proto/binlog" + "github.com/pingcap/tidb-binlog/pump" +) + +type pbExecutor struct { + dir string + binlogger pump.Binlogger +} + +func newPB(cfg *DBConfig) (Executor, error) { + var ( + binlogger pump.Binlogger + err error + ) + dirPath := cfg.BinlogFileDir + names, err := file.ReadDir(dirPath) + if err != nil { + return nil, errors.Trace(err) + } + + if len(names) > 0 { + binlogger, err = pump.OpenBinlogger(dirPath) + } else { + binlogger, err = pump.CreateBinlogger(dirPath) + } + if err != nil { + return nil, errors.Trace(err) + } + + return &pbExecutor{ + dir: cfg.BinlogFileDir, + binlogger: binlogger, + }, nil +} + +func (p *pbExecutor) Execute(sqls []string, args [][]interface{}, commitTSs []int64, isDDL bool) error { + binlog := &pb.Binlog{CommitTs: commitTSs[0]} + if isDDL { + binlog.Tp = pb.BinlogType_DDL + binlog.DdlQuery = []byte(sqls[0]) + return p.saveBinlog(binlog) + } + + binlog.Tp = pb.BinlogType_DML + 
binlog.DmlData = new(pb.DMLData) + for i := range sqls { + // event can be only pb.Event, otherwise need to panic + event := args[i][0].(*pb.Event) + binlog.DmlData.Events = append(binlog.DmlData.Events, *event) + } + return p.saveBinlog(binlog) +} + +func (p *pbExecutor) Close() error { + return p.binlogger.Close() +} + +func (p *pbExecutor) saveBinlog(binlog *pb.Binlog) error { + data, err := binlog.Marshal() + if err != nil { + return errors.Trace(err) + } + + return p.binlogger.WriteTail(data) +} diff --git a/drainer/executor/util.go b/drainer/executor/util.go new file mode 100644 index 000000000..98d216505 --- /dev/null +++ b/drainer/executor/util.go @@ -0,0 +1,93 @@ +package executor + +import ( + "database/sql" + "fmt" + "time" + + // mysql driver + _ "github.com/go-sql-driver/mysql" + "github.com/juju/errors" + "github.com/ngaut/log" +) + +var ( + maxDMLRetryCount = 100 + maxDDLRetryCount = 5 + retryWaitTime = 3 * time.Second +) + +// DBConfig is the DB configuration. +type DBConfig struct { + Host string `toml:"host" json:"host"` + User string `toml:"user" json:"user"` + Password string `toml:"password" json:"password"` + Port int `toml:"port" json:"port"` + BinlogFileDir string `toml:"dir" json:"dir"` +} + +func executeSQLs(db *sql.DB, sqls []string, args [][]interface{}, isDDL bool) error { + if len(sqls) == 0 { + return nil + } + + retryCount := maxDMLRetryCount + if isDDL { + retryCount = maxDDLRetryCount + } + + var err error + for i := 0; i < retryCount; i++ { + if i > 0 { + log.Warnf("exec sql retry %d - %v - %v", i, sqls, args) + time.Sleep(retryWaitTime) + } + + err = appleTxn(db, sqls, args) + if err == nil { + return nil + } + } + + return errors.Trace(err) +} + +func appleTxn(db *sql.DB, sqls []string, args [][]interface{}) error { + txn, err := db.Begin() + if err != nil { + log.Errorf("exec sqls[%v] begin failed %v", sqls, errors.ErrorStack(err)) + return errors.Trace(err) + } + + for i := range sqls { + log.Debugf("[exec][sql]%s[args]%v", sqls[i], args[i]) + + _, err = txn.Exec(sqls[i], args[i]...) 
+ if err != nil { + log.Warnf("[exec][sql]%s[args]%v[error]%v", sqls[i], args[i], err) + rerr := txn.Rollback() + if rerr != nil { + log.Errorf("[exec][sql]%s[args]%v[error]%v", sqls[i], args[i], rerr) + } + return errors.Trace(err) + } + } + + err = txn.Commit() + if err != nil { + log.Errorf("exec sqls[%v] commit failed %v", sqls, errors.ErrorStack(err)) + return errors.Trace(err) + } + + return nil +} + +func openDB(proto string, host string, port int, username string, password string) (*sql.DB, error) { + dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8&multiStatements=true", username, password, host, port) + db, err := sql.Open(proto, dbDSN) + if err != nil { + return nil, errors.Trace(err) + } + + return db, nil +} diff --git a/drainer/fliter.go b/drainer/fliter.go index 37c06691f..0137f335f 100644 --- a/drainer/fliter.go +++ b/drainer/fliter.go @@ -4,90 +4,90 @@ import ( "regexp" ) -func (d *Executor) genRegexMap() { - for _, db := range d.cfg.DoDBs { +func (s *Syncer) genRegexMap() { + for _, db := range s.cfg.DoDBs { if db[0] != '~' { continue } - if _, ok := d.reMap[db]; !ok { - d.reMap[db] = regexp.MustCompile(db[1:]) + if _, ok := s.reMap[db]; !ok { + s.reMap[db] = regexp.MustCompile(db[1:]) } } - for _, tb := range d.cfg.DoTables { + for _, tb := range s.cfg.DoTables { if tb.Table[0] == '~' { - if _, ok := d.reMap[tb.Table]; !ok { - d.reMap[tb.Table] = regexp.MustCompile(tb.Table[1:]) + if _, ok := s.reMap[tb.Table]; !ok { + s.reMap[tb.Table] = regexp.MustCompile(tb.Table[1:]) } } if tb.Schema[0] == '~' { - if _, ok := d.reMap[tb.Schema]; !ok { - d.reMap[tb.Schema] = regexp.MustCompile(tb.Schema[1:]) + if _, ok := s.reMap[tb.Schema]; !ok { + s.reMap[tb.Schema] = regexp.MustCompile(tb.Schema[1:]) } } } } // whiteFilter whitelist filtering -func (d *Executor) whiteFilter(stbs []TableName) []TableName { +func (s *Syncer) whiteFilter(stbs []TableName) []TableName { var tbs []TableName - if len(d.cfg.DoTables) == 0 && len(d.cfg.DoDBs) == 0 { + if len(s.cfg.DoTables) == 0 && len(s.cfg.DoDBs) == 0 { return stbs } for _, tb := range stbs { - if d.matchTable(d.cfg.DoTables, tb) { + if s.matchTable(s.cfg.DoTables, tb) { tbs = append(tbs, tb) } - if d.matchDB(d.cfg.DoDBs, tb.Schema) { + if s.matchDB(s.cfg.DoDBs, tb.Schema) { tbs = append(tbs, tb) } } return tbs } -func (d *Executor) skipDML(schema string, table string) bool { +func (s *Syncer) skipDML(schema string, table string) bool { tbs := []TableName{{Schema: schema, Table: table}} - tbs = d.whiteFilter(tbs) + tbs = s.whiteFilter(tbs) if len(tbs) == 0 { return true } return false } -func (d *Executor) skipDDL(schema, table string) bool { +func (s *Syncer) skipDDL(schema, table string) bool { tbs := []TableName{{Schema: schema, Table: table}} - tbs = d.whiteFilter(tbs) + tbs = s.whiteFilter(tbs) if len(tbs) == 0 { return true } return false } -func (d *Executor) matchString(pattern string, t string) bool { - if re, ok := d.reMap[pattern]; ok { +func (s *Syncer) matchString(pattern string, t string) bool { + if re, ok := s.reMap[pattern]; ok { return re.MatchString(t) } return pattern == t } -func (d *Executor) matchDB(patternDBS []string, a string) bool { +func (s *Syncer) matchDB(patternDBS []string, a string) bool { for _, b := range patternDBS { - if d.matchString(b, a) { + if s.matchString(b, a) { return true } } return false } -func (d *Executor) matchTable(patternTBS []TableName, tb TableName) bool { +func (s *Syncer) matchTable(patternTBS []TableName, tb TableName) bool { for _, ptb := range patternTBS { - if 
d.matchString(ptb.Table, tb.Table) && d.matchString(ptb.Schema, tb.Schema) { + if s.matchString(ptb.Table, tb.Table) && s.matchString(ptb.Schema, tb.Schema) { return true } //create database or drop database if tb.Table == "" { - if d.matchString(tb.Schema, ptb.Schema) { + if s.matchString(tb.Schema, ptb.Schema) { return true } } diff --git a/drainer/pump.go b/drainer/pump.go index e6fdd50d5..4a9f7cedc 100644 --- a/drainer/pump.go +++ b/drainer/pump.go @@ -179,7 +179,8 @@ func (p *Pump) mustFindCommitBinlog(t *tikv.LockResolver, startTS int64) { select { case <-p.ctx.Done(): return - default: + // wait a moment to query tikv + case <-time.After(waitTime): } b, ok := p.getPrewriteBinlogEntity(startTS) diff --git a/drainer/server.go b/drainer/server.go index ad2c148c0..18b79faf5 100644 --- a/drainer/server.go +++ b/drainer/server.go @@ -42,7 +42,7 @@ type Server struct { ctx context.Context cancel context.CancelFunc wg sync.WaitGroup - executor *Executor + syncer *Syncer } func init() { @@ -53,7 +53,7 @@ func init() { // NewServer return a instance of binlog-server func NewServer(cfg *Config) (*Server, error) { - ID, err := genCisternID(cfg.ListenAddr) + ID, err := genDrainerID(cfg.ListenAddr) if err != nil { return nil, errors.Trace(err) } @@ -84,12 +84,12 @@ func NewServer(cfg *Config) (*Server, error) { } ctx, cancel := context.WithCancel(context.Background()) - executor, err := NewExecutor(ctx, meta, cfg.ExecutorCfg) + syncer, err := NewSyncer(ctx, meta, cfg.SyncerCfg) if err != nil { return nil, errors.Trace(err) } - c, err := NewCollector(cfg, clusterID, win, executor, meta) + c, err := NewCollector(cfg, clusterID, win, syncer, meta) if err != nil { return nil, errors.Trace(err) } @@ -112,7 +112,7 @@ func NewServer(cfg *Config) (*Server, error) { gs: grpc.NewServer(), ctx: ctx, cancel: cancel, - executor: executor, + syncer: syncer, }, nil } @@ -166,14 +166,14 @@ func (s *Server) StartMetrics() { }() } -// StartExecutor runs a executor in a goroutine -func (s *Server) StartExecutor(jobs []*model.Job) { +// StartSyncer runs a syncer in a goroutine +func (s *Server) StartSyncer(jobs []*model.Job) { s.wg.Add(1) go func() { defer s.wg.Done() - err := s.executor.Start(jobs) + err := s.syncer.Start(jobs) if err != nil { - log.Errorf("executor exited, error %v", err) + log.Errorf("syncer exited, error %v", err) } s.Close() }() @@ -237,8 +237,8 @@ func (s *Server) Start() error { // collect metrics to prometheus s.StartMetrics() - // start a executor - s.StartExecutor(jobs) + // start a syncer + s.StartSyncer(jobs) // start a TCP listener tcpURL, err := url.Parse(s.tcpAddr) @@ -267,8 +267,8 @@ func (s *Server) Start() error { func (s *Server) Close() { // stop gRPC server s.gs.Stop() - // stop executor - s.executor.Close() + // stop syncer + s.syncer.Close() // notify all goroutines to exit s.cancel() // waiting for goroutines exit diff --git a/drainer/executor.go b/drainer/syncer.go similarity index 54% rename from drainer/executor.go rename to drainer/syncer.go index 6b21b12d6..4b756bdad 100644 --- a/drainer/executor.go +++ b/drainer/syncer.go @@ -1,7 +1,6 @@ package drainer import ( - "database/sql" "regexp" "sync" "time" @@ -10,6 +9,7 @@ import ( "github.com/juju/errors" "github.com/ngaut/log" + "github.com/pingcap/tidb-binlog/drainer/executor" "github.com/pingcap/tidb-binlog/drainer/translator" "github.com/pingcap/tidb/model" pb "github.com/pingcap/tipb/go-binlog" @@ -23,12 +23,12 @@ var ( maxExecutionWaitTime = 3 * time.Second ) -// Executor converts tidb binlog to the specified DB sqls, 
and sync it to target DB -type Executor struct { +// Syncer converts tidb binlog to the specified DB sqls, and sync it to target DB +type Syncer struct { schema *Schema meta Meta - cfg *ExecutorConfig + cfg *SyncerConfig translator translator.SQLTranslator @@ -38,7 +38,7 @@ type Executor struct { jobWg sync.WaitGroup jobCh []chan *job - toDBs []*sql.DB + executors []executor.Executor poss map[string]pb.Pos initCommitTS int64 @@ -51,19 +51,19 @@ type Executor struct { reMap map[string]*regexp.Regexp } -// NewExecutor returns a Drainer instance -func NewExecutor(ctx context.Context, meta Meta, cfg *ExecutorConfig) (*Executor, error) { - executor := new(Executor) - executor.cfg = cfg - executor.ignoreSchemaNames = formatIgnoreSchemas(cfg.IgnoreSchemas) - executor.meta = meta - executor.input = make(chan *binlogItem, 1024*cfg.WorkerCount) - executor.jobCh = newJobChans(cfg.WorkerCount) - executor.reMap = make(map[string]*regexp.Regexp) - executor.ctx, executor.cancel = context.WithCancel(ctx) - executor.initCommitTS, executor.poss = meta.Pos() - - return executor, nil +// NewSyncer returns a Drainer instance +func NewSyncer(ctx context.Context, meta Meta, cfg *SyncerConfig) (*Syncer, error) { + syncer := new(Syncer) + syncer.cfg = cfg + syncer.ignoreSchemaNames = formatIgnoreSchemas(cfg.IgnoreSchemas) + syncer.meta = meta + syncer.input = make(chan *binlogItem, 1024*cfg.WorkerCount) + syncer.jobCh = newJobChans(cfg.WorkerCount) + syncer.reMap = make(map[string]*regexp.Regexp) + syncer.ctx, syncer.cancel = context.WithCancel(ctx) + syncer.initCommitTS, syncer.poss = meta.Pos() + + return syncer, nil } func newJobChans(count int) []chan *job { @@ -76,14 +76,14 @@ func newJobChans(count int) []chan *job { } // Start starts to sync. -func (d *Executor) Start(jobs []*model.Job) error { +func (s *Syncer) Start(jobs []*model.Job) error { // prepare schema for work - b, err := d.prepare(jobs) + b, err := s.prepare(jobs) if err != nil || b == nil { return errors.Trace(err) } - err = d.run(b) + err = s.run(b) if err != nil { return errors.Trace(err) } @@ -93,22 +93,22 @@ func (d *Executor) Start(jobs []*model.Job) error { // the binlog maybe not complete before the initCommitTS, so we should ignore them. // at the same time, we try to find the latest schema version before the initCommitTS to reconstruct local schemas. 
-func (d *Executor) prepare(jobs []*model.Job) (*binlogItem, error) { +func (s *Syncer) prepare(jobs []*model.Job) (*binlogItem, error) { var latestSchemaVersion int64 var b *binlogItem var err error for { select { - case <-d.ctx.Done(): + case <-s.ctx.Done(): return nil, nil - case b = <-d.input: + case b = <-s.input: } binlog := b.binlog commitTS := binlog.GetCommitTs() jobID := binlog.GetDdlJobId() - if commitTS <= d.initCommitTS { + if commitTS <= s.initCommitTS { if jobID > 0 { latestSchemaVersion = b.job.BinlogInfo.SchemaVersion } @@ -141,7 +141,7 @@ func (d *Executor) prepare(jobs []*model.Job) (*binlogItem, error) { } } - d.schema, err = NewSchema(exceptedJobs, d.ignoreSchemaNames) + s.schema, err = NewSchema(exceptedJobs, s.ignoreSchemaNames) if err != nil { return nil, errors.Trace(err) } @@ -155,7 +155,7 @@ func (d *Executor) prepare(jobs []*model.Job) (*binlogItem, error) { // the second value[string]: the table name // the third value[string]: the sql that is corresponding to the job // the fourth value[error]: the handleDDL execution's err -func (d *Executor) handleDDL(job *model.Job) (string, string, string, error) { +func (s *Syncer) handleDDL(job *model.Job) (string, string, string, error) { if job.State == model.JobCancelled { return "", "", "", nil } @@ -169,12 +169,12 @@ func (d *Executor) handleDDL(job *model.Job) (string, string, string, error) { case model.ActionCreateSchema: // get the DBInfo from job rawArgs schema := job.BinlogInfo.DBInfo - if filterIgnoreSchema(schema, d.ignoreSchemaNames) { - d.schema.AddIgnoreSchema(schema) + if filterIgnoreSchema(schema, s.ignoreSchemaNames) { + s.schema.AddIgnoreSchema(schema) return "", "", "", nil } - err := d.schema.CreateSchema(schema) + err := s.schema.CreateSchema(schema) if err != nil { return "", "", "", errors.Trace(err) } @@ -182,13 +182,13 @@ func (d *Executor) handleDDL(job *model.Job) (string, string, string, error) { return schema.Name.L, "", sql, nil case model.ActionDropSchema: - _, ok := d.schema.IgnoreSchemaByID(job.SchemaID) + _, ok := s.schema.IgnoreSchemaByID(job.SchemaID) if ok { - d.schema.DropIgnoreSchema(job.SchemaID) + s.schema.DropIgnoreSchema(job.SchemaID) return "", "", "", nil } - schemaName, err := d.schema.DropSchema(job.SchemaID) + schemaName, err := s.schema.DropSchema(job.SchemaID) if err != nil { return "", "", "", errors.Trace(err) } @@ -197,27 +197,27 @@ func (d *Executor) handleDDL(job *model.Job) (string, string, string, error) { case model.ActionRenameTable: // ignore schema doesn't support reanme ddl - _, ok := d.schema.SchemaByTableID(job.TableID) + _, ok := s.schema.SchemaByTableID(job.TableID) if !ok { return "", "", "", errors.NotFoundf("table(%d) or it's schema", job.TableID) } - _, ok = d.schema.IgnoreSchemaByID(job.SchemaID) + _, ok = s.schema.IgnoreSchemaByID(job.SchemaID) if ok { return "", "", "", errors.Errorf("ignore schema %d doesn't support rename ddl sql %s", job.SchemaID, sql) } // first drop the table - _, err := d.schema.DropTable(job.TableID) + _, err := s.schema.DropTable(job.TableID) if err != nil { return "", "", "", errors.Trace(err) } // create table table := job.BinlogInfo.TableInfo - schema, ok := d.schema.SchemaByID(job.SchemaID) + schema, ok := s.schema.SchemaByID(job.SchemaID) if !ok { return "", "", "", errors.NotFoundf("schema %d", job.SchemaID) } - err = d.schema.CreateTable(schema, table) + err = s.schema.CreateTable(schema, table) if err != nil { return "", "", "", errors.Trace(err) } @@ -230,17 +230,17 @@ func (d *Executor) handleDDL(job *model.Job) 
(string, string, string, error) { return "", "", "", errors.NotFoundf("table %d", job.TableID) } - _, ok := d.schema.IgnoreSchemaByID(job.SchemaID) + _, ok := s.schema.IgnoreSchemaByID(job.SchemaID) if ok { return "", "", "", nil } - schema, ok := d.schema.SchemaByID(job.SchemaID) + schema, ok := s.schema.SchemaByID(job.SchemaID) if !ok { return "", "", "", errors.NotFoundf("schema %d", job.SchemaID) } - err := d.schema.CreateTable(schema, table) + err := s.schema.CreateTable(schema, table) if err != nil { return "", "", "", errors.Trace(err) } @@ -248,17 +248,17 @@ func (d *Executor) handleDDL(job *model.Job) (string, string, string, error) { return schema.Name.L, table.Name.L, sql, nil case model.ActionDropTable: - _, ok := d.schema.IgnoreSchemaByID(job.SchemaID) + _, ok := s.schema.IgnoreSchemaByID(job.SchemaID) if ok { return "", "", "", nil } - schema, ok := d.schema.SchemaByID(job.SchemaID) + schema, ok := s.schema.SchemaByID(job.SchemaID) if !ok { return "", "", "", errors.NotFoundf("schema %d", job.SchemaID) } - tableName, err := d.schema.DropTable(job.TableID) + tableName, err := s.schema.DropTable(job.TableID) if err != nil { return "", "", "", errors.Trace(err) } @@ -266,17 +266,17 @@ func (d *Executor) handleDDL(job *model.Job) (string, string, string, error) { return schema.Name.L, tableName, sql, nil case model.ActionTruncateTable: - _, ok := d.schema.IgnoreSchemaByID(job.SchemaID) + _, ok := s.schema.IgnoreSchemaByID(job.SchemaID) if ok { return "", "", "", nil } - schema, ok := d.schema.SchemaByID(job.SchemaID) + schema, ok := s.schema.SchemaByID(job.SchemaID) if !ok { return "", "", "", errors.NotFoundf("schema %d", job.SchemaID) } - _, err := d.schema.DropTable(job.TableID) + _, err := s.schema.DropTable(job.TableID) if err != nil { return "", "", "", errors.Trace(err) } @@ -286,7 +286,7 @@ func (d *Executor) handleDDL(job *model.Job) (string, string, string, error) { return "", "", "", errors.NotFoundf("table %d", job.TableID) } - err = d.schema.CreateTable(schema, table) + err = s.schema.CreateTable(schema, table) if err != nil { return "", "", "", errors.Trace(err) } @@ -299,17 +299,17 @@ func (d *Executor) handleDDL(job *model.Job) (string, string, string, error) { return "", "", "", errors.NotFoundf("table %d", job.TableID) } - _, ok := d.schema.IgnoreSchemaByID(job.SchemaID) + _, ok := s.schema.IgnoreSchemaByID(job.SchemaID) if ok { return "", "", "", nil } - schema, ok := d.schema.SchemaByID(job.SchemaID) + schema, ok := s.schema.SchemaByID(job.SchemaID) if !ok { return "", "", "", errors.NotFoundf("schema %d", job.SchemaID) } - err := d.schema.ReplaceTable(tbInfo) + err := s.schema.ReplaceTable(tbInfo) if err != nil { return "", "", "", errors.Trace(err) } @@ -318,66 +318,79 @@ func (d *Executor) handleDDL(job *model.Job) (string, string, string, error) { } } -func (d *Executor) addCount(tp translator.OpType, nums int) { +func (s *Syncer) addDMLCount(tp pb.MutationType, nums int) { switch tp { - case translator.Insert: + case pb.MutationType_Insert: eventCounter.WithLabelValues("Insert").Add(float64(nums)) - case translator.Update: + case pb.MutationType_Update: eventCounter.WithLabelValues("Update").Add(float64(nums)) - case translator.Del: + case pb.MutationType_DeleteRow: eventCounter.WithLabelValues("Delete").Add(float64(nums)) - case translator.DDL: - eventCounter.WithLabelValues("DDL").Add(float64(nums)) } } -func (d *Executor) checkWait(job *job) bool { - if job.tp == translator.DDL { +func (s *Syncer) addDDLCount() { + 
eventCounter.WithLabelValues("DDL").Add(1) +} + +func (s *Syncer) checkWait(job *job) bool { + if job.binlogTp == translator.DDL { return true } - if d.meta.Check() { + if (!s.cfg.DisableDispatch && s.meta.Check()) && job.isCompleteBinlog { return true } return false } type job struct { - tp translator.OpType - sql string - args []interface{} - key string - commitTS int64 - pos pb.Pos - nodeID string + binlogTp translator.OpType + mutationTp pb.MutationType + sql string + args []interface{} + key string + commitTS int64 + pos pb.Pos + nodeID string + isCompleteBinlog bool +} + +func newDMLJob(tp pb.MutationType, sql string, args []interface{}, key string, commitTS int64, pos pb.Pos, nodeID string) *job { + return &job{binlogTp: translator.DML, mutationTp: tp, sql: sql, args: args, key: key, commitTS: commitTS, pos: pos, nodeID: nodeID} +} + +func newDDLJob(sql string, args []interface{}, key string, commitTS int64, pos pb.Pos, nodeID string) *job { + return &job{binlogTp: translator.DDL, sql: sql, args: args, key: key, commitTS: commitTS, pos: pos, nodeID: nodeID} } -func newJob(tp translator.OpType, sql string, args []interface{}, key string, commitTS int64, pos pb.Pos, nodeID string) *job { - return &job{tp: tp, sql: sql, args: args, key: key, commitTS: commitTS, pos: pos, nodeID: nodeID} +// binlog bounadary job is used to group jobs, like a barrier +func newBinlogBoundaryJob(commitTS int64, pos pb.Pos, nodeID string) *job { + return &job{binlogTp: translator.DML, commitTS: commitTS, pos: pos, nodeID: nodeID, isCompleteBinlog: true} } -func (d *Executor) addJob(job *job) { +func (s *Syncer) addJob(job *job) { // make all DMLs be executed before DDL - if job.tp == translator.DDL { - d.jobWg.Wait() + if job.binlogTp == translator.DDL { + s.jobWg.Wait() } - d.jobWg.Add(1) - idx := int(genHashKey(job.key)) % d.cfg.WorkerCount - d.jobCh[idx] <- job + s.jobWg.Add(1) + idx := int(genHashKey(job.key)) % s.cfg.WorkerCount + s.jobCh[idx] <- job - if pos, ok := d.poss[job.nodeID]; !ok || pos.Suffix < job.pos.Suffix { - d.poss[job.nodeID] = job.pos + if pos, ok := s.poss[job.nodeID]; !ok || pos.Suffix < job.pos.Suffix { + s.poss[job.nodeID] = job.pos } - wait := d.checkWait(job) + wait := s.checkWait(job) if wait { - d.jobWg.Wait() - d.savePoint(job.commitTS, d.poss) + s.jobWg.Wait() + s.savePoint(job.commitTS, s.poss) } } -func (d *Executor) savePoint(ts int64, poss map[string]pb.Pos) { - err := d.meta.Save(ts, poss) +func (s *Syncer) savePoint(ts int64, poss map[string]pb.Pos) { + err := s.meta.Save(ts, poss) if err != nil { log.Fatalf("[write save point]%d[error]%v", ts, err) } @@ -385,28 +398,30 @@ func (d *Executor) savePoint(ts int64, poss map[string]pb.Pos) { positionGauge.Set(float64(ts)) } -func (d *Executor) sync(db *sql.DB, jobChan chan *job) { - d.wg.Add(1) - defer d.wg.Done() +func (s *Syncer) sync(executor executor.Executor, jobChan chan *job) { + s.wg.Add(1) + defer s.wg.Done() idx := 0 - count := d.cfg.TxnBatch + count := s.cfg.TxnBatch sqls := make([]string, 0, count) args := make([][]interface{}, 0, count) + commitTSs := make([]int64, 0, count) lastSyncTime := time.Now() - tpCnt := make(map[translator.OpType]int) + tpCnt := make(map[pb.MutationType]int) clearF := func() { for i := 0; i < idx; i++ { - d.jobWg.Done() + s.jobWg.Done() } idx = 0 sqls = sqls[0:0] args = args[0:0] + commitTSs = commitTSs[0:0] lastSyncTime = time.Now() for tpName, v := range tpCnt { - d.addCount(tpName, v) + s.addDMLCount(tpName, v) tpCnt[tpName] = 0 } } @@ -420,8 +435,9 @@ func (d *Executor) sync(db 
*sql.DB, jobChan chan *job) { } idx++ - if job.tp == translator.DDL { - err = executeSQLs(db, []string{job.sql}, [][]interface{}{job.args}, true) + if job.binlogTp == translator.DDL { + // compute txn duration + err = execute(executor, []string{job.sql}, [][]interface{}{job.args}, []int64{job.commitTS}, true) if err != nil { if !ignoreDDLError(err) { log.Fatalf(errors.ErrorStack(err)) @@ -429,16 +445,17 @@ func (d *Executor) sync(db *sql.DB, jobChan chan *job) { log.Warnf("[ignore ddl error][sql]%s[args]%v[error]%v", job.sql, job.args, err) } } - tpCnt[job.tp]++ + s.addDDLCount() clearF() - } else { + } else if !job.isCompleteBinlog { sqls = append(sqls, job.sql) args = append(args, job.args) - tpCnt[job.tp]++ + commitTSs = append(commitTSs, job.commitTS) + tpCnt[job.mutationTp]++ } - if idx >= count { - err = executeSQLs(db, sqls, args, false) + if (!s.cfg.DisableDispatch && idx >= count) || job.isCompleteBinlog { + err = executor.Execute(sqls, args, commitTSs, false) if err != nil { log.Fatalf(errors.ErrorStack(err)) } @@ -447,8 +464,8 @@ func (d *Executor) sync(db *sql.DB, jobChan chan *job) { default: now := time.Now() - if now.Sub(lastSyncTime) >= maxExecutionWaitTime { - err = executeSQLs(db, sqls, args, false) + if now.Sub(lastSyncTime) >= maxExecutionWaitTime && !s.cfg.DisableDispatch { + err = executor.Execute(sqls, args, commitTSs, false) if err != nil { log.Fatalf(errors.ErrorStack(err)) } @@ -460,22 +477,22 @@ func (d *Executor) sync(db *sql.DB, jobChan chan *job) { } } -func (d *Executor) run(b *binlogItem) error { +func (s *Syncer) run(b *binlogItem) error { var err error - d.genRegexMap() - d.toDBs, err = createDBs(d.cfg.DestDBType, d.cfg.To, d.cfg.WorkerCount) + s.genRegexMap() + s.executors, err = createExecutors(s.cfg.DestDBType, s.cfg.To, s.cfg.WorkerCount) if err != nil { return errors.Trace(err) } - d.translator, err = translator.New(d.cfg.DestDBType) + s.translator, err = translator.New(s.cfg.DestDBType) if err != nil { return errors.Trace(err) } - for i := 0; i < d.cfg.WorkerCount; i++ { - go d.sync(d.toDBs[i], d.jobCh[i]) + for i := 0; i < s.cfg.WorkerCount; i++ { + go s.sync(s.executors[i], s.jobCh[i]) } for { @@ -490,130 +507,116 @@ func (d *Executor) run(b *binlogItem) error { if err != nil { return errors.Errorf("prewrite %s unmarshal error %v", preWriteValue, err) } - err = d.translateSqls(preWrite.GetMutations(), commitTS, b.pos, b.nodeID) + err = s.translateSqls(preWrite.GetMutations(), commitTS, b.pos, b.nodeID) if err != nil { return errors.Trace(err) } + // send binlog boundary job for dml binlog, disdispatch also disables batch + if s.cfg.DisableDispatch { + s.addJob(newBinlogBoundaryJob(commitTS, b.pos, b.nodeID)) + } + } else if jobID > 0 { - schema, table, sql, err := d.handleDDL(b.job) + schema, table, sql, err := s.handleDDL(b.job) if err != nil { return errors.Trace(err) } - if d.skipDDL(schema, table) { + if s.skipDDL(schema, table) { log.Debugf("[skip ddl]db:%s table:%s, sql:%s, commit ts %d, pos %v", schema, table, sql, commitTS, b.pos) } else if sql != "" { - sql, err = d.translator.GenDDLSQL(sql, schema) + sql, err = s.translator.GenDDLSQL(sql, schema) if err != nil { return errors.Trace(err) } log.Infof("[ddl][start]%s[commit ts]%v[pos]%v", sql, commitTS, b.pos) - job := newJob(translator.DDL, sql, nil, "", commitTS, b.pos, b.nodeID) - d.addJob(job) + job := newDDLJob(sql, nil, "", commitTS, b.pos, b.nodeID) + s.addJob(job) log.Infof("[ddl][end]%s[commit ts]%v[pos]%v", sql, commitTS, b.pos) } } select { - case <-d.ctx.Done(): + case 
<-s.ctx.Done(): return nil - case b = <-d.input: + case b = <-s.input: } } } -func (d *Executor) translateSqls(mutations []pb.TableMutation, commitTS int64, pos pb.Pos, nodeID string) error { +func (s *Syncer) translateSqls(mutations []pb.TableMutation, commitTS int64, pos pb.Pos, nodeID string) error { for _, mutation := range mutations { - table, ok := d.schema.TableByID(mutation.GetTableId()) + table, ok := s.schema.TableByID(mutation.GetTableId()) if !ok { continue } - schemaName, tableName, ok := d.schema.SchemaAndTableName(mutation.GetTableId()) + schemaName, tableName, ok := s.schema.SchemaAndTableName(mutation.GetTableId()) if !ok { continue } - if d.skipDML(schemaName, tableName) { + if s.skipDML(schemaName, tableName) { log.Debugf("[skip dml]db:%s table:%s", schemaName, tableName) continue } var ( - err error - - // the restored sqls, 0 => insert, 1 => update, 2 => deleteByIds, 3 => deleteByPks, 4 => deleteByRows - sqls = make([][]string, 5) + err error + sqls = make(map[pb.MutationType][]string) // the dispatch keys - keys = make([][]string, 5) + keys = make(map[pb.MutationType][]string) // the restored sqls's args, ditto - args = make([][][]interface{}, 5) + args = make(map[pb.MutationType][][]interface{}) // the offset of specified type sql - offsets = make([]int, 5) + offsets = make(map[pb.MutationType]int) // the binlog dml sort sequences = mutation.GetSequence() - - // sql opType - tps = []translator.OpType{translator.Insert, translator.Update, translator.Del, translator.Del, translator.Del} ) if len(mutation.GetInsertedRows()) > 0 { - sqls[0], keys[0], args[0], err = d.translator.GenInsertSQLs(schemaName, table, mutation.GetInsertedRows()) + sqls[pb.MutationType_Insert], keys[pb.MutationType_Insert], args[pb.MutationType_Insert], err = s.translator.GenInsertSQLs(schemaName, table, mutation.GetInsertedRows()) if err != nil { return errors.Errorf("gen insert sqls failed: %v, schema: %s, table: %s", err, schemaName, tableName) } + offsets[pb.MutationType_Insert] = 0 } if len(mutation.GetUpdatedRows()) > 0 { - sqls[1], keys[1], args[1], err = d.translator.GenUpdateSQLs(schemaName, table, mutation.GetUpdatedRows()) + sqls[pb.MutationType_Update], keys[pb.MutationType_Update], args[pb.MutationType_Update], err = s.translator.GenUpdateSQLs(schemaName, table, mutation.GetUpdatedRows()) if err != nil { return errors.Errorf("gen update sqls failed: %v, schema: %s, table: %s", err, schemaName, tableName) } - } - - if len(mutation.GetDeletedIds()) > 0 { - sqls[2], keys[2], args[2], err = d.translator.GenDeleteSQLsByID(schemaName, table, mutation.GetDeletedIds()) - if err != nil { - return errors.Errorf("gen delete sqls failed: %v, schema: %s, table: %s", err, schemaName, tableName) - } - } - - if len(mutation.GetDeletedPks()) > 0 { - sqls[3], keys[3], args[3], err = d.translator.GenDeleteSQLs(schemaName, table, translator.DelByPK, mutation.GetDeletedPks()) - if err != nil { - return errors.Errorf("gen delete sqls failed: %v, schema: %s, table: %s", err, schemaName, tableName) - } + offsets[pb.MutationType_Update] = 0 } if len(mutation.GetDeletedRows()) > 0 { - sqls[4], keys[4], args[4], err = d.translator.GenDeleteSQLs(schemaName, table, translator.DelByCol, mutation.GetDeletedRows()) + sqls[pb.MutationType_DeleteRow], keys[pb.MutationType_DeleteRow], args[pb.MutationType_DeleteRow], err = s.translator.GenDeleteSQLs(schemaName, table, mutation.GetDeletedRows()) if err != nil { return errors.Errorf("gen delete sqls failed: %v, schema: %s, table: %s", err, schemaName, tableName) } + 
offsets[pb.MutationType_DeleteRow] = 0 } for _, dmlType := range sequences { - index := int32(dmlType) - if offsets[index] >= len(sqls[index]) { - return errors.Errorf("gen sqls failed: sequence %v execution %s sqls %v", sequences, dmlType, sqls[index]) + if offsets[dmlType] >= len(sqls[dmlType]) { + return errors.Errorf("gen sqls failed: sequence %v execution %s sqls %v", sequences, dmlType, sqls[dmlType]) } - job := newJob(tps[index], sqls[index][offsets[index]], args[index][offsets[index]], keys[index][offsets[index]], commitTS, pos, nodeID) - d.addJob(job) - offsets[index] = offsets[index] + 1 + job := newDMLJob(dmlType, sqls[dmlType][offsets[dmlType]], args[dmlType][offsets[dmlType]], keys[dmlType][offsets[dmlType]], commitTS, pos, nodeID) + s.addJob(job) + offsets[dmlType] = offsets[dmlType] + 1 } - // Compatible with the old format that don't have sequence, will be remove in the futhure - for i := 0; i < 5; i++ { - for j := offsets[i]; j < len(sqls[i]); j++ { - job := newJob(tps[i], sqls[i][j], args[i][j], keys[i][j], commitTS, pos, nodeID) - d.addJob(job) + for tp := range sqls { + if offsets[tp] != len(sqls[tp]) { + return errors.Errorf("binlog is corruption, item %v", mutations) } } } @@ -621,14 +624,14 @@ func (d *Executor) translateSqls(mutations []pb.TableMutation, commitTS int64, p return nil } -// AddToExectorChan adds binlogItem to the Executor's input channel -func (d *Executor) AddToExectorChan(b *binlogItem) { - d.input <- b +// Add adds binlogItem to the syncer's input channel +func (s *Syncer) Add(b *binlogItem) { + s.input <- b } // Close closes syncer. -func (d *Executor) Close() { - d.cancel() - d.wg.Wait() - closeDBs(d.toDBs...) +func (s *Syncer) Close() { + s.cancel() + s.wg.Wait() + closeExecutors(s.executors...) } diff --git a/drainer/executor_test.go b/drainer/syncer_test.go similarity index 83% rename from drainer/executor_test.go rename to drainer/syncer_test.go index c9258d101..afaf83226 100644 --- a/drainer/executor_test.go +++ b/drainer/syncer_test.go @@ -9,9 +9,9 @@ import ( func (t *testDrainerSuite) TestHandleDDL(c *C) { var err error - d := &Executor{} - d.ignoreSchemaNames = make(map[string]struct{}) - d.schema, err = NewSchema(nil, nil) + s := &Syncer{} + s.ignoreSchemaNames = make(map[string]struct{}) + s.schema, err = NewSchema(nil, nil) c.Assert(err, IsNil) dbName := model.NewCIStr("Test") ignoreDBName := model.NewCIStr("ignoreTest") @@ -20,13 +20,13 @@ func (t *testDrainerSuite) TestHandleDDL(c *C) { // check cancelled job job := &model.Job{ID: 1, State: model.JobCancelled} - _, _, sql, err := d.handleDDL(job) + _, _, sql, err := s.handleDDL(job) c.Assert(err, IsNil) c.Assert(sql, Equals, "") // check job.Query is empty job = &model.Job{ID: 1, State: model.JobDone} - _, _, sql, err = d.handleDDL(job) + _, _, sql, err = s.handleDDL(job) c.Assert(sql, Equals, "") c.Assert(err, NotNil, Commentf("should return not found job.Query")) @@ -58,7 +58,7 @@ func (t *testDrainerSuite) TestHandleDDL(c *C) { } tblInfo.Columns = []*model.ColumnInfo{colInfo} - d.ignoreSchemaNames[ingnoreDBInfo.Name.L] = struct{}{} + s.ignoreSchemaNames[ingnoreDBInfo.Name.L] = struct{}{} testCases := []struct { name string @@ -98,34 +98,34 @@ func (t *testDrainerSuite) TestHandleDDL(c *C) { BinlogInfo: testCase.binlogInfo, Query: testCase.query, } - testDoDDLAndCheck(c, d, job, false, testCase.resultQuery, testCase.schemaName, testCase.tableName) + testDoDDLAndCheck(c, s, job, false, testCase.resultQuery, testCase.schemaName, testCase.tableName) // custom check after ddl switch 
testCase.name { case "createSchema": - _, ok := d.schema.SchemaByID(dbInfo.ID) + _, ok := s.schema.SchemaByID(dbInfo.ID) c.Assert(ok, IsTrue) case "createTable": - _, ok := d.schema.TableByID(tblInfo.ID) + _, ok := s.schema.TableByID(tblInfo.ID) c.Assert(ok, IsTrue) case "addColumn", "truncateTable": - tb, ok := d.schema.TableByID(tblInfo.ID) + tb, ok := s.schema.TableByID(tblInfo.ID) c.Assert(ok, IsTrue) c.Assert(tb.Columns, HasLen, 1) case "dropTable": - _, ok := d.schema.TableByID(tblInfo.ID) + _, ok := s.schema.TableByID(tblInfo.ID) c.Assert(ok, IsFalse) case "dropSchema": - _, ok := d.schema.SchemaByID(job.SchemaID) + _, ok := s.schema.SchemaByID(job.SchemaID) c.Assert(ok, IsFalse) } } } -func testDoDDLAndCheck(c *C, d *Executor, job *model.Job, isErr bool, sql string, schema string, table string) { - schemaName, tableName, s, err := d.handleDDL(job) +func testDoDDLAndCheck(c *C, s *Syncer, job *model.Job, isErr bool, sql string, schema string, table string) { + schemaName, tableName, resSQL, err := s.handleDDL(job) c.Assert(err != nil, Equals, isErr) - c.Assert(sql, Equals, s) + c.Assert(sql, Equals, resSQL) c.Assert(schemaName, Equals, schema) c.Assert(tableName, Equals, table) } diff --git a/drainer/translator/mysql.go b/drainer/translator/mysql.go index f81fde482..b188ffc40 100644 --- a/drainer/translator/mysql.go +++ b/drainer/translator/mysql.go @@ -30,7 +30,7 @@ func (m *mysqlTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r columnList := m.genColumnList(columns) columnPlaceholders := m.genColumnPlaceholders((len(columns))) - sql := fmt.Sprintf("replace into %s.%s (%s) values (%s);", schema, table.Name, columnList, columnPlaceholders) + sql := fmt.Sprintf("replace into `%s`.`%s` (%s) values (%s);", schema, table.Name, columnList, columnPlaceholders) for _, row := range rows { //decode the pk value @@ -50,7 +50,7 @@ func (m *mysqlTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r } if len(r)%2 != 0 { - return nil, nil, nil, errors.Errorf("table %s.%s insert row raw data is corruption %v", schema, table.Name, r) + return nil, nil, nil, errors.Errorf("table `%s`.`%s` insert row raw data is corruption %v", schema, table.Name, r) } var columnValues = make(map[int64]types.Datum) @@ -60,7 +60,7 @@ func (m *mysqlTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r var vals []interface{} for _, col := range columns { - if m.isPKHandleColumn(table, col) { + if isPKHandleColumn(table, col) { columnValues[col.ID] = pk vals = append(vals, pk.GetValue()) continue @@ -82,14 +82,9 @@ func (m *mysqlTranslator) GenInsertSQLs(schema string, table *model.TableInfo, r sqls = append(sqls, sql) values = append(values, vals) - // generate dispatching key // find primary keys - pcs, err := m.pkIndexColumns(table) - if err != nil { - return nil, nil, nil, errors.Trace(err) - } - key, err := m.generateDispatchKey(pcs, columnValues) + key, err := m.generateDispatchKey(table, columnValues) if err != nil { return nil, nil, nil, errors.Trace(err) } @@ -110,68 +105,31 @@ func (m *mysqlTranslator) GenUpdateSQLs(schema string, table *model.TableInfo, r var oldValues []interface{} var newValues []interface{} - // it has pkHandle, get the columm - pcs, err := m.pkIndexColumns(table) - if err != nil { - return nil, nil, nil, errors.Trace(err) - } - - // decode one to get the pk - if pcs != nil { - remain, _, err := codec.DecodeOne(row) - if err != nil { - return nil, nil, nil, errors.Trace(err) - } - row = remain - } - - // the format - // 1 have pk index columns: 
[pk, colID, colVal, colID,..] - // the pk index columns' values are constant, we can make up the where condition - // from [..., colID, colVal, colID,..] directly - // 2 no pk index columns: [oldColID, oldColVal, ..., newColID, colVal, ..] - r, err := codec.Decode(row, 2*(len(columns)-1)) + r, err := codec.Decode(row, 2*len(columns)) if err != nil { return nil, nil, nil, errors.Trace(err) } if len(r)%2 != 0 { - return nil, nil, nil, errors.Errorf("table %s.%s update row data is corruption %v", schema, table.Name, r) + return nil, nil, nil, errors.Errorf("table `%s`.`%s` update row data is corruption %v", schema, table.Name, r) } - // TODO: if meet old schema that before drop pk index, - // (now we don't have pk indexs), It can't work well. var i int columnValues := make(map[int64]types.Datum) - if pcs == nil { - for ; i < len(r)/2; i += 2 { - columnValues[r[i].GetInt64()] = r[i+1] - } - - updateColumns, oldValues, err = m.generateColumnAndValue(columns, columnValues) - if err != nil { - return nil, nil, nil, errors.Trace(err) - } - - columnValues = make(map[int64]types.Datum) - for ; i < len(r); i += 2 { - columnValues[r[i].GetInt64()] = r[i+1] - } - keys = append(keys, "[]") - } else { - for ; i < len(r); i += 2 { - columnValues[r[i].GetInt64()] = r[i+1] - } - - updateColumns, oldValues, err = m.generateColumnAndValue(pcs, columnValues) - if err != nil { - return nil, nil, nil, errors.Trace(err) - } - keys = append(keys, fmt.Sprintf("%v", oldValues)) + for ; i < len(r)/2; i += 2 { + columnValues[r[i].GetInt64()] = r[i+1] } + updateColumns, oldValues, err = m.generateColumnAndValue(columns, columnValues) + if err != nil { + return nil, nil, nil, errors.Trace(err) + } whereColumns := updateColumns - updateColumns = nil + + columnValues = make(map[int64]types.Datum) + for ; i < len(r); i += 2 { + columnValues[r[i].GetInt64()] = r[i+1] + } updateColumns, newValues, err = m.generateColumnAndValue(columns, columnValues) if err != nil { @@ -184,40 +142,23 @@ func (m *mysqlTranslator) GenUpdateSQLs(schema string, table *model.TableInfo, r value = append(value, oldValues...) 
where := m.genWhere(whereColumns, oldValues) - sql := fmt.Sprintf("update %s.%s set %s where %s limit 1;", schema, table.Name.L, kvs, where) + sql := fmt.Sprintf("update `%s`.`%s` set %s where %s limit 1;", schema, table.Name.L, kvs, where) sqls = append(sqls, sql) values = append(values, value) - } - - return sqls, keys, values, nil -} - -func (m *mysqlTranslator) GenDeleteSQLsByID(schema string, table *model.TableInfo, rows []int64) ([]string, []string, [][]interface{}, error) { - sqls := make([]string, 0, len(rows)) - keys := make([]string, 0, len(rows)) - values := make([][]interface{}, 0, len(rows)) - column := m.pkHandleColumn(table) - if column == nil { - return nil, nil, nil, errors.Errorf("table %s.%s doesn't have pkHandle column", schema, table.Name) - } - whereColumns := []*model.ColumnInfo{column} - - for _, rowID := range rows { - var value []interface{} - value = append(value, rowID) - - where := m.genWhere(whereColumns, value) - values = append(values, value) + // generate dispatching key + // find primary keys + key, err := m.generateDispatchKey(table, columnValues) + if err != nil { + return nil, nil, nil, errors.Trace(err) + } + keys = append(keys, key) - sql := fmt.Sprintf("delete from %s.%s where %s limit 1;", schema, table.Name, where) - sqls = append(sqls, sql) - keys = append(keys, fmt.Sprintf("%v", value)) } return sqls, keys, values, nil } -func (m *mysqlTranslator) GenDeleteSQLs(schema string, table *model.TableInfo, op OpType, rows [][]byte) ([]string, []string, [][]interface{}, error) { +func (m *mysqlTranslator) GenDeleteSQLs(schema string, table *model.TableInfo, rows [][]byte) ([]string, []string, [][]interface{}, error) { columns := table.Columns sqls := make([]string, 0, len(rows)) keys := make([]string, 0, len(rows)) @@ -226,56 +167,36 @@ func (m *mysqlTranslator) GenDeleteSQLs(schema string, table *model.TableInfo, o for _, row := range rows { var whereColumns []*model.ColumnInfo var value []interface{} - r, err := codec.Decode(row, len(columns)) + r, err := codec.Decode(row, 2*len(columns)) if err != nil { return nil, nil, nil, errors.Trace(err) } - switch op { - case DelByPK: - whereColumns, _ = m.pkIndexColumns(table) - if whereColumns == nil { - return nil, nil, nil, errors.Errorf("table %s.%s doesn't have pkHandle column", schema, table.Name) - } - - if len(r) != len(whereColumns) { - return nil, nil, nil, errors.Errorf("table %s.%s the delete row by pks binlog %v is courruption", schema, table.Name, r) - } - - for index, val := range r { - newValue, err := m.formatData(val, whereColumns[index].FieldType) - if err != nil { - return nil, nil, nil, errors.Trace(err) - } - - value = append(value, newValue) - } - keys = append(keys, fmt.Sprintf("%v", value)) - - case DelByCol: - if len(r)%2 != 0 { - return nil, nil, nil, errors.Errorf("table %s.%s the delete row by cols binlog %v is courruption", schema, table.Name, r) - } - - var columnValues = make(map[int64]types.Datum) - for i := 0; i < len(r); i += 2 { - columnValues[r[i].GetInt64()] = r[i+1] - } + if len(r)%2 != 0 { + return nil, nil, nil, errors.Errorf("table `%s`.`%s` the delete row by cols binlog %v is courruption", schema, table.Name, r) + } - whereColumns, value, err = m.generateColumnAndValue(columns, columnValues) - if err != nil { - return nil, nil, nil, errors.Trace(err) - } - keys = append(keys, "[]") + var columnValues = make(map[int64]types.Datum) + for i := 0; i < len(r); i += 2 { + columnValues[r[i].GetInt64()] = r[i+1] + } - default: - return nil, nil, nil, errors.Errorf("delete row 
error type %v", op) + whereColumns, value, err = m.generateColumnAndValue(columns, columnValues) + if err != nil { + return nil, nil, nil, errors.Trace(err) } where := m.genWhere(whereColumns, value) values = append(values, value) - sql := fmt.Sprintf("delete from %s.%s where %s limit 1;", schema, table.Name, where) + sql := fmt.Sprintf("delete from `%s`.`%s` where %s limit 1;", schema, table.Name, where) sqls = append(sqls, sql) + // generate dispatching key + // find primary keys + key, err := m.generateDispatchKey(table, columnValues) + if err != nil { + return nil, nil, nil, errors.Trace(err) + } + keys = append(keys, key) } return sqls, keys, values, nil @@ -294,13 +215,14 @@ func (m *mysqlTranslator) GenDDLSQL(sql string, schema string) (string, error) { return fmt.Sprintf("%s;", sql), nil } - return fmt.Sprintf("use %s; %s;", schema, sql), nil + return fmt.Sprintf("use `%s`; %s;", schema, sql), nil } func (m *mysqlTranslator) genColumnList(columns []*model.ColumnInfo) string { var columnList []byte for i, column := range columns { - columnList = append(columnList, []byte(column.Name.L)...) + name := fmt.Sprintf("`%s`", column.Name.L) + columnList = append(columnList, []byte(name)...) if i != len(columns)-1 { columnList = append(columnList, ',') @@ -322,9 +244,9 @@ func (m *mysqlTranslator) genKVs(columns []*model.ColumnInfo) string { var kvs bytes.Buffer for i := range columns { if i == len(columns)-1 { - fmt.Fprintf(&kvs, "%s = ?", columns[i].Name) + fmt.Fprintf(&kvs, "`%s` = ?", columns[i].Name) } else { - fmt.Fprintf(&kvs, "%s = ?, ", columns[i].Name) + fmt.Fprintf(&kvs, "`%s` = ?, ", columns[i].Name) } } @@ -340,9 +262,9 @@ func (m *mysqlTranslator) genWhere(columns []*model.ColumnInfo, data []interface } if i == len(columns)-1 { - fmt.Fprintf(&kvs, "%s %s ?", columns[i].Name, kvSplit) + fmt.Fprintf(&kvs, "`%s` %s ?", columns[i].Name, kvSplit) } else { - fmt.Fprintf(&kvs, "%s %s ? and ", columns[i].Name, kvSplit) + fmt.Fprintf(&kvs, "`%s` %s ? 
and ", columns[i].Name, kvSplit) } } @@ -351,7 +273,7 @@ func (m *mysqlTranslator) genWhere(columns []*model.ColumnInfo, data []interface func (m *mysqlTranslator) pkHandleColumn(table *model.TableInfo) *model.ColumnInfo { for _, col := range table.Columns { - if m.isPKHandleColumn(table, col) { + if isPKHandleColumn(table, col) { return col } } @@ -391,7 +313,7 @@ func (m *mysqlTranslator) pkIndexColumns(table *model.TableInfo) ([]*model.Colum return cols, nil } -func (m *mysqlTranslator) isPKHandleColumn(table *model.TableInfo, column *model.ColumnInfo) bool { +func isPKHandleColumn(table *model.TableInfo, column *model.ColumnInfo) bool { return mysql.HasPriKeyFlag(column.Flag) && table.PKIsHandle } @@ -415,8 +337,12 @@ func (m *mysqlTranslator) generateColumnAndValue(columns []*model.ColumnInfo, co return newColumn, newColumnsValues, nil } -func (m *mysqlTranslator) generateDispatchKey(columns []*model.ColumnInfo, columnValues map[int64]types.Datum) (string, error) { +func (m *mysqlTranslator) generateDispatchKey(table *model.TableInfo, columnValues map[int64]types.Datum) (string, error) { var columnsValues []interface{} + columns, err := m.pkIndexColumns(table) + if err != nil { + return "", errors.Trace(err) + } for _, col := range columns { val, ok := columnValues[col.ID] if ok { diff --git a/drainer/translator/mysql_test.go b/drainer/translator/mysql_test.go index 61782e05b..81c9886af 100644 --- a/drainer/translator/mysql_test.go +++ b/drainer/translator/mysql_test.go @@ -5,31 +5,31 @@ import ( "github.com/pingcap/tidb/util/types" ) -// test the already implemented translater, register and unregister function -func (t *testTranslaterSuite) TestGenColumnList(c *C) { +// test the already implemented translator, register and unregister function +func (t *testTranslatorSuite) TestGenColumnList(c *C) { m := testGenMysqlTranslator(c) table := testGenTable("normal") - c.Assert(m.genColumnList(table.Columns), Equals, "id,name,sex") + c.Assert(m.genColumnList(table.Columns), Equals, "`id`,`name`,`sex`") } -func (t *testTranslaterSuite) TestGenColumnPlaceholders(c *C) { +func (t *testTranslatorSuite) TestGenColumnPlaceholders(c *C) { m := testGenMysqlTranslator(c) c.Assert(m.genColumnPlaceholders(3), Equals, "?,?,?") } -func (t *testTranslaterSuite) TestGenKVs(c *C) { +func (t *testTranslatorSuite) TestGenKVs(c *C) { m := testGenMysqlTranslator(c) table := testGenTable("normal") - c.Assert(m.genKVs(table.Columns), Equals, "ID = ?, NAME = ?, SEX = ?") + c.Assert(m.genKVs(table.Columns), Equals, "`ID` = ?, `NAME` = ?, `SEX` = ?") } -func (t *testTranslaterSuite) TestGenWhere(c *C) { +func (t *testTranslatorSuite) TestGenWhere(c *C) { m := testGenMysqlTranslator(c) table := testGenTable("normal") - c.Assert(m.genWhere(table.Columns, []interface{}{1, "test", nil}), Equals, "ID = ? and NAME = ? and SEX is ?") + c.Assert(m.genWhere(table.Columns, []interface{}{1, "test", nil}), Equals, "`ID` = ? and `NAME` = ? 
and `SEX` is ?") } -func (t *testTranslaterSuite) TestPkHandleColumn(c *C) { +func (t *testTranslatorSuite) TestPkHandleColumn(c *C) { m := testGenMysqlTranslator(c) table := testGenTable("normal") c.Assert(m.pkHandleColumn(table), IsNil) @@ -40,7 +40,7 @@ func (t *testTranslaterSuite) TestPkHandleColumn(c *C) { } } -func (t *testTranslaterSuite) TestPkIndexColumns(c *C) { +func (t *testTranslatorSuite) TestPkIndexColumns(c *C) { m := testGenMysqlTranslator(c) table := testGenTable("hasPK") cols, err := m.pkIndexColumns(table) @@ -58,7 +58,7 @@ func (t *testTranslaterSuite) TestPkIndexColumns(c *C) { c.Assert(len(cols), Equals, 0) } -func (t *testTranslaterSuite) testGenerateColumnAndValue(c *C) { +func (t *testTranslatorSuite) testGenerateColumnAndValue(c *C) { m := testGenMysqlTranslator(c) table := testGenTable("normal") rawData, expected := testGenRowDatas(c, table.Columns) diff --git a/drainer/translator/pb.go b/drainer/translator/pb.go new file mode 100644 index 000000000..c7347639c --- /dev/null +++ b/drainer/translator/pb.go @@ -0,0 +1,270 @@ +package translator + +import ( + "fmt" + + "github.com/golang/protobuf/proto" + "github.com/juju/errors" + pb "github.com/pingcap/tidb-binlog/proto/binlog" + "github.com/pingcap/tidb/ast" + "github.com/pingcap/tidb/model" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/types" +) + +// pbTranslator translates TiDB binlog to self-description protobuf +type pbTranslator struct{} + +func init() { + Register("pb", &pbTranslator{}) +} + +func (p *pbTranslator) GenInsertSQLs(schema string, table *model.TableInfo, rows [][]byte) ([]string, []string, [][]interface{}, error) { + columns := table.Columns + sqls := make([]string, 0, len(rows)) + keys := make([]string, 0, len(rows)) + values := make([][]interface{}, 0, len(rows)) + for _, row := range rows { + //decode the pk value + remain, pk, err := codec.DecodeOne(row) + if err != nil { + return nil, nil, nil, errors.Trace(err) + } + + var r []types.Datum + // decode the remain values, the format is [coldID, colVal, coldID, colVal....] 
+ // while the table just has primary id, filter the nil value that follows by the primary id + if remain[0] != codec.NilFlag { + r, err = codec.Decode(remain, 2*(len(columns)-1)) + if err != nil { + return nil, nil, nil, errors.Trace(err) + } + } + + if len(r)%2 != 0 { + return nil, nil, nil, errors.Errorf("table %s.%s insert row raw data is corruption %v", schema, table.Name, r) + } + + var columnValues = make(map[int64]types.Datum) + for i := 0; i < len(r); i += 2 { + columnValues[r[i].GetInt64()] = r[i+1] + } + + var ( + vals = make([]types.Datum, 0, len(columns)) + cols = make([]string, 0, len(columns)) + tps = make([]byte, 0, len(columns)) + ) + for _, col := range columns { + if isPKHandleColumn(table, col) { + columnValues[col.ID] = pk + } + + val, ok := columnValues[col.ID] + if ok { + value, err := tablecodec.Unflatten(val, &col.FieldType, false) + if err != nil { + return nil, nil, nil, errors.Trace(err) + } + vals = append(vals, value) + cols = append(cols, col.Name.L) + tps = append(tps, col.Tp) + } + } + + rowData, err := encodeRow(vals, cols, tps) + if err != nil { + return nil, nil, nil, errors.Trace(err) + } + + sqls = append(sqls, "") + values = append(values, packEvent(schema, table.Name.L, pb.EventType_Insert, rowData)) + keys = append(keys, "") + } + + return sqls, keys, values, nil +} + +func (p *pbTranslator) GenUpdateSQLs(schema string, table *model.TableInfo, rows [][]byte) ([]string, []string, [][]interface{}, error) { + columns := table.Columns + sqls := make([]string, 0, len(rows)) + keys := make([]string, 0, len(rows)) + values := make([][]interface{}, 0, len(rows)) + for _, row := range rows { + r, err := codec.Decode(row, 2*len(columns)) + if err != nil { + return nil, nil, nil, errors.Trace(err) + } + + if len(r)%2 != 0 { + return nil, nil, nil, errors.Errorf("table %s.%s update row data is corruption %v", schema, table.Name, r) + } + + var i int + // old column values + oldColumnValues := make(map[int64]types.Datum) + for ; i < len(r)/2; i += 2 { + oldColumnValues[r[i].GetInt64()] = r[i+1] + } + // new coulmn values + newColumnVlaues := make(map[int64]types.Datum) + for ; i < len(r); i += 2 { + newColumnVlaues[r[i].GetInt64()] = r[i+1] + } + + var ( + oldVals = make([]types.Datum, 0, len(columns)) + newVals = make([]types.Datum, 0, len(columns)) + cols = make([]string, 0, len(columns)) + tps = make([]byte, 0, len(columns)) + ) + for _, col := range columns { + val, ok := newColumnVlaues[col.ID] + if ok { + newValue, err := tablecodec.Unflatten(val, &col.FieldType, false) + if err != nil { + return nil, nil, nil, errors.Trace(err) + } + oldValue, err := tablecodec.Unflatten(oldColumnValues[col.ID], &col.FieldType, false) + if err != nil { + return nil, nil, nil, errors.Trace(err) + } + oldVals = append(oldVals, oldValue) + newVals = append(newVals, newValue) + cols = append(cols, col.Name.L) + tps = append(tps, col.Tp) + } + } + + rowData, err := encodeUpdateRow(oldVals, newVals, cols, tps) + if err != nil { + return nil, nil, nil, errors.Trace(err) + } + + sqls = append(sqls, "") + values = append(values, packEvent(schema, table.Name.L, pb.EventType_Update, rowData)) + keys = append(keys, "") + } + + return sqls, keys, values, nil +} + +func (p *pbTranslator) GenDeleteSQLs(schema string, table *model.TableInfo, rows [][]byte) ([]string, []string, [][]interface{}, error) { + columns := table.Columns + sqls := make([]string, 0, len(rows)) + keys := make([]string, 0, len(rows)) + values := make([][]interface{}, 0, len(rows)) + + for _, row := range rows { + r, 
err := codec.Decode(row, len(columns)) + if err != nil { + return nil, nil, nil, errors.Trace(err) + } + + if len(r)%2 != 0 { + return nil, nil, nil, errors.Errorf("table %s.%s the delete row by cols binlog %v is courruption", schema, table.Name, r) + } + + var columnValues = make(map[int64]types.Datum) + for i := 0; i < len(r); i += 2 { + columnValues[r[i].GetInt64()] = r[i+1] + } + + var ( + vals = make([]types.Datum, 0, len(columns)) + cols = make([]string, 0, len(columns)) + tps = make([]byte, 0, len(columns)) + ) + for _, col := range columns { + val, ok := columnValues[col.ID] + if ok { + value, err := tablecodec.Unflatten(val, &col.FieldType, false) + if err != nil { + return nil, nil, nil, errors.Trace(err) + } + vals = append(vals, value) + cols = append(cols, col.Name.L) + tps = append(tps, col.Tp) + } + } + + rowData, err := encodeRow(vals, cols, tps) + if err != nil { + return nil, nil, nil, errors.Trace(err) + } + + sqls = append(sqls, "") + values = append(values, packEvent(schema, table.Name.L, pb.EventType_Delete, rowData)) + keys = append(keys, "") + } + + return sqls, keys, values, nil +} + +func (p *pbTranslator) GenDDLSQL(sql string, schema string) (string, error) { + stmts, err := parser.New().Parse(sql, "", "") + if err != nil { + return "", errors.Trace(err) + } + + stmt := stmts[0] + _, isCreateDatabase := stmt.(*ast.CreateDatabaseStmt) + if isCreateDatabase { + return fmt.Sprintf("%s;", sql), nil + } + + return fmt.Sprintf("use %s; %s;", schema, sql), nil +} + +// encodeRow encode insert/delete column name, file tp and row data into a slice of byte. +// Row layout: colName1, tp1, value1, colName2, tp2, value2, ..... +func encodeRow(row []types.Datum, colName []string, tp []byte) ([]byte, error) { + if len(row) != len(colName) || len(row) != len(tp) { + return nil, errors.Errorf("EncodeRow error: data, columnName and tp count not match %d vs %d vs %d", len(row), len(colName), len(tp)) + } + values := make([]types.Datum, 3*len(row)) + for i, c := range row { + values[3*i] = types.NewStringDatum(colName[i]) + values[3*i+1] = types.NewBytesDatum([]byte{tp[i]}) + values[3*i+2] = c + } + if len(values) == 0 { + // We could not set nil value into kv. + return []byte{codec.NilFlag}, nil + } + return codec.EncodeValue(nil, values...) +} + +// encodeUpdateRow encode insert/delete column name, file tp and old/new row data into a slice of byte. +// Row layout: colName1, tp1, oldValue1, newValue1, colName2, tp2, oldValue2, newValue2, ..... +func encodeUpdateRow(oldRow []types.Datum, newRow []types.Datum, colName []string, tp []byte) ([]byte, error) { + if len(oldRow) != len(colName) || len(oldRow) != len(newRow) || len(oldRow) != len(tp) { + return nil, errors.Errorf("EncodeRow error: oldData, newData, columnName and tp count not match %d vs %d vs %d vs %d", len(oldRow), len(newRow), len(colName), len(tp)) + } + values := make([]types.Datum, 4*len(oldRow)) + for i, c := range oldRow { + values[4*i] = types.NewStringDatum(colName[i]) + values[4*i+1] = types.NewBytesDatum([]byte{tp[i]}) + values[4*i+2] = c + values[4*i+3] = newRow[i] + + } + if len(values) == 0 { + // We could not set nil value into kv. + return []byte{codec.NilFlag}, nil + } + return codec.EncodeValue(nil, values...) 
+} + +func packEvent(schemaName, tableName string, tp pb.EventType, rowData []byte) []interface{} { + event := &pb.Event{ + SchemaName: proto.String(schemaName), + TableName: proto.String(tableName), + Row: rowData, + Tp: tp, + } + + return []interface{}{event} +} diff --git a/drainer/translator/translater.go b/drainer/translator/translator.go similarity index 70% rename from drainer/translator/translater.go rename to drainer/translator/translator.go index 154170844..0cd025346 100644 --- a/drainer/translator/translater.go +++ b/drainer/translator/translator.go @@ -10,18 +10,8 @@ import ( type OpType byte const ( - // Insert is the constant OpType for insert operation - Insert = iota + 1 - // Update is the constant OpType for update operation - Update - // Del is the constant OpType for delete operation - Del - // DelByID is the constant OpType for delete operation - DelByID - // DelByPK is the constant OpType for delete operation - DelByPK - // DelByCol is the constant OpType for delete operation - DelByCol + // DML is the constant OpType for delete operation + DML = iota + 1 // DDL is the constant OpType for ddl operation DDL ) @@ -36,11 +26,8 @@ type SQLTranslator interface { // GenUpdateSQLs generates the update sqls GenUpdateSQLs(string, *model.TableInfo, [][]byte) ([]string, []string, [][]interface{}, error) - // GenDeleteSQLsByID generates the delete by ID sqls - GenDeleteSQLsByID(string, *model.TableInfo, []int64) ([]string, []string, [][]interface{}, error) - // GenDeleteSQLs generates the delete sqls by cols values - GenDeleteSQLs(string, *model.TableInfo, OpType, [][]byte) ([]string, []string, [][]interface{}, error) + GenDeleteSQLs(string, *model.TableInfo, [][]byte) ([]string, []string, [][]interface{}, error) // GenDDLSQL generates the ddl sql by query string GenDDLSQL(string, string) (string, error) diff --git a/drainer/translator/translater_test.go b/drainer/translator/translator_test.go similarity index 70% rename from drainer/translator/translater_test.go rename to drainer/translator/translator_test.go index 65bce8ce9..1d0fe6b91 100644 --- a/drainer/translator/translater_test.go +++ b/drainer/translator/translator_test.go @@ -16,12 +16,12 @@ func TestClient(t *testing.T) { TestingT(t) } -var _ = Suite(&testTranslaterSuite{}) +var _ = Suite(&testTranslatorSuite{}) -type testTranslaterSuite struct{} +type testTranslatorSuite struct{} -// test the already implemented translater, register and unregister function -func (t *testTranslaterSuite) TestRegisterAndUnregister(c *C) { +// test the already implemented translator, register and unregister function +func (t *testTranslatorSuite) TestRegisterAndUnregister(c *C) { hasTranslaters := []string{"mysql"} for _, name := range hasTranslaters { _, err := New(name) @@ -41,15 +41,14 @@ func (t *testTranslaterSuite) TestRegisterAndUnregister(c *C) { } } -// test all already implemented translater's translation function -func (t *testTranslaterSuite) TestTranslater(c *C) { - for _, s := range providers { - testGenInsertSQLs(c, s) - testGenUpdateSQLs(c, s) - testGenDeleteSQLs(c, s) - testGenDeleteSQLsByID(c, s) - testGenDDLSQL(c, s) - } +// test all already implemented translator's translation function +func (t *testTranslatorSuite) TestTranslater(c *C) { + s, err := New("mysql") + c.Assert(err, IsNil) + testGenInsertSQLs(c, s) + testGenUpdateSQLs(c, s) + testGenDeleteSQLs(c, s) + testGenDDLSQL(c, s) } func testGenInsertSQLs(c *C, s SQLTranslator) { @@ -63,7 +62,7 @@ func testGenInsertSQLs(c *C, s SQLTranslator) { c.Assert(keys[0], 
Equals, fmt.Sprintf("%v", expected[:exceptedKeys[i]])) c.Assert(err, IsNil) c.Assert(len(vals[0]), Equals, 3) - c.Assert(sqls[0], Equals, "replace into t.account (id,name,sex) values (?,?,?);") + c.Assert(sqls[0], Equals, "replace into `t`.`account` (`id`,`name`,`sex`) values (?,?,?);") for index := range vals { c.Assert(vals[0][index], DeepEquals, expected[index]) } @@ -80,10 +79,8 @@ func testGenInsertSQLs(c *C, s SQLTranslator) { func testGenUpdateSQLs(c *C, s SQLTranslator) { schema := "t" tables := []*model.TableInfo{testGenTable("normal"), testGenTable("hasPK"), testGenTable("hasID")} - exceptedSqls := []string{"update t.account set ID = ?, NAME = ?, SEX = ? where ID = ? and NAME = ? and SEX = ? limit 1;", - "update t.account set ID = ?, NAME = ?, SEX = ? where ID = ? and NAME = ? limit 1;", - "update t.account set ID = ?, NAME = ?, SEX = ? where ID = ? limit 1;"} - exceptedNums := []int{6, 5, 4} + exceptedSQL := "update `t`.`account` set `ID` = ?, `NAME` = ?, `SEX` = ? where `ID` = ? and `NAME` = ? and `SEX` = ? limit 1;" + exceptedNum := 6 exceptedKeys := []int{0, 2, 1} for index, t := range tables { rowDatas, expected := testGenRowDatas(c, t.Columns) @@ -91,8 +88,8 @@ func testGenUpdateSQLs(c *C, s SQLTranslator) { sqls, keys, vals, err := s.GenUpdateSQLs(schema, t, [][]byte{binlog}) c.Assert(keys[0], Equals, fmt.Sprintf("%v", expected[:exceptedKeys[index]])) c.Assert(err, IsNil) - c.Assert(len(vals[0]), Equals, exceptedNums[index]) - c.Assert(sqls[0], Equals, exceptedSqls[index]) + c.Assert(len(vals[0]), Equals, exceptedNum) + c.Assert(sqls[0], Equals, exceptedSQL) for index := range vals { c.Assert(vals[0][index], DeepEquals, expected[index%3]) } @@ -109,19 +106,17 @@ func testGenUpdateSQLs(c *C, s SQLTranslator) { func testGenDeleteSQLs(c *C, s SQLTranslator) { schema := "t" tables := []*model.TableInfo{testGenTable("normal"), testGenTable("hasPK")} - exceptedSqls := []string{"delete from t.account where ID = ? and NAME = ? and SEX = ? limit 1;", - "delete from t.account where ID = ? and NAME = ? limit 1;"} - exceptedNums := []int{3, 2} + exceptedSQL := "delete from `t`.`account` where `ID` = ? and `NAME` = ? and `SEX` = ? limit 1;" + exceptedNum := 3 exceptedKeys := []int{0, 2} - op := []OpType{DelByCol, DelByPK} for index, t := range tables { rowDatas, expected := testGenRowDatas(c, t.Columns) binlog := testGenDeleteBinlog(c, t, rowDatas) - sqls, keys, vals, err := s.GenDeleteSQLs(schema, t, op[index], [][]byte{binlog}) + sqls, keys, vals, err := s.GenDeleteSQLs(schema, t, [][]byte{binlog}) c.Assert(keys[0], Equals, fmt.Sprintf("%v", expected[:exceptedKeys[index]])) c.Assert(err, IsNil) - c.Assert(len(vals[0]), Equals, exceptedNums[index]) - c.Assert(sqls[0], Equals, exceptedSqls[index]) + c.Assert(len(vals[0]), Equals, exceptedNum) + c.Assert(sqls[0], Equals, exceptedSQL) for index := range vals { c.Assert(vals[0][index], DeepEquals, expected[index]) } @@ -129,32 +124,12 @@ func testGenDeleteSQLs(c *C, s SQLTranslator) { rowDatas, _ := testGenRowDatas(c, tables[0].Columns) binlog := testGenDeleteBinlog(c, tables[0], rowDatas) - _, _, _, err := s.GenDeleteSQLs(schema, tables[0], DelByCol, [][]byte{binlog[6:]}) + _, _, _, err := s.GenDeleteSQLs(schema, tables[0], [][]byte{binlog[6:]}) if err == nil { c.Fatal("it's should panic") } } -func testGenDeleteSQLsByID(c *C, s SQLTranslator) { - schema := "t" - tables := []*model.TableInfo{testGenTable("hasID")} - exceptedSqls := []string{"delete from t.account where ID = ? 
limit 1;"} - exceptedNums := []int{1} - exceptedKeys := []int{1} - for index, t := range tables { - rowDatas, expected := testGenRowDatas(c, t.Columns) - binlog := testGenDeleteBinlogByID(c, t, rowDatas) - sqls, keys, vals, err := s.GenDeleteSQLsByID(schema, t, []int64{binlog}) - c.Assert(keys[0], Equals, fmt.Sprintf("%v", expected[:exceptedKeys[index]])) - c.Assert(err, IsNil) - c.Assert(len(vals[0]), Equals, exceptedNums[index]) - c.Assert(sqls[0], Equals, exceptedSqls[index]) - for index := range vals { - c.Assert(vals[0][index], DeepEquals, expected[index]) - } - } -} - func testGenDDLSQL(c *C, s SQLTranslator) { sql, err := s.GenDDLSQL("create database t", "t") c.Assert(err, IsNil) @@ -162,7 +137,7 @@ func testGenDDLSQL(c *C, s SQLTranslator) { sql, err = s.GenDDLSQL("drop table t", "t") c.Assert(err, IsNil) - c.Assert(sql, Equals, "use t; drop table t;") + c.Assert(sql, Equals, "use `t`; drop table t;") } func testGenInsertBinlog(c *C, t *model.TableInfo, r []types.Datum) []byte { @@ -198,83 +173,18 @@ func testGenUpdateBinlog(c *C, t *model.TableInfo, oldData []types.Datum, newDat colIDs = append(colIDs, col.ID) } + var bin []byte value, err := tablecodec.EncodeRow(newData, colIDs) c.Assert(err, IsNil) - - var h int64 - hasPK := false - if t.PKIsHandle { - for _, col := range t.Columns { - if testIsPKHandleColumn(t, col) { - hasPK = true - h = oldData[col.Offset].GetInt64() - break - } - } - } else { - for _, idx := range t.Indices { - if idx.Primary { - hasPK = true - break - } - } - h = 2 - } - - var bin []byte - if hasPK { - handleData, _ := codec.EncodeValue(nil, types.NewIntDatum(h)) - bin = append(handleData, value...) - } else { - oldData, err := tablecodec.EncodeRow(oldData, colIDs) - c.Assert(err, IsNil) - bin = append(oldData, value...) - } - + oldValue, err := tablecodec.EncodeRow(oldData, colIDs) + c.Assert(err, IsNil) + bin = append(oldValue, value...) return bin } -func testGenDeleteBinlogByID(c *C, t *model.TableInfo, r []types.Datum) int64 { - var h int64 - var hasPK bool - if t.PKIsHandle { - for _, col := range t.Columns { - if testIsPKHandleColumn(t, col) { - hasPK = true - h = r[col.Offset].GetInt64() - break - } - } - - if !hasPK { - c.Fatal("this case don't have primary id") - } - return h - } - - c.Fatal("this case don't have primary id") - return 0 -} - func testGenDeleteBinlog(c *C, t *model.TableInfo, r []types.Datum) []byte { - var primaryIdx *model.IndexInfo - for _, idx := range t.Indices { - if idx.Primary { - primaryIdx = idx - break - } - } var data []byte var err error - if primaryIdx != nil { - indexedValues := make([]types.Datum, len(primaryIdx.Columns)) - for i := range indexedValues { - indexedValues[i] = r[primaryIdx.Columns[i].Offset] - } - data, err = codec.EncodeKey(nil, indexedValues...) 
- c.Assert(err, IsNil) - return data - } colIDs := make([]int64, len(t.Columns)) for i, col := range t.Columns { colIDs[i] = col.ID diff --git a/drainer/util.go b/drainer/util.go index d6003b312..d8b32f063 100644 --- a/drainer/util.go +++ b/drainer/util.go @@ -13,6 +13,7 @@ import ( "github.com/go-sql-driver/mysql" "github.com/juju/errors" "github.com/ngaut/log" + "github.com/pingcap/tidb-binlog/drainer/executor" tddl "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/model" @@ -98,7 +99,7 @@ func posToFloat(pos *binlog.Pos) float64 { return float64(pos.Offset) + decimal } -func genCisternID(listenAddr string) (string, error) { +func genDrainerID(listenAddr string) (string, error) { urllis, err := url.Parse(listenAddr) if err != nil { return "", errors.Trace(err) @@ -117,36 +118,14 @@ func genCisternID(listenAddr string) (string, error) { return fmt.Sprintf("%s:%s", hostname, port), nil } -func executeSQLs(db *sql.DB, sqls []string, args [][]interface{}, isDDL bool) error { - if len(sqls) == 0 { - return nil - } - +func execute(executor executor.Executor, sqls []string, args [][]interface{}, commitTSs []int64, isDDL bool) error { // compute txn duration beginTime := time.Now() defer func() { txnHistogram.Observe(time.Since(beginTime).Seconds()) }() - retryCount := maxDMLRetryCount - if isDDL { - retryCount = maxDDLRetryCount - } - - var err error - for i := 0; i < retryCount; i++ { - if i > 0 { - log.Warnf("exec sql retry %d - %v - %v", i, sqls, args) - time.Sleep(waitTime) - } - - err = appleTxn(db, sqls, args) - if err == nil { - return nil - } - } - - return errors.Trace(err) + return executor.Execute(sqls, args, commitTSs, isDDL) } func appleTxn(db *sql.DB, sqls []string, args [][]interface{}) error { @@ -201,41 +180,27 @@ func ignoreDDLError(err error) bool { } } -func openDB(username string, password string, host string, port int, proto string) (*sql.DB, error) { - dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8&multiStatements=true", username, password, host, port) - db, err := sql.Open(proto, dbDSN) - if err != nil { - return nil, errors.Trace(err) - } - - return db, nil -} - -func closeDB(db *sql.DB) error { - return errors.Trace(db.Close()) -} - -func closeDBs(dbs ...*sql.DB) { - for _, db := range dbs { - err := closeDB(db) +func closeExecutors(executors ...executor.Executor) { + for _, e := range executors { + err := e.Close() if err != nil { log.Errorf("close db failed - %v", err) } } } -func createDBs(destDBType string, cfg DBConfig, count int) ([]*sql.DB, error) { - dbs := make([]*sql.DB, 0, count) +func createExecutors(destDBType string, cfg *executor.DBConfig, count int) ([]executor.Executor, error) { + executors := make([]executor.Executor, 0, count) for i := 0; i < count; i++ { - db, err := openDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, destDBType) + executor, err := executor.New(destDBType, cfg) if err != nil { return nil, errors.Trace(err) } - dbs = append(dbs, db) + executors = append(executors, executor) } - return dbs, nil + return executors, nil } func genHashKey(key string) uint32 { diff --git a/generate-binlog.sh b/generate-binlog.sh new file mode 100755 index 000000000..a3c3e63e0 --- /dev/null +++ b/generate-binlog.sh @@ -0,0 +1,11 @@ +#!/usr/bin/env bash +cd proto/ + +echo "generate binlog code..." 
+GOGO_ROOT=${GOPATH}/src/github.com/gogo/protobuf +protoc -I.:${GOGO_ROOT}:${GOGO_ROOT}/protobuf --gofast_out=./binlog binlog.proto +cd ./binlog +sed -i.bak -E 's/import _ \"gogoproto\"//g' *.pb.go +sed -i.bak -E 's/import fmt \"fmt\"//g' *.pb.go +rm -f *.bak +goimports -w *.pb.go diff --git a/proto/binlog.proto b/proto/binlog.proto new file mode 100644 index 000000000..b1812fbcf --- /dev/null +++ b/proto/binlog.proto @@ -0,0 +1,50 @@ +syntax = "proto2"; + +package pb_binlog; + +import "gogoproto/gogo.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.sizer_all) = true; +option (gogoproto.unmarshaler_all) = true; + +enum EventType { + Insert = 0; + Update = 1; + Delete = 2; +} + +// TableMutation contains mutations in a table. +message Event { + optional string schema_name = 1; + optional string table_name = 2; + optional EventType tp = 3 [(gogoproto.nullable) = false]; + + // For inserted rows, we save all column values of the row[column_name, column_type, column_value, ...]. + // For updated rows, we save all old and new column values of the row[column_name, column_type, column_old_value, column_new_value, ...]. + // For Deleted rows, we save all column values of the row[column_name, column_type, column_value, ...]. + optional bytes row = 4; +} + +message DMLData { + // mutations contains all the row changes. + repeated Event events = 1 [(gogoproto.nullable) = false]; +} + +enum BinlogType { + DML = 0; // has commit_ts, dml_data + DDL = 1; // has commit_ts, ddl_query +} + +// Binlog contains all the changes in a transaction. +message Binlog { + optional BinlogType tp = 1 [(gogoproto.nullable) = false]; + optional int64 commit_ts = 2 [(gogoproto.nullable) = false]; + + + // dml_data is marshalled from DML type, + optional DMLData dml_data = 3; + + // ddl_query is the original ddl statement query. + optional bytes ddl_query = 4; +} diff --git a/proto/binlog/binlog.pb.go b/proto/binlog/binlog.pb.go new file mode 100644 index 000000000..86b0762b6 --- /dev/null +++ b/proto/binlog/binlog.pb.go @@ -0,0 +1,966 @@ +// Code generated by protoc-gen-gogo. +// source: binlog.proto +// DO NOT EDIT! + +/* + Package pb_binlog is a generated protocol buffer package. + + It is generated from these files: + binlog.proto + + It has these top-level messages: + Event + DMLData + Binlog +*/ +package pb_binlog + +import ( + "fmt" + + proto "github.com/golang/protobuf/proto" + + math "math" +) + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. 
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type EventType int32 + +const ( + EventType_Insert EventType = 0 + EventType_Update EventType = 1 + EventType_Delete EventType = 2 +) + +var EventType_name = map[int32]string{ + 0: "Insert", + 1: "Update", + 2: "Delete", +} +var EventType_value = map[string]int32{ + "Insert": 0, + "Update": 1, + "Delete": 2, +} + +func (x EventType) Enum() *EventType { + p := new(EventType) + *p = x + return p +} +func (x EventType) String() string { + return proto.EnumName(EventType_name, int32(x)) +} +func (x *EventType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(EventType_value, data, "EventType") + if err != nil { + return err + } + *x = EventType(value) + return nil +} +func (EventType) EnumDescriptor() ([]byte, []int) { return fileDescriptorBinlog, []int{0} } + +type BinlogType int32 + +const ( + BinlogType_DML BinlogType = 0 + BinlogType_DDL BinlogType = 1 +) + +var BinlogType_name = map[int32]string{ + 0: "DML", + 1: "DDL", +} +var BinlogType_value = map[string]int32{ + "DML": 0, + "DDL": 1, +} + +func (x BinlogType) Enum() *BinlogType { + p := new(BinlogType) + *p = x + return p +} +func (x BinlogType) String() string { + return proto.EnumName(BinlogType_name, int32(x)) +} +func (x *BinlogType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(BinlogType_value, data, "BinlogType") + if err != nil { + return err + } + *x = BinlogType(value) + return nil +} +func (BinlogType) EnumDescriptor() ([]byte, []int) { return fileDescriptorBinlog, []int{1} } + +// TableMutation contains mutations in a table. +type Event struct { + SchemaName *string `protobuf:"bytes,1,opt,name=schema_name,json=schemaName" json:"schema_name,omitempty"` + TableName *string `protobuf:"bytes,2,opt,name=table_name,json=tableName" json:"table_name,omitempty"` + Tp EventType `protobuf:"varint,3,opt,name=tp,enum=pb_binlog.EventType" json:"tp"` + // For inserted rows, we save all column values of the row[column_name, column_type, column_value, ...]. + // For updated rows, we save all old and new column values of the row[column_name, column_type, column_old_value, column_new_value, ...]. + // For Deleted rows, we save all column values of the row[column_name, column_type, column_value, ...]. + Row []byte `protobuf:"bytes,4,opt,name=row" json:"row,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Event) Reset() { *m = Event{} } +func (m *Event) String() string { return proto.CompactTextString(m) } +func (*Event) ProtoMessage() {} +func (*Event) Descriptor() ([]byte, []int) { return fileDescriptorBinlog, []int{0} } + +func (m *Event) GetSchemaName() string { + if m != nil && m.SchemaName != nil { + return *m.SchemaName + } + return "" +} + +func (m *Event) GetTableName() string { + if m != nil && m.TableName != nil { + return *m.TableName + } + return "" +} + +func (m *Event) GetTp() EventType { + if m != nil { + return m.Tp + } + return EventType_Insert +} + +func (m *Event) GetRow() []byte { + if m != nil { + return m.Row + } + return nil +} + +type DMLData struct { + // mutations contains all the row changes. 
+ Events []Event `protobuf:"bytes,1,rep,name=events" json:"events"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *DMLData) Reset() { *m = DMLData{} } +func (m *DMLData) String() string { return proto.CompactTextString(m) } +func (*DMLData) ProtoMessage() {} +func (*DMLData) Descriptor() ([]byte, []int) { return fileDescriptorBinlog, []int{1} } + +func (m *DMLData) GetEvents() []Event { + if m != nil { + return m.Events + } + return nil +} + +// Binlog contains all the changes in a transaction. +type Binlog struct { + Tp BinlogType `protobuf:"varint,1,opt,name=tp,enum=pb_binlog.BinlogType" json:"tp"` + CommitTs int64 `protobuf:"varint,2,opt,name=commit_ts,json=commitTs" json:"commit_ts"` + // dml_data is marshalled from DML type, + DmlData *DMLData `protobuf:"bytes,3,opt,name=dml_data,json=dmlData" json:"dml_data,omitempty"` + // ddl_query is the original ddl statement query. + DdlQuery []byte `protobuf:"bytes,4,opt,name=ddl_query,json=ddlQuery" json:"ddl_query,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Binlog) Reset() { *m = Binlog{} } +func (m *Binlog) String() string { return proto.CompactTextString(m) } +func (*Binlog) ProtoMessage() {} +func (*Binlog) Descriptor() ([]byte, []int) { return fileDescriptorBinlog, []int{2} } + +func (m *Binlog) GetTp() BinlogType { + if m != nil { + return m.Tp + } + return BinlogType_DML +} + +func (m *Binlog) GetCommitTs() int64 { + if m != nil { + return m.CommitTs + } + return 0 +} + +func (m *Binlog) GetDmlData() *DMLData { + if m != nil { + return m.DmlData + } + return nil +} + +func (m *Binlog) GetDdlQuery() []byte { + if m != nil { + return m.DdlQuery + } + return nil +} + +func init() { + proto.RegisterType((*Event)(nil), "pb_binlog.Event") + proto.RegisterType((*DMLData)(nil), "pb_binlog.DMLData") + proto.RegisterType((*Binlog)(nil), "pb_binlog.Binlog") + proto.RegisterEnum("pb_binlog.EventType", EventType_name, EventType_value) + proto.RegisterEnum("pb_binlog.BinlogType", BinlogType_name, BinlogType_value) +} +func (m *Event) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *Event) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.SchemaName != nil { + data[i] = 0xa + i++ + i = encodeVarintBinlog(data, i, uint64(len(*m.SchemaName))) + i += copy(data[i:], *m.SchemaName) + } + if m.TableName != nil { + data[i] = 0x12 + i++ + i = encodeVarintBinlog(data, i, uint64(len(*m.TableName))) + i += copy(data[i:], *m.TableName) + } + data[i] = 0x18 + i++ + i = encodeVarintBinlog(data, i, uint64(m.Tp)) + if m.Row != nil { + data[i] = 0x22 + i++ + i = encodeVarintBinlog(data, i, uint64(len(m.Row))) + i += copy(data[i:], m.Row) + } + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} + +func (m *DMLData) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *DMLData) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Events) > 0 { + for _, msg := range m.Events { + data[i] = 0xa + i++ + i = encodeVarintBinlog(data, i, uint64(msg.Size())) + n, err := msg.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} + +func (m *Binlog) 
Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *Binlog) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + data[i] = 0x8 + i++ + i = encodeVarintBinlog(data, i, uint64(m.Tp)) + data[i] = 0x10 + i++ + i = encodeVarintBinlog(data, i, uint64(m.CommitTs)) + if m.DmlData != nil { + data[i] = 0x1a + i++ + i = encodeVarintBinlog(data, i, uint64(m.DmlData.Size())) + n1, err := m.DmlData.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n1 + } + if m.DdlQuery != nil { + data[i] = 0x22 + i++ + i = encodeVarintBinlog(data, i, uint64(len(m.DdlQuery))) + i += copy(data[i:], m.DdlQuery) + } + if m.XXX_unrecognized != nil { + i += copy(data[i:], m.XXX_unrecognized) + } + return i, nil +} + +func encodeFixed64Binlog(data []byte, offset int, v uint64) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + data[offset+4] = uint8(v >> 32) + data[offset+5] = uint8(v >> 40) + data[offset+6] = uint8(v >> 48) + data[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Binlog(data []byte, offset int, v uint32) int { + data[offset] = uint8(v) + data[offset+1] = uint8(v >> 8) + data[offset+2] = uint8(v >> 16) + data[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintBinlog(data []byte, offset int, v uint64) int { + for v >= 1<<7 { + data[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + data[offset] = uint8(v) + return offset + 1 +} +func (m *Event) Size() (n int) { + var l int + _ = l + if m.SchemaName != nil { + l = len(*m.SchemaName) + n += 1 + l + sovBinlog(uint64(l)) + } + if m.TableName != nil { + l = len(*m.TableName) + n += 1 + l + sovBinlog(uint64(l)) + } + n += 1 + sovBinlog(uint64(m.Tp)) + if m.Row != nil { + l = len(m.Row) + n += 1 + l + sovBinlog(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *DMLData) Size() (n int) { + var l int + _ = l + if len(m.Events) > 0 { + for _, e := range m.Events { + l = e.Size() + n += 1 + l + sovBinlog(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *Binlog) Size() (n int) { + var l int + _ = l + n += 1 + sovBinlog(uint64(m.Tp)) + n += 1 + sovBinlog(uint64(m.CommitTs)) + if m.DmlData != nil { + l = m.DmlData.Size() + n += 1 + l + sovBinlog(uint64(l)) + } + if m.DdlQuery != nil { + l = len(m.DdlQuery) + n += 1 + l + sovBinlog(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovBinlog(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozBinlog(x uint64) (n int) { + return sovBinlog(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Event) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBinlog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Event: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Event: illegal tag %d (wire type %d)", fieldNum, 
wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SchemaName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBinlog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthBinlog + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + s := string(data[iNdEx:postIndex]) + m.SchemaName = &s + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field TableName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBinlog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthBinlog + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + s := string(data[iNdEx:postIndex]) + m.TableName = &s + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Tp", wireType) + } + m.Tp = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBinlog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Tp |= (EventType(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Row", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBinlog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthBinlog + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Row = append(m.Row[:0], data[iNdEx:postIndex]...) + if m.Row == nil { + m.Row = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipBinlog(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthBinlog + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[iNdEx:iNdEx+skippy]...) 
+ iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *DMLData) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBinlog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DMLData: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DMLData: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Events", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBinlog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthBinlog + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Events = append(m.Events, Event{}) + if err := m.Events[len(m.Events)-1].Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipBinlog(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthBinlog + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Binlog) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBinlog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Binlog: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Binlog: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Tp", wireType) + } + m.Tp = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBinlog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Tp |= (BinlogType(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field CommitTs", wireType) + } + m.CommitTs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBinlog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.CommitTs |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DmlData", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBinlog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 
0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthBinlog + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.DmlData == nil { + m.DmlData = &DMLData{} + } + if err := m.DmlData.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DdlQuery", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBinlog + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthBinlog + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DdlQuery = append(m.DdlQuery[:0], data[iNdEx:postIndex]...) + if m.DdlQuery == nil { + m.DdlQuery = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipBinlog(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthBinlog + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, data[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipBinlog(data []byte) (n int, err error) { + l := len(data) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowBinlog + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowBinlog + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if data[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowBinlog + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthBinlog + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowBinlog + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipBinlog(data[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthBinlog = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowBinlog = fmt.Errorf("proto: integer overflow") +) + +func init() { proto.RegisterFile("binlog.proto", fileDescriptorBinlog) } + +var fileDescriptorBinlog = []byte{ + // 354 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x5c, 0x90, 
0xc1, 0x4a, 0xf3, 0x40, + 0x14, 0x85, 0x33, 0x49, 0xff, 0xb6, 0xb9, 0x2d, 0x3f, 0xc3, 0x50, 0x21, 0x28, 0xa6, 0xb1, 0xab, + 0x50, 0x31, 0x85, 0xee, 0xdc, 0x96, 0xb8, 0x10, 0x5a, 0xc1, 0x50, 0xd7, 0x61, 0xda, 0x19, 0x6a, + 0x21, 0x93, 0x89, 0xc9, 0xa8, 0xf4, 0x01, 0x7c, 0x07, 0x1f, 0xc0, 0x87, 0xe9, 0xd2, 0x27, 0x10, + 0xa9, 0x2f, 0x22, 0x99, 0x84, 0x5a, 0xba, 0x3b, 0x39, 0xe7, 0xde, 0x93, 0xef, 0x0e, 0x74, 0x17, + 0xeb, 0x34, 0x91, 0xab, 0x20, 0xcb, 0xa5, 0x92, 0xc4, 0xce, 0x16, 0x71, 0x65, 0x9c, 0xf6, 0x56, + 0x72, 0x25, 0xb5, 0x3b, 0x2a, 0x55, 0x35, 0x30, 0x78, 0x43, 0xf0, 0xef, 0xe6, 0x85, 0xa7, 0x8a, + 0xf4, 0xa1, 0x53, 0x2c, 0x1f, 0xb9, 0xa0, 0x71, 0x4a, 0x05, 0x77, 0x90, 0x87, 0x7c, 0x3b, 0x82, + 0xca, 0xba, 0xa3, 0x82, 0x93, 0x73, 0x00, 0x45, 0x17, 0x09, 0xaf, 0x72, 0x53, 0xe7, 0xb6, 0x76, + 0x74, 0x3c, 0x04, 0x53, 0x65, 0x8e, 0xe5, 0x21, 0xff, 0xff, 0xb8, 0x17, 0xec, 0xff, 0x1b, 0xe8, + 0xf6, 0xf9, 0x26, 0xe3, 0x93, 0xc6, 0xf6, 0xab, 0x6f, 0x44, 0xa6, 0xca, 0x08, 0x06, 0x2b, 0x97, + 0xaf, 0x4e, 0xc3, 0x43, 0x7e, 0x37, 0x2a, 0xe5, 0xe0, 0x1a, 0x5a, 0xe1, 0x6c, 0x1a, 0x52, 0x45, + 0x49, 0x00, 0x4d, 0x5e, 0xee, 0x14, 0x0e, 0xf2, 0x2c, 0xbf, 0x33, 0xc6, 0xc7, 0x65, 0x75, 0x51, + 0x3d, 0x35, 0xf8, 0x40, 0xd0, 0x9c, 0xe8, 0x98, 0x5c, 0x6a, 0x06, 0xa4, 0x19, 0x4e, 0x0e, 0xd6, + 0xaa, 0xf8, 0x08, 0xe2, 0x02, 0xec, 0xa5, 0x14, 0x62, 0xad, 0x62, 0x55, 0xe8, 0x73, 0xac, 0x3a, + 0x6c, 0x57, 0xf6, 0xbc, 0x20, 0x57, 0xd0, 0x66, 0x22, 0x89, 0x19, 0x55, 0x54, 0x5f, 0xd6, 0x19, + 0x93, 0x83, 0xd6, 0x1a, 0x38, 0x6a, 0x31, 0x91, 0x68, 0xf2, 0x33, 0xb0, 0x19, 0x4b, 0xe2, 0xa7, + 0x67, 0x9e, 0x6f, 0xea, 0xe3, 0xda, 0x8c, 0x25, 0xf7, 0xe5, 0xf7, 0x70, 0x04, 0xf6, 0xfe, 0x29, + 0x08, 0x40, 0xf3, 0x36, 0x2d, 0x78, 0xae, 0xb0, 0x51, 0xea, 0x87, 0x8c, 0x51, 0xc5, 0x31, 0x2a, + 0x75, 0xc8, 0x13, 0xae, 0x38, 0x36, 0x87, 0x2e, 0xc0, 0x1f, 0x37, 0x69, 0x81, 0x15, 0xce, 0xa6, + 0xd8, 0xd0, 0x22, 0x9c, 0x62, 0x34, 0xc1, 0xdb, 0x9d, 0x8b, 0x3e, 0x77, 0x2e, 0xfa, 0xde, 0xb9, + 0xe8, 0xfd, 0xc7, 0x35, 0x7e, 0x03, 0x00, 0x00, 0xff, 0xff, 0xfb, 0x8a, 0x91, 0x46, 0xfc, 0x01, + 0x00, 0x00, +}
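
A minimal sketch of how a consumer of the new self-description pb output could read one insert/delete Event back, assuming the layout produced by encodeRow above (colName1, tp1, value1, colName2, tp2, value2, ...) and only the codec.Decode / types.Datum accessors and generated pb getters already used in this patch; the package and function names here are illustrative, not part of the change:

package consumer

import (
	"fmt"

	pb "github.com/pingcap/tidb-binlog/proto/binlog"
	"github.com/pingcap/tidb/util/codec"
)

// decodeRowEvent prints the columns carried by an insert or delete Event.
// Event.Row is assumed to follow the encodeRow layout:
// colName1, tp1, value1, colName2, tp2, value2, ...
func decodeRowEvent(ev *pb.Event) error {
	row := ev.GetRow()
	if len(row) == 1 && row[0] == codec.NilFlag {
		// a row with no columns is encoded as a single NilFlag byte
		return nil
	}

	// the second argument is only a capacity hint for the decoded datum slice
	datums, err := codec.Decode(row, 8)
	if err != nil {
		return err
	}
	if len(datums)%3 != 0 {
		return fmt.Errorf("corrupted row data in %s.%s: %v",
			ev.GetSchemaName(), ev.GetTableName(), datums)
	}

	for i := 0; i < len(datums); i += 3 {
		name := datums[i].GetString()  // column name
		tp := datums[i+1].GetBytes()   // single-byte column type
		val := datums[i+2].GetValue()  // column value
		fmt.Printf("%s.%s %s (tp %v) = %v\n",
			ev.GetSchemaName(), ev.GetTableName(), name, tp, val)
	}
	return nil
}

Update events would be walked the same way but with a stride of four (column name, type, old value, new value), mirroring encodeUpdateRow.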