Skip to content

Commit

Permalink
refactor registry-zookeeper
Browse files Browse the repository at this point in the history
  • Loading branch information
day253 committed Oct 19, 2024
1 parent 562cfb2 commit 03b6842
Show file tree
Hide file tree
Showing 14 changed files with 49 additions and 59 deletions.
6 changes: 3 additions & 3 deletions boilerplate/pkg/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/ishumei/krpc/objects"
"github.com/ishumei/krpc/protocols/arbiter/kitex_gen/com/shumei/service"
"github.com/ishumei/krpc/protocols/arbiter/kitex_gen/com/shumei/service/predictor"
"github.com/ishumei/krpc/registry-zookeeper/registry"
registry_zookeeper "github.com/ishumei/krpc/registry-zookeeper"
"github.com/jncornett/doublebuf"
"github.com/samber/do"
"github.com/samber/lo"
Expand Down Expand Up @@ -108,11 +108,11 @@ func (s *mirrorClients) Predict(ctx context.Context, request *service.PredictReq
}

func MustNew() *mirrorClients {
zkConn := do.MustInvoke[*registry.ZookeeperRegistry](sconfig.Injector)
zkConn := do.MustInvoke[*registry_zookeeper.ZookeeperRegistry](sconfig.Injector)
sConf := do.MustInvoke[*sconfig.FrameConfig](sconfig.Injector)
childNodes, _, err := zkConn.Children(sConf.ServiceName)
lo.Must0(err)
localIp, err := registry.GetLocalIp("")
localIp, err := registry_zookeeper.GetLocalIp("")
lo.Must0(err)
localIpPort := fmt.Sprintf("%s:%d", localIp, sConf.Port)
klog.Info("selfIpPort: ", localIpPort)
Expand Down
4 changes: 2 additions & 2 deletions boilerplate/pkg/strategies/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/ishumei/krpc/boilerplate/pkg/models"
"github.com/ishumei/krpc/kserver/sconfig"
"github.com/ishumei/krpc/objects"
"github.com/ishumei/krpc/registry-zookeeper/resolver"
registry_zookeeper "github.com/ishumei/krpc/registry-zookeeper"
"github.com/samber/do"
)

