Skip to content

Commit

Permalink
drainer/translater: fix the bug that drainer writes incorrect time da…
Browse files Browse the repository at this point in the history
…ta (#1160)

close #1159
  • Loading branch information
lichunzhu authored May 23, 2022
1 parent 09d4a33 commit b8a6395
Show file tree
Hide file tree
Showing 22 changed files with 244 additions and 63 deletions.
4 changes: 3 additions & 1 deletion drainer/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ func (p *Pump) PullBinlog(pctx context.Context, last int64) chan MergeItem {
payloadSize := len(resp.Entity.Payload)
readBinlogSizeHistogram.WithLabelValues(p.nodeID).Observe(float64(payloadSize))
if len(resp.Entity.Payload) >= 10*1024*1024 {
log.Info("receive big size binlog", zap.String("size", humanize.Bytes(uint64(payloadSize))))
p.logger.Info("receive big size binlog before unmarshal", zap.String("size", humanize.Bytes(uint64(payloadSize))),
zap.Int64("start ts", resp.Entity.Meta.StartTs), zap.Int64("commit ts", resp.Entity.Meta.CommitTs))
}

binlog := new(pb.Binlog)
Expand All @@ -176,6 +177,7 @@ func (p *Pump) PullBinlog(pctx context.Context, last int64) chan MergeItem {
p.reportErr(pctx, err)
return
}
resp.Entity.Payload = nil // GC

millisecond := time.Now().UnixNano()/1000000 - oracle.ExtractPhysical(uint64(binlog.CommitTs))
binlogReachDurationHistogram.WithLabelValues(p.nodeID).Observe(float64(millisecond) / 1000.0)
Expand Down
7 changes: 4 additions & 3 deletions drainer/sync/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import (
"github.com/Shopify/sarama"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/pingcap/tidb-binlog/pkg/util"
obinlog "github.com/pingcap/tidb-tools/tidb-binlog/proto/go-binlog"
"go.uber.org/zap"

"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/pingcap/tidb-binlog/pkg/util"
)

var maxWaitTimeToSendMSG = time.Second * 30
Expand Down Expand Up @@ -68,7 +69,7 @@ func NewKafka(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter) (*Kafka
topic: topic,
ackWindow: newAckWindow(),
shutdown: make(chan struct{}),
baseSyncer: newBaseSyncer(tableInfoGetter),
baseSyncer: newBaseSyncer(tableInfoGetter, nil),
}

config, err := util.NewSaramaConfig(cfg.KafkaVersion, "kafka.")
Expand Down
69 changes: 67 additions & 2 deletions drainer/sync/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@
package sync

import (
"context"
"database/sql"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb/types"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"

"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
"github.com/pingcap/tidb-binlog/drainer/relay"
Expand Down Expand Up @@ -132,11 +137,20 @@ func NewMysqlSyncer(
return nil, errors.Trace(err)
}

tzStr := ""
if len(cfg.Params) > 0 {
tzStr = cfg.Params["time_zone"]
}
timeZone, err := str2TimezoneOrFromDB(tzStr, db)
if err != nil {
return nil, errors.Trace(err)
}

s := &MysqlSyncer{
db: db,
loader: loader,
relayer: relayer,
baseSyncer: newBaseSyncer(tableInfoGetter),
baseSyncer: newBaseSyncer(tableInfoGetter, timeZone),
}

go s.run()
Expand Down Expand Up @@ -167,6 +181,57 @@ func relaxSQLMode(db *sql.DB) (oldMode string, newMode string, err error) {
return
}

func str2TimezoneOrFromDB(tzStr string, db *sql.DB) (*time.Location, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var err error
if len(tzStr) == 0 {
dur, err := dbutil.GetTimeZoneOffset(ctx, db)
if err != nil {
return nil, errors.Trace(err)
}
tzStr = dbutil.FormatTimeZoneOffset(dur)
}
if tzStr == "SYSTEM" || tzStr == "Local" {
return nil, errors.New("'SYSTEM' or 'Local' time_zone is not supported")
}

loc, err := time.LoadLocation(tzStr)
if err == nil {
return loc, nil
}

// The value can be given as a string indicating an offset from UTC, such as '+10:00' or '-6:00'.
// The time zone's value should in [-12:59,+14:00].
// See: https://dev.mysql.com/doc/refman/8.0/en/time-zone-support.html#time-zone-variables
if strings.HasPrefix(tzStr, "+") || strings.HasPrefix(tzStr, "-") {
d, err := types.ParseDuration(nil, tzStr[1:], 0)
if err == nil {
if tzStr[0] == '-' {
if d.Duration > 12*time.Hour+59*time.Minute {
return nil, errors.Errorf("invalid timezone %s", tzStr)
}
} else {
if d.Duration > 14*time.Hour {
return nil, errors.Errorf("invalid timezone %s", tzStr)
}
}

ofst := int(d.Duration / time.Second)
if tzStr[0] == '-' {
ofst = -ofst
}
name := dbutil.FormatTimeZoneOffset(d.Duration)
return time.FixedZone(name, ofst), nil
}
}
if err != nil {
return nil, err
}
log.Info("use timezone", zap.String("location", loc.String()))
return loc, nil
}

// SetSafeMode make the MysqlSyncer to use safe mode or not
func (m *MysqlSyncer) SetSafeMode(mode bool) bool {
m.loader.SetSafeMode(mode)
Expand All @@ -184,7 +249,7 @@ func (m *MysqlSyncer) Sync(item *Item) error {
item.RelayLogPos = pos
}

txn, err := translator.TiBinlogToTxn(m.tableInfoGetter, item.Schema, item.Table, item.Binlog, item.PrewriteValue, item.ShouldSkip)
txn, err := translator.TiBinlogToTxn(m.tableInfoGetter, item.Schema, item.Table, item.Binlog, item.PrewriteValue, item.ShouldSkip, m.timeZone)
if err != nil {
return errors.Trace(err)
}
Expand Down
5 changes: 3 additions & 2 deletions drainer/sync/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/check"
"github.com/pingcap/errors"

"github.com/pingcap/tidb-binlog/drainer/relay"
"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/pingcap/tidb-binlog/pkg/binlogfile"
Expand Down Expand Up @@ -59,7 +60,7 @@ func (s *mysqlSuite) TestMySQLSyncerAvoidBlock(c *check.C) {
syncer := &MysqlSyncer{
db: db,
loader: fakeMySQLLoaderImpl,
baseSyncer: newBaseSyncer(infoGetter),
baseSyncer: newBaseSyncer(infoGetter, nil),
}
go syncer.run()
gen := translator.BinlogGenerator{}
Expand Down Expand Up @@ -134,7 +135,7 @@ func (s *mysqlSuite) TestMySQLSyncerWithRelayer(c *check.C) {
db: db,
loader: fakeMySQLLoaderImpl,
relayer: relayer,
baseSyncer: newBaseSyncer(infoGetter),
baseSyncer: newBaseSyncer(infoGetter, nil),
}
defer syncer.Close()

Expand Down
7 changes: 4 additions & 3 deletions drainer/sync/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/prometheus/client_golang/prometheus"

"github.com/pingcap/tidb-binlog/drainer/relay"
"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/pingcap/tidb-binlog/pkg/loader"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/prometheus/client_golang/prometheus"
)

var _ Syncer = &OracleSyncer{}
Expand Down Expand Up @@ -55,7 +56,7 @@ func NewOracleSyncer(
db: db,
loader: loader,
relayer: relayer,
baseSyncer: newBaseSyncer(tableInfoGetter),
baseSyncer: newBaseSyncer(tableInfoGetter, nil),
tableRouter: tableRouter,
}

Expand Down
5 changes: 3 additions & 2 deletions drainer/sync/oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/check"
"github.com/pingcap/errors"

"github.com/pingcap/tidb-binlog/drainer/relay"
"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/pingcap/tidb-binlog/pkg/binlogfile"
Expand Down Expand Up @@ -83,7 +84,7 @@ func (s *oracleSuite) TestOracleSyncerAvoidBlock(c *check.C) {
syncer := &OracleSyncer{
db: db,
loader: fakeOracleLoaderImpl,
baseSyncer: newBaseSyncer(infoGetter),
baseSyncer: newBaseSyncer(infoGetter, nil),
tableRouter: router,
}
go syncer.run()
Expand Down Expand Up @@ -163,7 +164,7 @@ func (s *oracleSuite) TestOracleSyncerWithRelayer(c *check.C) {
db: db,
loader: fakeOracleLoaderImpl,
relayer: relayer,
baseSyncer: newBaseSyncer(infoGetter),
baseSyncer: newBaseSyncer(infoGetter, nil),
tableRouter: router,
}
defer syncer.Close()
Expand Down
2 changes: 1 addition & 1 deletion drainer/sync/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewPBSyncer(dir string, retentionDays int, tableInfoGetter translator.Table

s := &pbSyncer{
binlogger: binlogger,
baseSyncer: newBaseSyncer(tableInfoGetter),
baseSyncer: newBaseSyncer(tableInfoGetter, nil),
cancel: cancel,
}

Expand Down
11 changes: 9 additions & 2 deletions drainer/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ package sync

import (
"fmt"
"time"

"github.com/pingcap/tidb-binlog/drainer/translator"
pb "github.com/pingcap/tipb/go-binlog"

"github.com/pingcap/tidb-binlog/drainer/translator"
)

// Item contains information about binlog
Expand Down Expand Up @@ -64,13 +66,18 @@ type baseSyncer struct {
*baseError
success chan *Item
tableInfoGetter translator.TableInfoGetter
timeZone *time.Location
}

func newBaseSyncer(tableInfoGetter translator.TableInfoGetter) *baseSyncer {
func newBaseSyncer(tableInfoGetter translator.TableInfoGetter, timeZone *time.Location) *baseSyncer {
if timeZone == nil {
timeZone = time.Local
}
return &baseSyncer{
baseError: newBaseError(),
success: make(chan *Item, 8),
tableInfoGetter: tableInfoGetter,
timeZone: timeZone,
}
}

Expand Down
5 changes: 5 additions & 0 deletions drainer/sync/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
"github.com/pingcap/check"

"github.com/pingcap/tidb-binlog/drainer/translator"
)

Expand Down Expand Up @@ -62,6 +63,9 @@ func (s *syncerSuite) SetUpTest(c *check.C) {
oldCreateDB := createDB
createDB = func(string, string, string, int, *tls.Config, *string, map[string]string, time.Duration) (db *sql.DB, err error) {
db, s.mysqlMock, err = sqlmock.New()
s.mysqlMock.ExpectQuery("SELECT cast.*").
WillReturnRows(sqlmock.NewRows([]string{"cast(TIMEDIFF(NOW(6), UTC_TIMESTAMP(6)) as time)"}).
AddRow("08:00:00"))
return
}
defer func() {
Expand All @@ -70,6 +74,7 @@ func (s *syncerSuite) SetUpTest(c *check.C) {

mysql, err := NewMysqlSyncer(cfg, infoGetter, 1, 1, nil, nil, "mysql", nil, nil, true, true)
c.Assert(err, check.IsNil)
c.Assert(mysql.timeZone, check.DeepEquals, time.FixedZone("+08:00", 8*60*60))
s.syncers = append(s.syncers, mysql)

// create kafka syncer
Expand Down
2 changes: 1 addition & 1 deletion drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func findLoopBackMark(dmls []*loader.DML, info *loopbacksync.LoopBackSync) (bool
func loopBackStatus(binlog *pb.Binlog, prewriteValue *pb.PrewriteValue, infoGetter translator.TableInfoGetter, info *loopbacksync.LoopBackSync) (bool, error) {
var tableName string
var schemaName string
txn, err := translator.TiBinlogToTxn(infoGetter, schemaName, tableName, binlog, prewriteValue, false)
txn, err := translator.TiBinlogToTxn(infoGetter, schemaName, tableName, binlog, prewriteValue, false, time.Local)
if err != nil {
return false, errors.Trace(err)
}
Expand Down
5 changes: 3 additions & 2 deletions drainer/translator/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ import (
"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/pkg/util"
obinlog "github.com/pingcap/tidb-tools/tidb-binlog/proto/go-binlog"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
pb "github.com/pingcap/tipb/go-binlog"
"go.uber.org/zap"

"github.com/pingcap/tidb-binlog/pkg/util"
)

// TiBinlogToSecondaryBinlog translates the format to secondary binlog
Expand Down Expand Up @@ -149,7 +150,7 @@ func genTable(schema string, tableInfo *model.TableInfo) (table *obinlog.Table)
}

func insertRowToRow(ptableInfo, tableInfo *model.TableInfo, raw []byte) (row *obinlog.Row, err error) {
columnValues, err := insertRowToDatums(tableInfo, raw)
columnValues, err := insertRowToDatums(tableInfo, raw, time.Local)
columns := tableInfo.Columns

row = new(obinlog.Row)
Expand Down
Loading

0 comments on commit b8a6395

Please sign in to comment.