From 194d4ac100bf937dbb457f0a1b628cf2bcf44b2d Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Wed, 12 Jan 2022 21:05:42 -0600 Subject: [PATCH] pb: update pb parser to avoid drainer failure (#1093) --- .github/pull_request_template.md | 6 ++++++ Makefile | 2 +- drainer/sync/pb.go | 3 ++- drainer/translator/pb.go | 23 ++++++++++------------- drainer/translator/pb_test.go | 29 ++++++++++++++++++++++++++++- drainer/translator/translator.go | 11 ++--------- pkg/loader/load.go | 13 +------------ pkg/loader/load_test.go | 18 ------------------ pkg/util/util.go | 23 +++++++++++++++++++++-- pkg/util/util_test.go | 22 +++++++++++++++++++++- tests/sequence/run.sh | 2 +- 11 files changed, 93 insertions(+), 59 deletions(-) diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 992528fb6..6ef5e0442 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -45,3 +45,9 @@ Related changes - Need to update the documentation - Need to update the `tidb-ansible` repository - Need to be included in the release note + +### Release note + + + +- No release note diff --git a/Makefile b/Makefile index 1d25fb870..c0e337d5c 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ GOVERSION := "`go version`" ARCH := "`uname -s`" LINUX := "Linux" MAC := "Darwin" -PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto' +PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto' | sed '/github.com\/pingcap\/tidb-binlog\/bin/d' PACKAGES := $$($(PACKAGE_LIST)) PACKAGE_DIRECTORIES := $(PACKAGE_LIST) | sed 's|github.com/pingcap/$(PROJECT)/||' FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor' | grep -vE 'binlog.pb.go') diff --git a/drainer/sync/pb.go b/drainer/sync/pb.go index 160f3253f..9f439f65b 100644 --- a/drainer/sync/pb.go +++ b/drainer/sync/pb.go @@ -19,10 +19,11 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" + tb "github.com/pingcap/tipb/go-binlog" + "github.com/pingcap/tidb-binlog/drainer/translator" "github.com/pingcap/tidb-binlog/pkg/binlogfile" pb "github.com/pingcap/tidb-binlog/proto/binlog" - tb "github.com/pingcap/tipb/go-binlog" ) var _ Syncer = &pbSyncer{} diff --git a/drainer/translator/pb.go b/drainer/translator/pb.go index f37fb75e0..de0d19f15 100644 --- a/drainer/translator/pb.go +++ b/drainer/translator/pb.go @@ -22,15 +22,15 @@ import ( //nolint "github.com/golang/protobuf/proto" "github.com/pingcap/errors" - "github.com/pingcap/tidb-binlog/pkg/util" - pb "github.com/pingcap/tidb-binlog/proto/binlog" - "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/codec" tipb "github.com/pingcap/tipb/go-binlog" + + "github.com/pingcap/tidb-binlog/pkg/util" + pb "github.com/pingcap/tidb-binlog/proto/binlog" ) // TiBinlogToPbBinlog translate the binlog format @@ -41,16 +41,13 @@ func TiBinlogToPbBinlog(infoGetter TableInfoGetter, schema string, table string, if tiBinlog.DdlJobId > 0 { // DDL sql := string(tiBinlog.GetDdlQuery()) - stmt, err := getParser().ParseOneStmt(sql, "", "") - if err != nil { - return nil, errors.Trace(err) - } - - _, isCreateDatabase := stmt.(*ast.CreateDatabaseStmt) - if isCreateDatabase { - sql += ";" - } else { - sql = fmt.Sprintf("use %s; %s;", quoteName(schema), sql) + if len(schema) > 0 { + isCreateDatabase := util.IsCreateDatabaseDDL(sql, sqlMode) + if isCreateDatabase { + sql += ";" + } else { + sql = fmt.Sprintf("use %s; %s;", quoteName(schema), sql) + } } pbBinlog.Tp = pb.BinlogType_DDL diff --git a/drainer/translator/pb_test.go b/drainer/translator/pb_test.go index c9c536e1f..e448b68f5 100644 --- a/drainer/translator/pb_test.go +++ b/drainer/translator/pb_test.go @@ -17,10 +17,11 @@ import ( "fmt" "github.com/pingcap/check" - pb "github.com/pingcap/tidb-binlog/proto/binlog" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/codec" + + pb "github.com/pingcap/tidb-binlog/proto/binlog" ) type testPbSuite struct { @@ -55,6 +56,32 @@ func (t *testPbSuite) TestDDL(c *check.C) { CommitTs: t.TiBinlog.GetCommitTs(), DdlQuery: []byte(expected), }) + + // test PB shouldn't reports an error when meeting unsupported DDL + t.TiBinlog.DdlQuery = []byte("invalid sql") + pbBinog, err = TiBinlogToPbBinlog(t, t.Schema, t.Table, t.TiBinlog, nil) + c.Assert(err, check.IsNil) + + c.Log("get ddl: ", string(pbBinog.GetDdlQuery())) + expected = fmt.Sprintf("use `%s`; %s;", t.Schema, string(t.TiBinlog.GetDdlQuery())) + c.Assert(pbBinog, check.DeepEquals, &pb.Binlog{ + Tp: pb.BinlogType_DDL, + CommitTs: t.TiBinlog.GetCommitTs(), + DdlQuery: []byte(expected), + }) + + // test PB shouldn't reports an error when meeting unsupported DDL + t.TiBinlog.DdlQuery = []byte("invalid sql") + t.Schema = "" + pbBinog, err = TiBinlogToPbBinlog(t, t.Schema, t.Table, t.TiBinlog, nil) + c.Assert(err, check.IsNil) + + c.Log("get ddl: ", string(pbBinog.GetDdlQuery())) + c.Assert(pbBinog, check.DeepEquals, &pb.Binlog{ + Tp: pb.BinlogType_DDL, + CommitTs: t.TiBinlog.GetCommitTs(), + DdlQuery: t.TiBinlog.GetDdlQuery(), + }) } func (t *testPbSuite) testDML(c *check.C, tp pb.EventType) { diff --git a/drainer/translator/translator.go b/drainer/translator/translator.go index 7d4cbe08c..df05a7b5c 100644 --- a/drainer/translator/translator.go +++ b/drainer/translator/translator.go @@ -19,8 +19,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" - "github.com/pingcap/tidb-binlog/pkg/util" - "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/table" @@ -28,6 +26,8 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/codec" "go.uber.org/zap" + + "github.com/pingcap/tidb-binlog/pkg/util" ) var sqlMode mysql.SQLMode @@ -37,13 +37,6 @@ func SetSQLMode(mode mysql.SQLMode) { sqlMode = mode } -func getParser() (p *parser.Parser) { - p = parser.New() - p.SetSQLMode(sqlMode) - - return -} - func insertRowToDatums(table *model.TableInfo, row []byte) (datums map[int64]types.Datum, err error) { colsTypeMap := util.ToColumnTypeMap(table.Columns) diff --git a/pkg/loader/load.go b/pkg/loader/load.go index 37bb2d8ad..9e2c048c0 100644 --- a/pkg/loader/load.go +++ b/pkg/loader/load.go @@ -390,17 +390,6 @@ func needRefreshTableInfo(sql string) bool { return true } -func isCreateDatabaseDDL(sql string) bool { - stmt, err := parser.New().ParseOneStmt(sql, "", "") - if err != nil { - log.Error("parse sql failed", zap.String("sql", sql), zap.Error(err)) - return false - } - - _, isCreateDatabase := stmt.(*ast.CreateDatabaseStmt) - return isCreateDatabase -} - func (s *loaderImpl) execDDL(ddl *DDL) error { log.Debug("exec ddl", zap.Reflect("ddl", ddl), zap.Bool("shouldSkip", ddl.ShouldSkip)) if ddl.ShouldSkip { @@ -419,7 +408,7 @@ func (s *loaderImpl) processMysqlDDL(ddl *DDL) error { return err } - if len(ddl.Database) > 0 && !isCreateDatabaseDDL(ddl.SQL) { + if len(ddl.Database) > 0 && !util.IsCreateDatabaseDDL(ddl.SQL, tmysql.ModeNone) { _, err = tx.Exec(fmt.Sprintf("use %s;", quoteName(ddl.Database))) if err != nil { if rbErr := tx.Rollback(); rbErr != nil { diff --git a/pkg/loader/load_test.go b/pkg/loader/load_test.go index 1d2207e82..f89f35d88 100644 --- a/pkg/loader/load_test.go +++ b/pkg/loader/load_test.go @@ -320,24 +320,6 @@ func (s *getTblInfoSuite) TestShouldCacheResult(c *check.C) { c.Assert(nCalled, check.Equals, 1) } -type isCreateDBDDLSuite struct{} - -var _ = check.Suite(&isCreateDBDDLSuite{}) - -func (s *isCreateDBDDLSuite) TestInvalidSQL(c *check.C) { - c.Assert(isCreateDatabaseDDL("INSERT INTO Y a b c;"), check.IsFalse) -} - -func (s *isCreateDBDDLSuite) TestNonCreateDBSQL(c *check.C) { - c.Assert(isCreateDatabaseDDL("SELECT 1;"), check.IsFalse) - c.Assert(isCreateDatabaseDDL(`INSERT INTO tbl(id, name) VALUES(1, "test";`), check.IsFalse) -} - -func (s *isCreateDBDDLSuite) TestCreateDBSQL(c *check.C) { - c.Assert(isCreateDatabaseDDL("CREATE DATABASE test;"), check.IsTrue) - c.Assert(isCreateDatabaseDDL("create database `db2`;"), check.IsTrue) -} - type needRefreshTableInfoSuite struct{} var _ = check.Suite(&needRefreshTableInfoSuite{}) diff --git a/pkg/util/util.go b/pkg/util/util.go index 5d8220a82..75193e1b5 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -26,14 +26,19 @@ import ( "github.com/BurntSushi/toml" "github.com/pingcap/errors" "github.com/pingcap/log" - "github.com/pingcap/tidb-binlog/pkg/flags" - "github.com/pingcap/tidb-binlog/pkg/security" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" + _ "github.com/pingcap/tidb/types/parser_driver" // for parser driver "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" "go.uber.org/zap" + + "github.com/pingcap/tidb-binlog/pkg/flags" + "github.com/pingcap/tidb-binlog/pkg/security" ) // DefaultIP get a default non local ip, err is not nil, ip return 127.0.0.1 @@ -333,3 +338,17 @@ func WriteFileAtomic(filename string, data []byte, perm os.FileMode) error { return os.Rename(f.Name(), filename) } + +// IsCreateDatabaseDDL checks whether ddl is a create database statement +func IsCreateDatabaseDDL(sql string, sqlMode mysql.SQLMode) bool { + p := parser.New() + p.SetSQLMode(sqlMode) + stmt, err := p.ParseOneStmt(sql, "", "") + if err != nil { + log.Error("parse sql failed", zap.String("sql", sql), zap.Error(err)) + return false + } + + _, isCreateDatabase := stmt.(*ast.CreateDatabaseStmt) + return isCreateDatabase +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 7ebd9825d..8cd3a55da 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -22,13 +22,14 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/log" - "github.com/pingcap/tidb-binlog/pkg/security" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" pd "github.com/tikv/pd/client" "go.uber.org/zap/zapcore" + + "github.com/pingcap/tidb-binlog/pkg/security" ) // Hook up gocheck into the "go test" runner. @@ -340,3 +341,22 @@ func (s *waitUntilTimeoutSuit) TestGoAndAbortGoroutine(c *C) { }, time.Second) c.Assert(called, IsTrue) } + +type isCreateDBDDLSuite struct{} + +var _ = Suite(&isCreateDBDDLSuite{}) + +func (s *isCreateDBDDLSuite) TestInvalidSQL(c *C) { + c.Assert(IsCreateDatabaseDDL("INSERT INTO Y a b c;", mysql.ModeNone), IsFalse) + c.Assert(IsCreateDatabaseDDL("Invalid SQL", mysql.ModeNone), IsFalse) +} + +func (s *isCreateDBDDLSuite) TestNonCreateDBSQL(c *C) { + c.Assert(IsCreateDatabaseDDL("SELECT 1;", mysql.ModeNone), IsFalse) + c.Assert(IsCreateDatabaseDDL(`INSERT INTO tbl(id, name) VALUES(1, "test";`, mysql.ModeNone), IsFalse) +} + +func (s *isCreateDBDDLSuite) TestCreateDBSQL(c *C) { + c.Assert(IsCreateDatabaseDDL("CREATE DATABASE test;", mysql.ModeNone), IsTrue) + c.Assert(IsCreateDatabaseDDL("create database `db2`;", mysql.ModeNone), IsTrue) +} diff --git a/tests/sequence/run.sh b/tests/sequence/run.sh index 41985fbf2..0ca678dfd 100755 --- a/tests/sequence/run.sh +++ b/tests/sequence/run.sh @@ -14,7 +14,7 @@ run_sql 'CREATE TABLE seq.table_name (id INT DEFAULT NEXT VALUE FOR seq.sequence run_sql 'INSERT INTO seq.table_name(val) values(10);' -sleep 3 +sleep 5 down_run_sql 'SELECT count(*), sum(id), sum(val) FROM seq.table_name;' check_contains 'count(*): 1'