Skip to content

Commit

Permalink
pb: update pb parser to avoid drainer failure (#1093)
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu authored Jan 13, 2022
1 parent 4fd9ebd commit 194d4ac
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 59 deletions.
6 changes: 6 additions & 0 deletions .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,9 @@ Related changes
- Need to update the documentation
- Need to update the `tidb-ansible` repository
- Need to be included in the release note

### Release note

<!-- bugfix or new feature needs a release note -->

- No release note
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ GOVERSION := "`go version`"
ARCH := "`uname -s`"
LINUX := "Linux"
MAC := "Darwin"
PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto'
PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto' | sed '/github.com\/pingcap\/tidb-binlog\/bin/d'
PACKAGES := $$($(PACKAGE_LIST))
PACKAGE_DIRECTORIES := $(PACKAGE_LIST) | sed 's|github.com/pingcap/$(PROJECT)/||'
FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor' | grep -vE 'binlog.pb.go')
Expand Down
3 changes: 2 additions & 1 deletion drainer/sync/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
tb "github.com/pingcap/tipb/go-binlog"

"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/pingcap/tidb-binlog/pkg/binlogfile"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
tb "github.com/pingcap/tipb/go-binlog"
)

var _ Syncer = &pbSyncer{}
Expand Down
23 changes: 10 additions & 13 deletions drainer/translator/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ import (
//nolint
"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-binlog/pkg/util"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
tipb "github.com/pingcap/tipb/go-binlog"

"github.com/pingcap/tidb-binlog/pkg/util"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
)

