Skip to content

Commit

Permalink
<feat> 封装服务发现
Browse files Browse the repository at this point in the history
  • Loading branch information
刘乾洪 committed Apr 21, 2021
1 parent 5310cd7 commit de9ec4b
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 0 deletions.
181 changes: 181 additions & 0 deletions etcd/discovery/discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package discovery

import (
"context"
"fmt"
"strings"
"sync"
"time"

"go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

type Discovery struct {
client *clientv3.Client
leaseGrants map[string]*clientv3.LeaseGrantResponse
leaseGrantMutex sync.Mutex
logger *zap.Logger
closeChan chan struct{}
}

func NewDiscovery(endpoints string, logger *zap.Logger) *Discovery {
client, err := clientv3.New(clientv3.Config{
Endpoints: strings.Split(endpoints, ","),
DialTimeout: 5 * time.Second,
})
if err != nil {
panic(err)
}
return &Discovery{
client: client,
leaseGrants: make(map[string]*clientv3.LeaseGrantResponse),
logger: logger,
closeChan: make(chan struct{}),
}
}

func (s *Discovery) Register(key, val string) error {
s.leaseGrantMutex.Lock()
defer s.leaseGrantMutex.Unlock()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

leaseGrant, err := s.client.Grant(ctx, 5)
if err != nil {
return err
}

_, err = s.client.Put(ctx, key, val, clientv3.WithLease(leaseGrant.ID))
if err != nil {
return err
}

/*keepAliveCtx, keepAliveCancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer keepAliveCancel()*/

leaseKeepActive, err := s.client.KeepAlive(context.Background(), leaseGrant.ID)
if err != nil {
return err
}

s.leaseGrants[key] = leaseGrant
s.logger.Info(fmt.Sprintf("register success: key = %s", key))

s.doAsync(func() {
for {
select {
case _, ok := <-leaseKeepActive:
if !ok {
s.logger.Info("etcd keep alive channel closed")
for {
select {
case <-time.After(time.Second):
s.UnRegister(key)
if err = s.Register(key, val); err == nil {
return
}
case <-s.closeChan:
s.logger.Info("exit watch loop goroutine")
return
}
}
}
}
}
})

return nil
}

func (s *Discovery) UnRegister(key string) error {
s.leaseGrantMutex.Lock()
defer s.leaseGrantMutex.Unlock()

if leaseGrant, ok := s.leaseGrants[key]; ok {
delete(s.leaseGrants, key)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

_, err := s.client.Revoke(ctx, leaseGrant.ID)
if err != nil {
s.logger.Error("revoke error", zap.String("key", key), zap.Error(err))
return err
}
}
return nil
}

func (s *Discovery) Watch(keyPrefix string) {
// 开启监控
s.doAsync(func() {
watchChan := s.client.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
for {
select {
case event, ok := <-watchChan:
if !ok || event.Canceled {
s.logger.Error("[watch] channel closed or canceled", zap.String("key", keyPrefix))
return
}
if event.Created {
continue
}

for _, ev := range event.Events {
s.logger.Info("[watch] receive event", zap.Int("type", int(ev.Type)), zap.String("key", string(ev.Kv.Key)), zap.String("val", string(ev.Kv.Value)))
}
}
}
})

// 获取初始状态
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

rsp, err := s.client.Get(ctx, keyPrefix, clientv3.WithPrefix())
if err != nil {
panic(err)
}

for _, kv := range rsp.Kvs {
s.logger.Info("[watch] get", zap.String("key", string(kv.Key)), zap.String("val", string(kv.Value)))
}
}

func (s *Discovery) Stop() {
s.leaseGrantMutex.Lock()
defer s.leaseGrantMutex.Unlock()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

for _, leaseGrant := range s.leaseGrants {
_, err := s.client.Revoke(ctx, leaseGrant.ID)
if err != nil {
s.logger.Error("revoke error", zap.Int64("leaseID", int64(leaseGrant.ID)))
}
}
close(s.closeChan)
s.client.Close()
}

func (s *Discovery) doAsync(work func()) {
go func() {
defer func() {
if err := recover(); err != nil {
s.logger.Error("panic happens", zap.Any("err", err))
}
}()
work()
}()
}

func NewZapLogger() *zap.Logger {
logger, err := zap.NewDevelopment()
if err != nil {
panic(err)
}
return logger
}
29 changes: 29 additions & 0 deletions etcd/discovery/discovery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package discovery

import (
"testing"

"go.uber.org/goleak"
)

var (
endpoints = "localhost:12379"
)

func TestDiscovery_Watch(t *testing.T) {
d := NewDiscovery(endpoints, NewZapLogger())

// 服务发现
d.Watch("service")

// 服务注册
if err := d.Register("service/test", "hahahaha"); err != nil {
panic(err)
}

d.Stop()
}

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
11 changes: 11 additions & 0 deletions etcd/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/liuqianhong6007/demo/etcd

go 1.15

require (
go.etcd.io/etcd/client/v3 v3.5.0-alpha.0
go.uber.org/goleak v1.1.10
go.uber.org/zap v1.16.0
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/tools v0.1.0 // indirect
)
28 changes: 28 additions & 0 deletions etcd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package main

import (
"flag"
"time"

"github.com/liuqianhong6007/demo/etcd/discovery"
)

var (
etcdEndpoints = flag.String("etcd_endpoints", "localhost:2379", "etcd endpoints")
)

func main() {
d := discovery.NewDiscovery(*etcdEndpoints, discovery.NewZapLogger())

// 服务发现
d.Watch("service")

// 服务注册
if err := d.Register("service/test", "hahahaha"); err != nil {
panic(err)
}

for {
time.Sleep(time.Minute)
}
}

0 comments on commit de9ec4b

Please sign in to comment.