Skip to content

Commit

Permalink
drainer: refactor executor & add self-description pb output (#121)
Browse files Browse the repository at this point in the history
*: add output self-description protobuf binlog
  • Loading branch information
IANTHEREAL authored Feb 28, 2017
1 parent 92dd34f commit 70ca906
Show file tree
Hide file tree
Showing 21 changed files with 1,894 additions and 585 deletions.
12 changes: 7 additions & 5 deletions cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ data-dir = "data.drainer"
# a comma separated list of PD endpoints
pd-urls = "http://127.0.0.1:2379"

# Executor Configuration.
[executor]
# syncer Configuration.
[syncer]

# schemas excluded from syncing
ignore-schemas = "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql"
Expand All @@ -24,8 +24,10 @@ txn-batch = 1
# work count to execute binlogs
worker-count = 1

disable-dispatch = true

# the downstream mysql protocol database
[executor.to]
[syncer.to]
host = "127.0.0.1"
user = "root"
password = ""
Expand All @@ -35,10 +37,10 @@ port = 3306
## regular expressions are also supported; a pattern that starts with '~' is treated as a regular expression.
#
#replicate-do-db = ["~^b.*","s1"]
#[[executor.replicate-do-table]]
#[[syncer.replicate-do-table]]
#db-name ="test"
#tbl-name = "log"

#[[executor.replicate-do-table]]
#[[syncer.replicate-do-table]]
#db-name ="test"
#tbl-name = "~^a.*"
10 changes: 5 additions & 5 deletions drainer/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Collector struct {
tiStore kv.Storage
pumps map[string]*Pump
bh *binlogHeap
executor *Executor
syncer *Syncer
latestTS int64
meta Meta

Expand All @@ -52,7 +52,7 @@ type Collector struct {
}

// NewCollector returns an instance of Collector
func NewCollector(cfg *Config, clusterID uint64, w *DepositWindow, e *Executor, m Meta) (*Collector, error) {
func NewCollector(cfg *Config, clusterID uint64, w *DepositWindow, s *Syncer, m Meta) (*Collector, error) {
urlv, err := flags.NewURLsValue(cfg.EtcdURLs)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -79,7 +79,7 @@ func NewCollector(cfg *Config, clusterID uint64, w *DepositWindow, e *Executor,
pumps: make(map[string]*Pump),
bh: newBinlogHeap(maxHeapSize),
window: w,
executor: e,
syncer: s,
meta: m,
tiClient: tiClient,
tiStore: tiStore,
Expand Down Expand Up @@ -272,8 +272,8 @@ func (c *Collector) publishBinlogs(ctx context.Context, minTS, maxTS int64) {

item := c.bh.pop()
for item != nil {
c.executor.AddToExectorChan(item)
// if binlogOffsets[item.nodeID] == len(bss[item.nodeID]), all binlogs must be pushed into heap, then delete it from bss
c.syncer.Add(item)
// if binlogOffsets[item.nodeID] == len(bss[item.nodeID]), all binlogs must be pushed into heap, delete it from bss
if binlogOffsets[item.nodeID] == len(bss[item.nodeID]) {
delete(bss, item.nodeID)
} else {
Expand Down
72 changes: 36 additions & 36 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package drainer
import (
"flag"
"fmt"
"math"
"net"
"net/url"
"os"
Expand All @@ -12,6 +13,7 @@ import (

"github.com/BurntSushi/toml"
"github.com/juju/errors"
"github.com/pingcap/tidb-binlog/drainer/executor"
"github.com/pingcap/tidb-binlog/pkg/flags"
)

Expand All @@ -25,36 +27,29 @@ const (
defaultPumpTimeout = 5 * time.Second
)

// DBConfig is the DB configuration.
type DBConfig struct {
Host string `toml:"host" json:"host"`
User string `toml:"user" json:"user"`
Password string `toml:"password" json:"password"`
Port int `toml:"port" json:"port"`
}

// ExecutorConfig is the Executor's configuration.
type ExecutorConfig struct {
IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"`
TxnBatch int `toml:"txn-batch" json:"txn-batch"`
WorkerCount int `toml:"worker-count" json:"worker-count"`
To DBConfig `toml:"to" json:"to"`
DoTables []TableName `toml:"replicate-do-table" json:"replicate-do-table"`
DoDBs []string `toml:"replicate-do-db" json:"replicate-do-db"`
DestDBType string `toml:"db-type" json:"db-type"`
// SyncerConfig is the Syncer's configuration.
type SyncerConfig struct {
IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"`
TxnBatch int `toml:"txn-batch" json:"txn-batch"`
WorkerCount int `toml:"worker-count" json:"worker-count"`
To *executor.DBConfig `toml:"to" json:"to"`
DoTables []TableName `toml:"replicate-do-table" json:"replicate-do-table"`
DoDBs []string `toml:"replicate-do-db" json:"replicate-do-db"`
DestDBType string `toml:"db-type" json:"db-type"`
DisableDispatch bool `toml:"disable-dispatch" json:"disable-dispatch"`
}

// Config holds the configuration of drainer
type Config struct {
*flag.FlagSet
LogLevel string `toml:"log-level" json:"log-level"`
ListenAddr string `toml:"addr" json:"addr"`
DataDir string `toml:"data-dir" json:"data-dir"`
DetectInterval int `toml:"detect-interval" json:"detect-interval"`
EtcdURLs string `toml:"pd-urls" json:"pd-urls"`
LogFile string `toml:"log-file" json:"log-file"`
LogRotate string `toml:"log-rotate" json:"log-rotate"`
ExecutorCfg *ExecutorConfig `toml:"executor" json:"executor"`
LogLevel string `toml:"log-level" json:"log-level"`
ListenAddr string `toml:"addr" json:"addr"`
DataDir string `toml:"data-dir" json:"data-dir"`
DetectInterval int `toml:"detect-interval" json:"detect-interval"`
EtcdURLs string `toml:"pd-urls" json:"pd-urls"`
LogFile string `toml:"log-file" json:"log-file"`
LogRotate string `toml:"log-rotate" json:"log-rotate"`
SyncerCfg *SyncerConfig `toml:"syncer" json:"sycner"`
EtcdTimeout time.Duration
PumpTimeout time.Duration
MetricsAddr string
Expand All @@ -68,7 +63,7 @@ func NewConfig() *Config {
cfg := &Config{
EtcdTimeout: defaultEtcdTimeout,
PumpTimeout: defaultPumpTimeout,
ExecutorCfg: new(ExecutorConfig),
SyncerCfg: new(SyncerConfig),
}
cfg.FlagSet = flag.NewFlagSet("drainer", flag.ContinueOnError)
fs := cfg.FlagSet
Expand All @@ -87,14 +82,11 @@ func NewConfig() *Config {
fs.IntVar(&cfg.MetricsInterval, "metrics-interval", 15, "prometheus client push interval in second, set \"0\" to disable prometheus push")
fs.StringVar(&cfg.LogFile, "log-file", "", "log file path")
fs.StringVar(&cfg.LogRotate, "log-rotate", "", "log file rotate type, hour/day")
fs.IntVar(&cfg.ExecutorCfg.TxnBatch, "txn-batch", 1, "number of binlog events in a transaction batch")
fs.StringVar(&cfg.ExecutorCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync the meta schema")
fs.IntVar(&cfg.ExecutorCfg.WorkerCount, "c", 1, "parallel worker count")
fs.StringVar(&cfg.ExecutorCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql, postgresql")
fs.StringVar(&cfg.ExecutorCfg.To.Host, "db-host", "127.0.0.1", "host of target database")
fs.IntVar(&cfg.ExecutorCfg.To.Port, "db-port", 3306, "port of target database")
fs.StringVar(&cfg.ExecutorCfg.To.User, "db-username", "root", "username of target database")
fs.StringVar(&cfg.ExecutorCfg.To.Password, "db-password", "", "password of target database")
fs.IntVar(&cfg.SyncerCfg.TxnBatch, "txn-batch", 1, "number of binlog events in a transaction batch")
fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas")
fs.IntVar(&cfg.SyncerCfg.WorkerCount, "c", 1, "parallel worker count")
fs.StringVar(&cfg.SyncerCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql, postgresql")
fs.BoolVar(&cfg.SyncerCfg.DisableDispatch, "disable-dispatch", false, "disable dispatching sqls that in one same binlog; if set true, work-count and txn-batch would be useless")
return cfg
}

Expand Down Expand Up @@ -137,11 +129,19 @@ func (cfg *Config) Parse(args []string) error {
cfg.ListenAddr = "http://" + cfg.ListenAddr // add 'http:' scheme to facilitate parsing
adjustString(&cfg.DataDir, defaultDataDir)
adjustInt(&cfg.DetectInterval, defaultDetectInterval)
cfg.ExecutorCfg.adjustDoDBAndTable()
cfg.SyncerCfg.adjustWorkCount()
cfg.SyncerCfg.adjustDoDBAndTable()
return cfg.validate()
}

func (c *ExecutorConfig) adjustDoDBAndTable() {
func (c *SyncerConfig) adjustWorkCount() {
if c.DisableDispatch {
c.WorkerCount = 1
c.TxnBatch = math.MaxInt32
}
}

func (c *SyncerConfig) adjustDoDBAndTable() {
for i := 0; i < len(c.DoTables); i++ {
c.DoTables[i].Table = strings.ToLower(c.DoTables[i].Table)
c.DoTables[i].Schema = strings.ToLower(c.DoTables[i].Schema)
Expand Down
25 changes: 25 additions & 0 deletions drainer/executor/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package executor

import (
"github.com/juju/errors"
)

// Executor is the interface implemented by every downstream writer that
// applies TiDB binlog data. The visible implementations translate binlog
// items either into a MySQL-protocol database ("mysql") or into a
// self-describing protobuf binlog file ("pb").
type Executor interface {
	// Execute applies one batch of TiDB binlog data: the generated SQL
	// statements, their bound arguments, the matching commit timestamps,
	// and a flag telling whether the batch is DDL.
	Execute([]string, [][]interface{}, []int64, bool) error
	// Close releases the resources held by the executor.
	Close() error
}

// New returns an Executor instance for the given backend name.
// Supported names are "mysql" and "pb"; any other name yields an error.
func New(name string, cfg *DBConfig) (Executor, error) {
	switch name {
	case "mysql":
		return newMysql(cfg)
	case "pb":
		return newPB(cfg)
	default:
		// Fixed message grammar: "unsupport" -> "unsupported".
		return nil, errors.Errorf("unsupported executor type %s", name)
	}
}
30 changes: 30 additions & 0 deletions drainer/executor/mysql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package executor

import (
"database/sql"

"github.com/juju/errors"
)

// mysqlExecutor writes binlog-generated SQL statements into a downstream
// MySQL-protocol database through a standard *sql.DB handle.
type mysqlExecutor struct {
	db *sql.DB
}

// newMysql opens a connection pool to the downstream MySQL-protocol
// database described by cfg and wraps it in a mysqlExecutor.
func newMysql(cfg *DBConfig) (Executor, error) {
	conn, err := openDB("mysql", cfg.Host, cfg.Port, cfg.User, cfg.Password)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return &mysqlExecutor{db: conn}, nil
}

// Execute runs the given SQL statements with their arguments against the
// downstream database. commitTSs is ignored by this backend; isDDL is
// forwarded to executeSQLs.
func (m *mysqlExecutor) Execute(sqls []string, args [][]interface{}, commitTSs []int64, isDDL bool) error {
	return executeSQLs(m.db, sqls, args, isDDL)
}

// Close closes the underlying database handle.
func (m *mysqlExecutor) Close() error {
	return m.db.Close()
}
70 changes: 70 additions & 0 deletions drainer/executor/pb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package executor

import (
"github.com/juju/errors"
"github.com/pingcap/tidb-binlog/pkg/file"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
"github.com/pingcap/tidb-binlog/pump"
)

// pbExecutor persists binlog batches as self-describing protobuf records
// appended to binlog files under dir via a pump.Binlogger.
type pbExecutor struct {
	dir       string
	binlogger pump.Binlogger
}

// newPB returns a pbExecutor writing into cfg.BinlogFileDir. When the
// directory already contains files the existing binlogger is opened,
// otherwise a fresh one is created.
func newPB(cfg *DBConfig) (Executor, error) {
	dirPath := cfg.BinlogFileDir

	names, err := file.ReadDir(dirPath)
	if err != nil {
		return nil, errors.Trace(err)
	}

	var binlogger pump.Binlogger
	if len(names) == 0 {
		binlogger, err = pump.CreateBinlogger(dirPath)
	} else {
		binlogger, err = pump.OpenBinlogger(dirPath)
	}
	if err != nil {
		return nil, errors.Trace(err)
	}

	return &pbExecutor{
		dir:       dirPath,
		binlogger: binlogger,
	}, nil
}

// Execute encodes one batch of binlog data into a pb.Binlog message and
// appends it to the binlog file.
//
// For DDL only the first statement and the first commit timestamp are
// recorded, as a BinlogType_DDL entry. For DML every args[i][0] must be a
// *pb.Event (anything else is a programming error and panics); the events
// are written as one BinlogType_DML entry stamped with commitTSs[0].
func (p *pbExecutor) Execute(sqls []string, args [][]interface{}, commitTSs []int64, isDDL bool) error {
	// Guard against an empty batch: commitTSs[0]/sqls[0] below would
	// panic with an index-out-of-range otherwise.
	if len(sqls) == 0 || len(commitTSs) == 0 {
		return nil
	}

	binlog := &pb.Binlog{CommitTs: commitTSs[0]}
	if isDDL {
		binlog.Tp = pb.BinlogType_DDL
		binlog.DdlQuery = []byte(sqls[0])
		return p.saveBinlog(binlog)
	}

	binlog.Tp = pb.BinlogType_DML
	binlog.DmlData = new(pb.DMLData)
	for i := range sqls {
		// event can be only pb.Event, otherwise need to panic
		event := args[i][0].(*pb.Event)
		binlog.DmlData.Events = append(binlog.DmlData.Events, *event)
	}
	return p.saveBinlog(binlog)
}

// Close closes the underlying binlogger, flushing it to disk.
func (p *pbExecutor) Close() error {
	return p.binlogger.Close()
}

// saveBinlog serializes binlog and appends the raw bytes to the tail of
// the current binlog file.
func (p *pbExecutor) saveBinlog(binlog *pb.Binlog) error {
	payload, err := binlog.Marshal()
	if err != nil {
		return errors.Trace(err)
	}
	return p.binlogger.WriteTail(payload)
}
Loading

0 comments on commit 70ca906

Please sign in to comment.