Expand All @@ -17,7 +17,7 @@ func Init() error {
if err != nil {
return err
}
zookeeperConn, err := do.Invoke[*resolver.ZookeeperResolver](sconfig.Injector)
zookeeperConn, err := do.Invoke[*registry_zookeeper.ZookeeperResolver](sconfig.Injector)
if err != nil {
return err
}
Expand Down
5 changes: 2 additions & 3 deletions kclient/kclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/cloudwego/kitex/pkg/klog"
"github.com/ishumei/krpc/logging"
registry_zookeeper "github.com/ishumei/krpc/registry-zookeeper"
"github.com/ishumei/krpc/registry-zookeeper/resolver"
"github.com/kitex-contrib/obs-opentelemetry/provider"
"github.com/samber/do"
)
Expand Down Expand Up @@ -43,12 +42,12 @@ func MustNewKclient(c *SingleClientConf) *Kclient {
do.Override(Injector, func(i *do.Injector) (discovery.Resolver, error) {
logger, err := do.Invoke[*logging.Logger](logging.Injector)
if err == nil {
return resolver.NewZookeeperResolverWithConf(
return registry_zookeeper.NewZookeeperResolverWithConf(
c.ResolverConf.Resolver,
registry_zookeeper.WithLogger(logger),
)
} else {
return resolver.NewZookeeperResolverWithConf(
return registry_zookeeper.NewZookeeperResolverWithConf(
c.ResolverConf.Resolver,
)
}
Expand Down
5 changes: 2 additions & 3 deletions kclient/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/ishumei/krpc/logging"
"github.com/ishumei/krpc/objects"
registry_zookeeper "github.com/ishumei/krpc/registry-zookeeper"
"github.com/ishumei/krpc/registry-zookeeper/resolver"
"github.com/samber/do"
)

Expand All @@ -28,12 +27,12 @@ func InjectClientFromMultiClientConf(c *MultiClientConf) {
do.Override(Injector, func(i *do.Injector) (discovery.Resolver, error) {
logger, err := do.Invoke[*logging.Logger](logging.Injector)
if err == nil {
return resolver.NewZookeeperResolverWithConf(
return registry_zookeeper.NewZookeeperResolverWithConf(
c.ResolverConf.Resolver,
registry_zookeeper.WithLogger(logger),
)
} else {
return resolver.NewZookeeperResolverWithConf(
return registry_zookeeper.NewZookeeperResolverWithConf(
c.ResolverConf.Resolver,
)
}
Expand Down
7 changes: 3 additions & 4 deletions kserver/governance/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,21 @@ import (
"github.com/ishumei/krpc/kserver/sconfig"
"github.com/ishumei/krpc/logging"
registry_zookeeper "github.com/ishumei/krpc/registry-zookeeper"
zkregistry "github.com/ishumei/krpc/registry-zookeeper/registry"
"github.com/samber/do"
)

func init() {
do.Provide(sconfig.Injector, func(i *do.Injector) (*zkregistry.ZookeeperRegistry, error) {
do.Provide(sconfig.Injector, func(i *do.Injector) (*registry_zookeeper.ZookeeperRegistry, error) {
f := do.MustInvoke[*sconfig.FrameConfig](sconfig.Injector)
c := f.Registry
logger := do.MustInvoke[*logging.Logger](logging.Injector)
return zkregistry.NewZookeeperRegistryWithConf(
return registry_zookeeper.NewZookeeperRegistryWithConf(
c,
f.Addr,
registry_zookeeper.WithLogger(logger),
)
})
do.Provide(sconfig.Injector, func(i *do.Injector) (registry.Registry, error) {
return do.Invoke[*zkregistry.ZookeeperRegistry](sconfig.Injector)
return do.Invoke[*registry_zookeeper.ZookeeperRegistry](sconfig.Injector)
})
}
7 changes: 3 additions & 4 deletions kserver/governance/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,19 @@ import (
"github.com/ishumei/krpc/kserver/sconfig"
"github.com/ishumei/krpc/logging"
registry_zookeeper "github.com/ishumei/krpc/registry-zookeeper"
"github.com/ishumei/krpc/registry-zookeeper/resolver"
"github.com/samber/do"
)

func init() {
do.Provide(sconfig.Injector, func(i *do.Injector) (*resolver.ZookeeperResolver, error) {
do.Provide(sconfig.Injector, func(i *do.Injector) (*registry_zookeeper.ZookeeperResolver, error) {
c := do.MustInvoke[*sconfig.FrameConfig](sconfig.Injector).Registry
logger := do.MustInvoke[*logging.Logger](logging.Injector)
return resolver.NewZookeeperResolverWithConf(
return registry_zookeeper.NewZookeeperResolverWithConf(
c,
registry_zookeeper.WithLogger(logger),
)
})
do.Provide(sconfig.Injector, func(i *do.Injector) (discovery.Resolver, error) {
return do.Invoke[*resolver.ZookeeperResolver](sconfig.Injector)
return do.Invoke[*registry_zookeeper.ZookeeperResolver](sconfig.Injector)
})
}
4 changes: 2 additions & 2 deletions kserver/grace/grace.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/cloudwego/kitex/pkg/klog"
"github.com/ishumei/krpc/kserver/sconfig"
"github.com/ishumei/krpc/registry-zookeeper/registry"
registry_zookeeper "github.com/ishumei/krpc/registry-zookeeper"
"github.com/samber/do"
)

Expand All @@ -27,7 +27,7 @@ func DefaultDeregisterSignal() {
sig := DeregisterSignal()
defer signal.Stop(sig)
for range sig {
err := do.MustInvoke[*registry.ZookeeperRegistry](sconfig.Injector).Deregister(nil)
err := do.MustInvoke[*registry_zookeeper.ZookeeperRegistry](sconfig.Injector).Deregister(nil)
klog.Info("deregister service", err)
}
<-sig
Expand Down
11 changes: 5 additions & 6 deletions registry-zookeeper/example/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import (

"github.com/cloudwego/kitex/pkg/registry"
"github.com/cloudwego/kitex/pkg/rpcinfo"
zkregistry "github.com/ishumei/krpc/registry-zookeeper/registry"
"github.com/ishumei/krpc/registry-zookeeper/resolver"
registry_zookeeper "github.com/ishumei/krpc/registry-zookeeper"
"github.com/stretchr/testify/assert"
)

Expand All @@ -32,15 +31,15 @@ func TestZookeeperDiscovery(t *testing.T) {
targetPort := testRegisterPort()

// register
r, err := zkregistry.NewZookeeperRegistry(testZkServers, 40*time.Second, "")
r, err := registry_zookeeper.NewZookeeperRegistry(testZkServers, 40*time.Second, "")
assert.Nil(t, err)
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf(":%d", targetPort))
assert.Nil(t, err)
info := &registry.Info{ServiceName: testRegisterPath, Weight: 100, PayloadCodec: "thrift", Addr: addr}
assert.Nil(t, r.Register(info))

// resolve
res, err := resolver.NewZookeeperResolver(testZkServers, 40*time.Second)
res, err := registry_zookeeper.NewZookeeperResolver(testZkServers, 40*time.Second)
assert.Nil(t, err)
target := res.Target(context.Background(), rpcinfo.NewEndpointInfo(testRegisterPath, "", nil, nil))
result, err := res.Resolve(context.Background(), target)
Expand Down Expand Up @@ -68,15 +67,15 @@ func TestZookeeperResolverWithAuth(t *testing.T) {
targetPort := testRegisterPort()

// register
r, err := zkregistry.NewZookeeperRegistryWithAuth(testZkServers, 40*time.Second, "zkadmin", "zkadmin123", "")
r, err := registry_zookeeper.NewZookeeperRegistryWithAuth(testZkServers, 40*time.Second, "zkadmin", "zkadmin123", "")
assert.Nil(t, err)
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf(":%d", targetPort))
assert.Nil(t, err)
info := &registry.Info{ServiceName: testRegisterPath, Weight: 100, PayloadCodec: "thrift", Addr: addr}
assert.Nil(t, r.Register(info))

// resolve
res, err := resolver.NewZookeeperResolverWithAuth(testZkServers, 40*time.Second, "zkadmin", "zkadmin123")
res, err := registry_zookeeper.NewZookeeperResolverWithAuth(testZkServers, 40*time.Second, "zkadmin", "zkadmin123")
assert.Nil(t, err)
target := res.Target(context.Background(), rpcinfo.NewEndpointInfo(testRegisterPath, "", nil, nil))
result, err := res.Resolve(context.Background(), target)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package registry
package registry_zookeeper

import (
"errors"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package registry
package registry_zookeeper

import (
"net"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package registry
package registry_zookeeper

import (
"context"
Expand All @@ -12,7 +12,6 @@ import (
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/registry"
"github.com/go-zookeeper/zk"
registry_zookeeper "github.com/ishumei/krpc/registry-zookeeper"
"go.uber.org/multierr"
)

Expand All @@ -28,7 +27,7 @@ type meta struct {
}

type ZookeeperRegistry struct {
*registry_zookeeper.ZookeeperParams
*ZookeeperParams
sync.RWMutex
inetPrefix string
metas map[string]*meta
Expand All @@ -44,9 +43,9 @@ var (
)

// NewZookeeperRegistryWithConf
func NewZookeeperRegistryWithConf(c registry_zookeeper.Conf, inetPrefix string, options ...registry_zookeeper.Option) (*ZookeeperRegistry, error) {
func NewZookeeperRegistryWithConf(c Conf, inetPrefix string, options ...Option) (*ZookeeperRegistry, error) {
return NewZookeeperRegistryWithAuth(
strings.Split(c.Metabase, registry_zookeeper.DefaultRegistrySeparater),
strings.Split(c.Metabase, DefaultRegistrySeparater),
time.Duration(c.TimeoutMs)*time.Millisecond,
c.User,
c.Password,
Expand All @@ -55,12 +54,12 @@ func NewZookeeperRegistryWithConf(c registry_zookeeper.Conf, inetPrefix string,
)
}

func NewZookeeperRegistry(servers []string, sessionTimeout time.Duration, inetPrefix string, options ...registry_zookeeper.Option) (*ZookeeperRegistry, error) {
func NewZookeeperRegistry(servers []string, sessionTimeout time.Duration, inetPrefix string, options ...Option) (*ZookeeperRegistry, error) {
return NewZookeeperRegistryWithAuth(servers, sessionTimeout, "", "", inetPrefix, options...)
}

func NewZookeeperRegistryWithAuth(servers []string, sessionTimeout time.Duration, user, password string, inetPrefix string, options ...registry_zookeeper.Option) (*ZookeeperRegistry, error) {
p := registry_zookeeper.NewZookeeperParams(servers, sessionTimeout, user, password)
func NewZookeeperRegistryWithAuth(servers []string, sessionTimeout time.Duration, user, password string, inetPrefix string, options ...Option) (*ZookeeperRegistry, error) {
p := NewZookeeperParams(servers, sessionTimeout, user, password)
for _, option := range options {
option(p)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package resolver
package registry_zookeeper

import (
"context"
Expand All @@ -11,17 +11,16 @@ import (
"github.com/cloudwego/kitex/pkg/discovery"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/go-zookeeper/zk"
registry_zookeeper "github.com/ishumei/krpc/registry-zookeeper"
)

type ZookeeperResolver struct {
*zk.Conn
}

// NewZookeeperResolverWithConf
func NewZookeeperResolverWithConf(c registry_zookeeper.Conf, options ...registry_zookeeper.Option) (*ZookeeperResolver, error) {
func NewZookeeperResolverWithConf(c Conf, options ...Option) (*ZookeeperResolver, error) {
return NewZookeeperResolverWithAuth(
strings.Split(c.Metabase, registry_zookeeper.DefaultRegistrySeparater),
strings.Split(c.Metabase, DefaultRegistrySeparater),
time.Duration(c.TimeoutMs)*time.Millisecond,
c.User,
c.Password,
Expand All @@ -30,13 +29,13 @@ func NewZookeeperResolverWithConf(c registry_zookeeper.Conf, options ...registry
}

// NewZookeeperResolver create a zookeeper based resolver
func NewZookeeperResolver(servers []string, sessionTimeout time.Duration, options ...registry_zookeeper.Option) (*ZookeeperResolver, error) {
func NewZookeeperResolver(servers []string, sessionTimeout time.Duration, options ...Option) (*ZookeeperResolver, error) {
return NewZookeeperResolverWithAuth(servers, sessionTimeout, "", "", options...)
}

// NewZookeeperResolver create a zookeeper based resolver with auth
func NewZookeeperResolverWithAuth(servers []string, sessionTimeout time.Duration, user, password string, options ...registry_zookeeper.Option) (*ZookeeperResolver, error) {
p := registry_zookeeper.NewZookeeperParams(servers, sessionTimeout, user, password)
func NewZookeeperResolverWithAuth(servers []string, sessionTimeout time.Duration, user, password string, options ...Option) (*ZookeeperResolver, error) {
p := NewZookeeperParams(servers, sessionTimeout, user, password)
for _, option := range options {
option(p)
}
Expand All @@ -57,8 +56,8 @@ func (r *ZookeeperResolver) Target(ctx context.Context, target rpcinfo.EndpointI
// Resolve implements the Resolver interface.
func (r *ZookeeperResolver) Resolve(ctx context.Context, desc string) (discovery.Result, error) {
path := desc
if !strings.HasPrefix(path, registry_zookeeper.Separator) {
path = registry_zookeeper.Separator + path
if !strings.HasPrefix(path, Separator) {
path = Separator + path
}
eps, err := r.getEndPoints(path)
if err != nil {
Expand All @@ -85,11 +84,11 @@ func (r *ZookeeperResolver) getEndPoints(path string) ([]string, error) {
}

func (r *ZookeeperResolver) detailEndPoints(path, ep string) (discovery.Instance, error) {
data, _, err := r.Get(path + registry_zookeeper.Separator + ep)
data, _, err := r.Get(path + Separator + ep)
if err != nil {
return nil, err
}
en := new(registry_zookeeper.NodeInfo)
en := new(NodeInfo)
err = json.Unmarshal(data, en)
if err != nil {
return nil, fmt.Errorf("unmarshal data [%s] error, cause %w", data, err)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,25 @@
package registry
package registry_zookeeper

import (
"net"
"path/filepath"
"strings"

"github.com/cloudwego/kitex/pkg/registry"
registry_zookeeper "github.com/ishumei/krpc/registry-zookeeper"
)

type serviceInfo struct {
ServiceName string
Port string
*registry_zookeeper.NodeInfo
*NodeInfo
}

// path format as follows:
// /{ENV_ROLE}/{serviceName}/{ip}:{port}
func (n *serviceInfo) Path() string {
var path string
if !strings.HasPrefix(n.ServiceName, registry_zookeeper.Separator) {
path = registry_zookeeper.Separator + n.ServiceName
if !strings.HasPrefix(n.ServiceName, Separator) {
path = Separator + n.ServiceName
} else {
path = n.ServiceName
}
Expand Down Expand Up @@ -51,6 +50,6 @@ func newServiceInfo(info *registry.Info, inetPrefix string) *serviceInfo {
return &serviceInfo{
ServiceName: info.ServiceName,
Port: port,
NodeInfo: registry_zookeeper.NewNodeInfo(host, port),
NodeInfo: NewNodeInfo(host, port),
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package registry
package registry_zookeeper

import (
"testing"

registry_zookeeper "github.com/ishumei/krpc/registry-zookeeper"
)

func TestRegistryInfo_Path(t *testing.T) {
type fields struct {
ServiceName string
Port string
NodeInfo *registry_zookeeper.NodeInfo
NodeInfo *NodeInfo
}
tests := []struct {
name string
Expand All @@ -22,7 +20,7 @@ func TestRegistryInfo_Path(t *testing.T) {
fields: fields{
ServiceName: "test",
Port: "80",
NodeInfo: &registry_zookeeper.NodeInfo{
NodeInfo: &NodeInfo{
Host: "127.0.0.1",
Port: 80,
Weight: 1,
Expand Down

0 comments on commit 03b6842

Please sign in to comment.