// TiBinlogToPbBinlog translate the binlog format
Expand All @@ -41,16 +41,13 @@ func TiBinlogToPbBinlog(infoGetter TableInfoGetter, schema string, table string,

if tiBinlog.DdlJobId > 0 { // DDL
sql := string(tiBinlog.GetDdlQuery())
stmt, err := getParser().ParseOneStmt(sql, "", "")
if err != nil {
return nil, errors.Trace(err)
}

_, isCreateDatabase := stmt.(*ast.CreateDatabaseStmt)
if isCreateDatabase {
sql += ";"
} else {
sql = fmt.Sprintf("use %s; %s;", quoteName(schema), sql)
if len(schema) > 0 {
isCreateDatabase := util.IsCreateDatabaseDDL(sql, sqlMode)
if isCreateDatabase {
sql += ";"
} else {
sql = fmt.Sprintf("use %s; %s;", quoteName(schema), sql)
}
}

pbBinlog.Tp = pb.BinlogType_DDL
Expand Down
29 changes: 28 additions & 1 deletion drainer/translator/pb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ import (
"fmt"

"github.com/pingcap/check"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"

pb "github.com/pingcap/tidb-binlog/proto/binlog"
)

type testPbSuite struct {
Expand Down Expand Up @@ -55,6 +56,32 @@ func (t *testPbSuite) TestDDL(c *check.C) {
CommitTs: t.TiBinlog.GetCommitTs(),
DdlQuery: []byte(expected),
})

// test PB shouldn't reports an error when meeting unsupported DDL
t.TiBinlog.DdlQuery = []byte("invalid sql")
pbBinog, err = TiBinlogToPbBinlog(t, t.Schema, t.Table, t.TiBinlog, nil)
c.Assert(err, check.IsNil)

c.Log("get ddl: ", string(pbBinog.GetDdlQuery()))
expected = fmt.Sprintf("use `%s`; %s;", t.Schema, string(t.TiBinlog.GetDdlQuery()))
c.Assert(pbBinog, check.DeepEquals, &pb.Binlog{
Tp: pb.BinlogType_DDL,
CommitTs: t.TiBinlog.GetCommitTs(),
DdlQuery: []byte(expected),
})

// test PB shouldn't reports an error when meeting unsupported DDL
t.TiBinlog.DdlQuery = []byte("invalid sql")
t.Schema = ""
pbBinog, err = TiBinlogToPbBinlog(t, t.Schema, t.Table, t.TiBinlog, nil)
c.Assert(err, check.IsNil)

c.Log("get ddl: ", string(pbBinog.GetDdlQuery()))
c.Assert(pbBinog, check.DeepEquals, &pb.Binlog{
Tp: pb.BinlogType_DDL,
CommitTs: t.TiBinlog.GetCommitTs(),
DdlQuery: t.TiBinlog.GetDdlQuery(),
})
}

func (t *testPbSuite) testDML(c *check.C, tp pb.EventType) {
Expand Down
11 changes: 2 additions & 9 deletions drainer/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/pkg/util"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"go.uber.org/zap"

"github.com/pingcap/tidb-binlog/pkg/util"
)

var sqlMode mysql.SQLMode
Expand All @@ -37,13 +37,6 @@ func SetSQLMode(mode mysql.SQLMode) {
sqlMode = mode
}

func getParser() (p *parser.Parser) {
p = parser.New()
p.SetSQLMode(sqlMode)

return
}

func insertRowToDatums(table *model.TableInfo, row []byte) (datums map[int64]types.Datum, err error) {
colsTypeMap := util.ToColumnTypeMap(table.Columns)

Expand Down
13 changes: 1 addition & 12 deletions pkg/loader/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,17 +390,6 @@ func needRefreshTableInfo(sql string) bool {
return true
}

func isCreateDatabaseDDL(sql string) bool {
stmt, err := parser.New().ParseOneStmt(sql, "", "")
if err != nil {
log.Error("parse sql failed", zap.String("sql", sql), zap.Error(err))
return false
}

_, isCreateDatabase := stmt.(*ast.CreateDatabaseStmt)
return isCreateDatabase
}

func (s *loaderImpl) execDDL(ddl *DDL) error {
log.Debug("exec ddl", zap.Reflect("ddl", ddl), zap.Bool("shouldSkip", ddl.ShouldSkip))
if ddl.ShouldSkip {
Expand All @@ -419,7 +408,7 @@ func (s *loaderImpl) processMysqlDDL(ddl *DDL) error {
return err
}

if len(ddl.Database) > 0 && !isCreateDatabaseDDL(ddl.SQL) {
if len(ddl.Database) > 0 && !util.IsCreateDatabaseDDL(ddl.SQL, tmysql.ModeNone) {
_, err = tx.Exec(fmt.Sprintf("use %s;", quoteName(ddl.Database)))
if err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
Expand Down
18 changes: 0 additions & 18 deletions pkg/loader/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,24 +320,6 @@ func (s *getTblInfoSuite) TestShouldCacheResult(c *check.C) {
c.Assert(nCalled, check.Equals, 1)
}

type isCreateDBDDLSuite struct{}

var _ = check.Suite(&isCreateDBDDLSuite{})

func (s *isCreateDBDDLSuite) TestInvalidSQL(c *check.C) {
c.Assert(isCreateDatabaseDDL("INSERT INTO Y a b c;"), check.IsFalse)
}

func (s *isCreateDBDDLSuite) TestNonCreateDBSQL(c *check.C) {
c.Assert(isCreateDatabaseDDL("SELECT 1;"), check.IsFalse)
c.Assert(isCreateDatabaseDDL(`INSERT INTO tbl(id, name) VALUES(1, "test";`), check.IsFalse)
}

func (s *isCreateDBDDLSuite) TestCreateDBSQL(c *check.C) {
c.Assert(isCreateDatabaseDDL("CREATE DATABASE test;"), check.IsTrue)
c.Assert(isCreateDatabaseDDL("create database `db2`;"), check.IsTrue)
}

type needRefreshTableInfoSuite struct{}

var _ = check.Suite(&needRefreshTableInfoSuite{})
Expand Down
23 changes: 21 additions & 2 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,19 @@ import (
"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/pkg/flags"
"github.com/pingcap/tidb-binlog/pkg/security"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
_ "github.com/pingcap/tidb/types/parser_driver" // for parser driver
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"

"github.com/pingcap/tidb-binlog/pkg/flags"
"github.com/pingcap/tidb-binlog/pkg/security"
)

// DefaultIP get a default non local ip, err is not nil, ip return 127.0.0.1
Expand Down Expand Up @@ -333,3 +338,17 @@ func WriteFileAtomic(filename string, data []byte, perm os.FileMode) error {

return os.Rename(f.Name(), filename)
}

// IsCreateDatabaseDDL checks whether ddl is a create database statement
func IsCreateDatabaseDDL(sql string, sqlMode mysql.SQLMode) bool {
p := parser.New()
p.SetSQLMode(sqlMode)
stmt, err := p.ParseOneStmt(sql, "", "")
if err != nil {
log.Error("parse sql failed", zap.String("sql", sql), zap.Error(err))
return false
}

_, isCreateDatabase := stmt.(*ast.CreateDatabaseStmt)
return isCreateDatabase
}
22 changes: 21 additions & 1 deletion pkg/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/pkg/security"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
pd "github.com/tikv/pd/client"
"go.uber.org/zap/zapcore"

"github.com/pingcap/tidb-binlog/pkg/security"
)

// Hook up gocheck into the "go test" runner.
Expand Down Expand Up @@ -340,3 +341,22 @@ func (s *waitUntilTimeoutSuit) TestGoAndAbortGoroutine(c *C) {
}, time.Second)
c.Assert(called, IsTrue)
}

type isCreateDBDDLSuite struct{}

var _ = Suite(&isCreateDBDDLSuite{})

func (s *isCreateDBDDLSuite) TestInvalidSQL(c *C) {
c.Assert(IsCreateDatabaseDDL("INSERT INTO Y a b c;", mysql.ModeNone), IsFalse)
c.Assert(IsCreateDatabaseDDL("Invalid SQL", mysql.ModeNone), IsFalse)
}

func (s *isCreateDBDDLSuite) TestNonCreateDBSQL(c *C) {
c.Assert(IsCreateDatabaseDDL("SELECT 1;", mysql.ModeNone), IsFalse)
c.Assert(IsCreateDatabaseDDL(`INSERT INTO tbl(id, name) VALUES(1, "test";`, mysql.ModeNone), IsFalse)
}

func (s *isCreateDBDDLSuite) TestCreateDBSQL(c *C) {
c.Assert(IsCreateDatabaseDDL("CREATE DATABASE test;", mysql.ModeNone), IsTrue)
c.Assert(IsCreateDatabaseDDL("create database `db2`;", mysql.ModeNone), IsTrue)
}
2 changes: 1 addition & 1 deletion tests/sequence/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ run_sql 'CREATE TABLE seq.table_name (id INT DEFAULT NEXT VALUE FOR seq.sequence

run_sql 'INSERT INTO seq.table_name(val) values(10);'

sleep 3
sleep 5

down_run_sql 'SELECT count(*), sum(id), sum(val) FROM seq.table_name;'
check_contains 'count(*): 1'
Expand Down

0 comments on commit 194d4ac

Please sign in to comment.