Skip to content

Commit

Permalink
pump: implement gRPC server and make code more solid. (#9)
Browse files Browse the repository at this point in the history
* drainer: init new cmd

* pump: register node to etcd and refresh state by heartbeat.

* pump: implement grpc methods.

* *: some clean up.

* *: improve Makefile and make vet happy

* *: use `glide` to manage vendor packages.

* *: remove blank lines.

* *: address comment

* vendor: remove vendor temporarily in this PR

* node: fix a type bug.
  • Loading branch information
iamxy authored Sep 26, 2016
1 parent 3ba63de commit aa9bf27
Show file tree
Hide file tree
Showing 22 changed files with 625 additions and 252 deletions.
62 changes: 38 additions & 24 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
### Makefile for tidb-binlog


# Ensure GOPATH is set before running build process.
ifeq "$(GOPATH)" ""
$(error Please set the environment variable GOPATH before running `make`)
endif

CURDIR := $(shell pwd)
export GOPATH := $(CURDIR)/_vendor:$(GOPATH)
path_to_add := $(addsuffix /bin,$(subst :,/bin:,$(GOPATH)))
export PATH := $(path_to_add):$(PATH)

ARCH := "`uname -s`"
LINUX := "Linux"
MAC := "Darwin"
Expand All @@ -7,71 +20,72 @@ FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor')
LDFLAGS += -X "github.com/pingcap/tidb-binlog/pump.BuildTS=$(shell date -u '+%Y-%m-%d %I:%M:%S')"
LDFLAGS += -X "github.com/pingcap/tidb-binlog/pump.GitSHA=$(shell git rev-parse HEAD)"

default: build
default: build buildsucc

buildsucc:
@echo Build TiDB Binlog Utils successfully!

all: dev install

dev: build check test

build: pump server
build: pump server drainer

proto/pump.pb.go: proto/pump.proto
sh proto/generate.sh

pump: proto/pump.pb.go
rm -rf vendor && ln -s _vendor/vendor vendor
GO15VENDOREXPERIMENT=1 go build -ldflags '$(LDFLAGS)' -o bin/pump cmd/pump/main.go
rm -rf vendor

server: proto/pump.pb.go
rm -rf vendor && ln -s _vendor/vendor vendor
GO15VENDOREXPERIMENT=1 go build -ldflags '$(LDFLAGS)' -o bin/binlog-server cmd/binlog-server/main.go
rm -rf vendor

drainer:
GO15VENDOREXPERIMENT=1 go build -ldflags '$(LDFLAGS)' -o bin/drainer cmd/drainer/main.go

proto: proto/pump.pb.go

install:
rm -rf vendor && ln -s _vendor/vendor vendor
go install ./...
rm -rf vendor

test:
rm -rf vendor && ln -s _vendor/vendor vendor
@export log_level=error;\
go test -cover $(PACKAGES)
rm -rf vendor
GO15VENDOREXPERIMENT=1 go test -cover $(PACKAGES)

fmt:
go fmt ./...
@goimports -w $(FILES)

check:
bash gitcookie.sh
go get github.com/golang/lint/golint
go tool vet . 2>&1 | grep -vE 'vendor|render.Delims' | awk '{print} END{if(NR>0) {exit 1}}'
go tool vet --shadow . 2>&1 | grep -vE 'vendor' | awk '{print} END{if(NR>0) {exit 1}}'
golint ./... 2>&1 | grep -vE 'vendor' | awk '{print} END{if(NR>0) {exit 1}}'
gofmt -s -l . 2>&1 | grep -vE 'vendor' | awk '{print} END{if(NR>0) {exit 1}}'
@echo "vet"
@ go tool vet $(FILES) 2>&1 | awk '{print} END{if(NR>0) {exit 1}}'
@echo "vet --shadow"
@ go tool vet --shadow $(FILES) 2>&1 | awk '{print} END{if(NR>0) {exit 1}}'
@echo "golint"
@ golint ./... 2>&1 | grep -vE '\.pb\.go' | awk '{print} END{if(NR>0) {exit 1}}'
@echo "gofmt (simplify)"
@ gofmt -s -l -w $(FILES) 2>&1 | awk '{print} END{if(NR>0) {exit 1}}'

update:
which glide >/dev/null || curl https://glide.sh/get | sh
which glide-vc || go get -v -u github.com/sgotti/glide-vc
rm -r vendor && mv _vendor/vendor vendor || true
rm -r vendor && mv _vendor/src vendor || true
rm -rf _vendor
ifdef PKG
glide --verbose get --strip-vendor --skip-test ${PKG}
glide get -s -v --skip-test ${PKG}
else
glide --verbose update --strip-vendor --skip-test
glide update -s -v --skip-test
endif
@echo "removing test files"
glide vc --only-code --no-tests
mkdir -p _vendor
mv vendor _vendor/vendor
mv vendor _vendor/src

clean:
find . -type s -exec rm {} \;
rm -rf vendor && ln -s _vendor/vendor vendor
go clean ./...
rm -rf vendor
go clean -i ./...
rm -rf *.out

.PHONY: build test check update clean pump server fmt proto
.PHONY: build test check update clean pump server drainer fmt proto

Empty file added cmd/drainer/README.md
Empty file.
1 change: 1 addition & 0 deletions cmd/drainer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package drainer
2 changes: 1 addition & 1 deletion cmd/pump/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func main() {
os.Exit(2)
}

pump.InitLogger(cfg)
pump.InitLogger(cfg.Debug)
pump.PrintVersionInfo()

sc := make(chan os.Signal, 1)
Expand Down
10 changes: 10 additions & 0 deletions gitcookie.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/usr/bin/env bash
touch ~/.gitcookies
chmod 0600 ~/.gitcookies

git config --global http.cookiefile ~/.gitcookies

tr , \\t <<\__END__ >>~/.gitcookies
go.googlesource.com,FALSE,/,TRUE,2147483647,o,git-shenli.pingcap.com=1/rGvVlvFq_x9rxOmXqQe_rfcrjbOk6NSOHIQKhhsfidM
go-review.googlesource.com,FALSE,/,TRUE,2147483647,o,git-shenli.pingcap.com=1/rGvVlvFq_x9rxOmXqQe_rfcrjbOk6NSOHIQKhhsfidM
__END__
57 changes: 57 additions & 0 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package: github.com/pingcap/tidb-binlog
import:
- package: github.com/coreos/etcd
version: ^3.1.0-alpha.0
subpackages:
- clientv3
- package: github.com/ghodss/yaml
- package: github.com/golang/protobuf
subpackages:
- proto
- package: github.com/jonboulle/clockwork
version: ^0.1.0
- package: github.com/juju/errors
- package: github.com/ngaut/log
- package: github.com/twinj/uuid
version: ^0.10.0
- package: golang.org/x/net
subpackages:
- context
- package: google.golang.org/grpc
version: ^1.0.1-GA
testImport:
- package: github.com/pingcap/check
73 changes: 56 additions & 17 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@ package etcd
import (
"path"
"strings"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/juju/errors"
"golang.org/x/net/context"
)

const (
// DefaultRootPath is the root path of the keys stored in etcd
DefaultRootPath = "tidb-binlog"
)

// Node organize the ectd query result as a Trie tree
type Node struct {
Value []byte
Expand All @@ -17,23 +23,39 @@ type Node struct {

// Client is a wrapped etcd client that support some simple method
type Client struct {
client *clientv3.Client
pathPrefix string
client *clientv3.Client
rootPath string
}

// NewClient return an EtcdClient obj
func NewClient(client *clientv3.Client, pathPrefix string) *Client {
// NewClient return a wrapped etcd client
func NewClient(cli *clientv3.Client, root string) *Client {
return &Client{
client: client,
pathPrefix: pathPrefix,
client: cli,
rootPath: root,
}
}

// NewClientFromCfg return a wrapped etcd client
func NewClientFromCfg(endpoints []string, dialTimeout time.Duration, root string) (*Client, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
return nil, errors.Trace(err)
}

return &Client{
client: cli,
rootPath: root,
}, nil
}

// Create guarantees to set a key = value with some options(like ttl)
func (e *Client) Create(ctx context.Context, key string, val string, opts []clientv3.OpOption) error {
key = keyWithPrefix(e.pathPrefix, key)
key = keyWithPrefix(e.rootPath, key)
txnResp, err := e.client.KV.Txn(ctx).If(
notFound(key),
clientv3.Compare(clientv3.ModRevision(key), "=", 0),
).Then(
clientv3.OpPut(key, val, opts...),
).Commit()
Expand All @@ -42,15 +64,15 @@ func (e *Client) Create(ctx context.Context, key string, val string, opts []clie
}

if !txnResp.Succeeded {
return errors.AlreadyExistsf("key %s is not found in etcd", key)
return errors.AlreadyExistsf("key %s in etcd", key)
}

return nil
}

// Get return a key/value matchs the given key
func (e *Client) Get(ctx context.Context, key string) ([]byte, error) {
key = keyWithPrefix(e.pathPrefix, key)
key = keyWithPrefix(e.rootPath, key)
resp, err := e.client.KV.Get(ctx, key)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -66,7 +88,7 @@ func (e *Client) Get(ctx context.Context, key string) ([]byte, error) {
// Update updates a key/value.
// set ttl 0 to disable the Lease ttl feature
func (e *Client) Update(ctx context.Context, key string, val string, ttl int64) error {
key = keyWithPrefix(e.pathPrefix, key)
key = keyWithPrefix(e.rootPath, key)

var opts []clientv3.OpOption
if ttl > 0 {
Expand All @@ -88,15 +110,36 @@ func (e *Client) Update(ctx context.Context, key string, val string, ttl int64)
}

if !txnResp.Succeeded {
return errors.NotFoundf("key %s is not found in etcd", key)
return errors.NotFoundf("key %s in etcd", key)
}

return nil
}

// UpdateOrCreate updates a key/value, if the key does not exist then create, or update
func (e *Client) UpdateOrCreate(ctx context.Context, key string, val string, ttl int64) error {
key = keyWithPrefix(e.rootPath, key)

var opts []clientv3.OpOption
if ttl > 0 {
lcr, err := e.client.Lease.Grant(ctx, ttl)
if err != nil {
return errors.Trace(err)
}

opts = []clientv3.OpOption{clientv3.WithLease(lcr.ID)}
}

_, err := e.client.KV.Do(ctx, clientv3.OpPut(key, val, opts...))
if err != nil {
return errors.Trace(err)
}
return nil
}

// List return the trie struct that constructed by the key/value with same prefix
func (e *Client) List(ctx context.Context, key string) (*Node, error) {
key = keyWithPrefix(e.pathPrefix, key)
key = keyWithPrefix(e.rootPath, key)
if !strings.HasSuffix(key, "/") {
key += "/"
}
Expand Down Expand Up @@ -145,10 +188,6 @@ func parseToDirTree(root *Node, path string) *Node {
return current
}

func notFound(key string) clientv3.Cmp {
return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
}

func keyWithPrefix(prefix, key string) string {
if strings.HasPrefix(key, prefix) {
return key
Expand Down
3 changes: 1 addition & 2 deletions pkg/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,5 @@ func TestList(t *testing.T) {
func testSetup(t *testing.T) (context.Context, *Client, *integration.ClusterV3) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
etcd := NewClient(cluster.RandClient(), "binlog")
ctx := context.Background()
return ctx, etcd, cluster
return context.Background(), etcd, cluster
}
1 change: 0 additions & 1 deletion pkg/file/lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
. "github.com/pingcap/check"
)


var _ = Suite(&testLockSuite{})

type testLockSuite struct{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/flags/flag.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func URLsFromFlag(fs *flag.FlagSet, urlsFlagName string) []url.URL {
return fs.Lookup(urlsFlagName).Value.(*URLsValue).URLSlice()
}

// URLsFromFlag returns a string slices from url got from the flag.
// URLStrsFromFlag returns a string slices from url got from the flag.
func URLStrsFromFlag(fs *flag.FlagSet, urlsFlagName string) []string {
return fs.Lookup(urlsFlagName).Value.(*URLsValue).StringSlice()
}
Loading

0 comments on commit aa9bf27

Please sign in to comment.