Skip to content

Commit

Permalink
relay log for oracle (#1104)
Browse files Browse the repository at this point in the history
  • Loading branch information
cartersz authored Dec 27, 2021
1 parent 6c33762 commit 3330cb7
Show file tree
Hide file tree
Showing 10 changed files with 335 additions and 80 deletions.
2 changes: 1 addition & 1 deletion arbiter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func syncBinlogs(ctx context.Context, source <-chan *reader.Message, ld loader.L
}
receivedTs = msg.Binlog.CommitTs

txn, err := loader.SecondaryBinlogToTxn(msg.Binlog)
txn, err := loader.SecondaryBinlogToTxn(msg.Binlog, nil, false)
if err != nil {
log.Error("transfer binlog failed, program will stop handling data from loader", zap.Error(err))
return err
Expand Down
32 changes: 26 additions & 6 deletions drainer/relay.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package drainer

import (
"database/sql"

router "github.com/pingcap/tidb-tools/pkg/table-router"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/drainer/checkpoint"
Expand Down Expand Up @@ -42,8 +46,12 @@ func feedByRelayLogIfNeed(cfg *Config) error {
if err != nil {
return errors.Annotate(err, "failed to create reader")
}

db, err := loader.CreateDBWithSQLMode(scfg.To.User, scfg.To.Password, scfg.To.Host, scfg.To.Port, scfg.To.TLS, scfg.StrSQLMode, scfg.To.Params, scfg.To.ReadTimeout)
var db *sql.DB
if cfg.SyncerCfg.DestDBType == "oracle" {
db, err = loader.CreateOracleDB(cfg.SyncerCfg.To.User, cfg.SyncerCfg.To.Password, scfg.To.Host, scfg.To.Port, cfg.SyncerCfg.To.OracleServiceName, cfg.SyncerCfg.To.OracleConnectString)
} else {
db, err = loader.CreateDBWithSQLMode(scfg.To.User, scfg.To.Password, scfg.To.Host, scfg.To.Port, scfg.To.TLS, scfg.StrSQLMode, scfg.To.Params, scfg.To.ReadTimeout)
}
if err != nil {
return errors.Annotate(err, "failed to create SQL db")
}
Expand All @@ -56,7 +64,7 @@ func feedByRelayLogIfNeed(cfg *Config) error {
return errors.Annotate(err, "failed to create loader")
}

err = feedByRelayLog(reader, ld, cp)
err = feedByRelayLog(reader, ld, cp, cfg)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -65,7 +73,7 @@ func feedByRelayLogIfNeed(cfg *Config) error {
}

// feedByRelayLog will take over the `ld loader.Loader`.
func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint) error {
func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint, cfg *Config) error {
checkpointTS := cp.TS()
lastSuccessTS := checkpointTS
r.Run()
Expand All @@ -88,6 +96,17 @@ func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint)

loaderClosed := false

var tableRouter *router.Table = nil
upperColName := false
var routerErr error
if cfg.SyncerCfg.DestDBType == "oracle" {
upperColName = true
tableRouter, _, routerErr = genRouterAndBinlogEvent(cfg.SyncerCfg)
if routerErr != nil {
return errors.Annotate(routerErr, "when feed by relay log, gen router and filter failed")
}
}

for {
// when reader is drained and all txn has been push into loader
// we close cloader.
Expand All @@ -112,8 +131,9 @@ func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint)
if sbinlog.CommitTs <= checkpointTS {
continue
}

txn, err := loader.SecondaryBinlogToTxn(sbinlog)
var txn *loader.Txn
var err error
txn, err = loader.SecondaryBinlogToTxn(sbinlog, tableRouter, upperColName)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion drainer/relay/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (r *testReaderSuite) readBinlogAndCheck(c *C, dir string, expectedNumber in
number := 0
for txn := range relayReader.Binlogs() {
number++
loaderTxn, err := loader.SecondaryBinlogToTxn(txn)
loaderTxn, err := loader.SecondaryBinlogToTxn(txn, nil, false)
c.Assert(err, IsNil)
lastTxn = loaderTxn
}
Expand Down
70 changes: 69 additions & 1 deletion drainer/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pingcap/check"
"github.com/pingcap/tidb-binlog/drainer/checkpoint"
"github.com/pingcap/tidb-binlog/drainer/relay"
dsync "github.com/pingcap/tidb-binlog/drainer/sync"
"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/pingcap/tidb-binlog/pkg/binlogfile"
"github.com/pingcap/tidb-binlog/pkg/loader"
Expand Down Expand Up @@ -100,7 +101,74 @@ func (s *relaySuite) TestFeedByRealyLog(c *check.C) {
reader, err := relay.NewReader(relayDir, 1)
c.Assert(err, check.IsNil)

err = feedByRelayLog(reader, ld, cp)
cfg := &Config{}
cfg.SyncerCfg = &SyncerConfig{
DestDBType: "tidb",
To: &dsync.DBConfig{
Checkpoint: dsync.CheckpointConfig{
Type: "oracle",
Host: "host-1",
User: "user-1",
Password: "password-1",
OracleServiceName: "oracle-service-name-1",
OracleConnectString: "oracle-connect-string-1",
Table: "new_checkpoint_table",
},
},
}
err = feedByRelayLog(reader, ld, cp, cfg)
c.Assert(err, check.IsNil)

ts := cp.TS()
c.Assert(ts, check.Equals, int64(90) /* latest commit ts */)
c.Assert(cp.IsConsistent(), check.Equals, true)
}

func (s *relaySuite) TestFeedByRealyLogForOracle(c *check.C) {
cp, err := checkpoint.NewFile(0 /* initialCommitTS */, c.MkDir()+"/checkpoint")
c.Assert(err, check.IsNil)
err = cp.Save(0, 0, false, 0)
c.Assert(err, check.IsNil)
c.Assert(cp.IsConsistent(), check.Equals, false)

ld := newNoOpLoader()

// write some relay log
gen := &translator.BinlogGenerator{}
relayDir := c.MkDir()
relayer, err := relay.NewRelayer(relayDir, binlogfile.SegmentSizeBytes, gen)
c.Assert(err, check.IsNil)

for i := 0; i < 10; i++ {
gen.SetInsert(c)
gen.TiBinlog.StartTs = int64(i)
gen.TiBinlog.CommitTs = int64(i) * 10
_, err = relayer.WriteBinlog(gen.Schema, gen.Table, gen.TiBinlog, gen.PV)
c.Assert(err, check.IsNil)
}

relayer.Close()
c.Assert(err, check.IsNil)

reader, err := relay.NewReader(relayDir, 1)
c.Assert(err, check.IsNil)

cfg := &Config{}
cfg.SyncerCfg = &SyncerConfig{
DestDBType: "oracle",
To: &dsync.DBConfig{
Checkpoint: dsync.CheckpointConfig{
Type: "oracle",
Host: "host-1",
User: "user-1",
Password: "password-1",
OracleServiceName: "oracle-service-name-1",
OracleConnectString: "oracle-connect-string-1",
Table: "new_checkpoint_table",
},
},
}
err = feedByRelayLog(reader, ld, cp, cfg)
c.Assert(err, check.IsNil)

ts := cp.TS()
Expand Down
2 changes: 2 additions & 0 deletions drainer/translator/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func genTable(schema string, tableInfo *model.TableInfo) (table *obinlog.Table)
info := new(obinlog.ColumnInfo)
info.Name = col.Name.O
info.MysqlType = types.TypeToStr(col.Tp, col.Charset)
info.Flen = int32(col.Flen)
info.Decimal = int32(col.Decimal)
info.IsPrimaryKey = mysql.HasPriKeyFlag(col.Flag)
columnInfos = append(columnInfos, info)
}
Expand Down
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ require (
github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63
github.com/pingcap/errors v0.11.5-0.20211009033009-93128226aaa3
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect
github.com/pingcap/kvproto v0.0.0-20211011042309-a4518fcacbc8
github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f
github.com/pingcap/log v0.0.0-20210906054005-afc726e70354
github.com/pingcap/tidb v1.1.0-beta.0.20211026030648-c497d5c06348
github.com/pingcap/tidb v1.1.0-beta.0.20211224065547-93d28b9d3ffe
github.com/pingcap/tidb-tools v5.2.3-0.20211101071251-40e8f0cfcb1d+incompatible
github.com/pingcap/tidb/parser v0.0.0-20211026030648-c497d5c06348
github.com/pingcap/tipb v0.0.0-20211026080602-ec68283c1735
github.com/pingcap/tidb/parser v0.0.0-20211224065547-93d28b9d3ffe
github.com/pingcap/tipb v0.0.0-20211201080053-bd104bb270ba
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.26.0 // indirect
Expand All @@ -40,8 +40,8 @@ require (
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.2.1 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965
github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211028082558-c4250227823e
github.com/tikv/pd v1.1.0-beta.0.20211027071649-433d4f2847be
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211223062159-300275dee63e
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee
github.com/unrolled/render v1.0.1
go.etcd.io/etcd v0.5.0-alpha.5.0.20210512015243-d19fbe541bf9
go.uber.org/zap v1.19.1
Expand Down
Loading

0 comments on commit 3330cb7

Please sign in to comment.