Skip to content

Commit

Permalink
dependency: update binlog parser/tidb/etcd to release-6.0 (#1146)
Browse files Browse the repository at this point in the history
ref #1145
  • Loading branch information
lichunzhu authored Mar 18, 2022
1 parent 6d826a7 commit efb767b
Show file tree
Hide file tree
Showing 18 changed files with 478 additions and 627 deletions.
5 changes: 3 additions & 2 deletions arbiter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -70,8 +71,8 @@ var (
var Registry = prometheus.NewRegistry()

func init() {
Registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
Registry.MustRegister(prometheus.NewGoCollector())
Registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
Registry.MustRegister(collectors.NewGoCollector())

Registry.MustRegister(checkpointTSOGauge)
Registry.MustRegister(queryHistogramVec)
Expand Down
5 changes: 4 additions & 1 deletion binlogctl/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"go.etcd.io/etcd/tests/v3/integration"

"github.com/pingcap/tidb-binlog/pkg/etcd"
"github.com/pingcap/tidb-binlog/pkg/node"
"go.etcd.io/etcd/integration"
)

func Test(t *testing.T) {
integration.BeforeTest(t)

testEtcdCluster = integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer testEtcdCluster.Terminate(t)

Expand Down
3 changes: 2 additions & 1 deletion drainer/checkpoint/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"

"github.com/pingcap/tidb-binlog/pkg/util"
)

Expand Down Expand Up @@ -74,7 +75,7 @@ func (sp *FileCheckPoint) Load() error {
}
defer file.Close()

_, err = toml.DecodeReader(file, sp)
_, err = toml.NewDecoder(file).Decode(sp)
if err != nil {
return errors.Trace(err)
}
Expand Down
9 changes: 6 additions & 3 deletions drainer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,23 @@ import (
"github.com/Shopify/sarama"
"github.com/pingcap/check"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/parser/mysql"
"github.com/samuel/go-zookeeper/zk"
"go.etcd.io/etcd/tests/v3/integration"

dsync "github.com/pingcap/tidb-binlog/drainer/sync"
"github.com/pingcap/tidb-binlog/pkg/encrypt"
"github.com/pingcap/tidb-binlog/pkg/filter"
"github.com/pingcap/tidb-binlog/pkg/util"
pkgzk "github.com/pingcap/tidb-binlog/pkg/zk"
"github.com/pingcap/tidb/parser/mysql"
"github.com/samuel/go-zookeeper/zk"
"go.etcd.io/etcd/integration"
)

var testEtcdCluster *integration.ClusterV3

// Hook up gocheck into the "go test" runner.
func Test(t *testing.T) {
integration.BeforeTest(t)

testEtcdCluster = integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer testEtcdCluster.Terminate(t)
TestingT(t)
Expand Down
8 changes: 5 additions & 3 deletions drainer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
package drainer

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"

"github.com/pingcap/tidb-binlog/drainer/sync"
bf "github.com/pingcap/tidb-binlog/pkg/binlogfile"
"github.com/prometheus/client_golang/prometheus"
)

var (
Expand Down Expand Up @@ -127,8 +129,8 @@ var registry = prometheus.NewRegistry()
func init() {
sync.QueueSizeGauge = queueSizeGauge

registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
registry.MustRegister(prometheus.NewGoCollector())
registry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
registry.MustRegister(collectors.NewGoCollector())
registry.MustRegister(pumpPositionGauge)
registry.MustRegister(ddlJobsCounter)
registry.MustRegister(errorCount)
Expand Down
12 changes: 8 additions & 4 deletions drainer/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@ import (
"github.com/dustin/go-humanize"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/pkg/util"
"github.com/pingcap/tidb-binlog/pump"
pb "github.com/pingcap/tipb/go-binlog"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

"github.com/pingcap/tidb-binlog/pkg/util"
"github.com/pingcap/tidb-binlog/pump"
)

const (
Expand Down Expand Up @@ -209,11 +211,13 @@ func (p *Pump) createPullBinlogsClient(ctx context.Context, last int64) error {
}

dialOpts := []grpc.DialOption{grpc.WithDefaultCallOptions(callOpts...)}
var tlsCredential credentials.TransportCredentials
if p.tlsConfig != nil {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(p.tlsConfig)))
tlsCredential = credentials.NewTLS(p.tlsConfig)
} else {
dialOpts = append(dialOpts, grpc.WithInsecure())
tlsCredential = insecure.NewCredentials()
}
dialOpts = append(dialOpts, grpc.WithTransportCredentials(tlsCredential))

conn, err := grpc.Dial(p.addr, dialOpts...)
if err != nil {
Expand Down
16 changes: 8 additions & 8 deletions drainer/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@ import (
"net/http/httptest"
"os"
"path"
"testing"
"time"

"github.com/gorilla/mux"
. "github.com/pingcap/check"
pd "github.com/tikv/pd/client"
"golang.org/x/net/context"
"google.golang.org/grpc"

"github.com/pingcap/tidb-binlog/pkg/etcd"
"github.com/pingcap/tidb-binlog/pkg/node"
"github.com/pingcap/tidb-binlog/pkg/security"
"github.com/pingcap/tidb-binlog/pkg/util"
pd "github.com/tikv/pd/client"
"golang.org/x/net/context"
"google.golang.org/grpc"
)

var _ = Suite(&testServerSuite{})
var _ = SerialSuites(&testServerSuite{})

type testServerSuite struct{}

Expand Down Expand Up @@ -111,7 +111,7 @@ func (t *testServerSuite) TestClose(c *C) {
c.Assert(server.isClosed, Equals, int32(1))
}

func TestCommitStatus(t *testing.T) {
func (t *testServerSuite) TestCommitStatus(c *C) {
cli := etcd.NewClient(testEtcdCluster.RandClient(), node.DefaultRootPath)
reg := node.NewEtcdRegistry(cli, 5*time.Second)

Expand All @@ -134,10 +134,10 @@ func TestCommitStatus(t *testing.T) {
}
s.commitStatus()
if s.status.State != cs.expected {
t.Fatalf("Invalid state after commit: expect %s, get %s", cs.expected, s.status.State)
c.Fatalf("Invalid state after commit: expect %s, get %s", cs.expected, s.status.State)
}
if s.status.MaxCommitTS != cs.ts {
t.Fatalf("MaxCommitTS not updated correctly: expect %d, get %d", cs.ts, s.status.MaxCommitTS)
c.Fatalf("MaxCommitTS not updated correctly: expect %d, get %d", cs.ts, s.status.MaxCommitTS)
}
}
}
Expand Down
Loading

0 comments on commit efb767b

Please sign in to comment.