Skip to content

Commit

Permalink
Merge branch 'v1-feature-main'
Browse files Browse the repository at this point in the history
  • Loading branch information
dobyte committed Jun 29, 2023
2 parents dad808e + 56fd55c commit 6168122
Show file tree
Hide file tree
Showing 58 changed files with 1,109 additions and 553 deletions.
33 changes: 15 additions & 18 deletions cluster/gate/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/dobyte/due/component"
"github.com/dobyte/due/log"
"github.com/dobyte/due/network"
"github.com/dobyte/due/packet"
"github.com/dobyte/due/registry"
)

Expand Down Expand Up @@ -120,37 +119,35 @@ func (g *Gate) stopNetworkServer() {
// 处理连接打开
func (g *Gate) handleConnect(conn network.Conn) {
g.session.AddConn(conn)

cid, uid := conn.ID(), conn.UID()
ctx, cancel := context.WithTimeout(g.ctx, g.opts.timeout)
g.proxy.trigger(ctx, cluster.Connect, cid, uid)
cancel()
}

// 处理断开连接
func (g *Gate) handleDisconnect(conn network.Conn) {
g.session.RemConn(conn)

if cid, uid := conn.ID(), conn.UID(); uid > 0 {
if cid, uid := conn.ID(), conn.UID(); uid != 0 {
ctx, cancel := context.WithTimeout(g.ctx, g.opts.timeout)
err := g.proxy.unbindGate(ctx, cid, uid)
_ = g.proxy.unbindGate(ctx, cid, uid)
g.proxy.trigger(ctx, cluster.Disconnect, cid, uid)
cancel()
} else {
ctx, cancel := context.WithTimeout(g.ctx, g.opts.timeout)
g.proxy.trigger(ctx, cluster.Disconnect, cid, uid)
cancel()
if err != nil {
log.Errorf("user unbind failed, gid: %d, uid: %d, err: %v", g.opts.id, uid, err)
}
}
}

// 处理接收到的消息
func (g *Gate) handleReceive(conn network.Conn, data []byte, _ int) {
message, err := packet.Unpack(data)
if err != nil {
log.Errorf("unpack data to struct failed: %v", err)
return
}

cid, uid := conn.ID(), conn.UID()
ctx, cancel := context.WithTimeout(g.ctx, g.opts.timeout)
err = g.proxy.deliver(ctx, cid, uid, message)
g.proxy.deliver(ctx, cid, uid, data)
cancel()
if err != nil {
log.Warnf("deliver message failed: %v", err)
}
}

// 启动RPC服务器
Expand Down Expand Up @@ -191,7 +188,7 @@ func (g *Gate) registerServiceInstance() {
err := g.opts.registry.Register(ctx, g.instance)
cancel()
if err != nil {
log.Fatalf("register service instance failed: %v", err)
log.Fatalf("register dispatcher instance failed: %v", err)
}
}

Expand All @@ -201,7 +198,7 @@ func (g *Gate) deregisterServiceInstance() {
err := g.opts.registry.Deregister(ctx, g.instance)
defer cancel()
if err != nil {
log.Errorf("deregister service instance failed: %v", err)
log.Errorf("deregister dispatcher instance failed: %v", err)
}
}

Expand Down
23 changes: 16 additions & 7 deletions cluster/gate/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gate

import (
"context"
"github.com/dobyte/due/cluster"
"github.com/dobyte/due/packet"
"github.com/dobyte/due/session"
)
Expand Down Expand Up @@ -31,7 +32,7 @@ func (p *provider) Bind(ctx context.Context, cid, uid int64) error {

// Unbind 解绑用户与网关间的关系
func (p *provider) Unbind(ctx context.Context, uid int64) error {
if uid <= 0 {
if uid == 0 {
return ErrInvalidArgument
}

Expand All @@ -49,22 +50,30 @@ func (p *provider) Unbind(ctx context.Context, uid int64) error {
}

// GetIP 获取客户端IP地址
func (p *provider) GetIP(kind session.Kind, target int64) (string, error) {
func (p *provider) GetIP(ctx context.Context, kind session.Kind, target int64) (string, error) {
return p.gate.session.RemoteIP(kind, target)
}

// Push 发送消息
func (p *provider) Push(kind session.Kind, target int64, message *packet.Message) error {
func (p *provider) Push(ctx context.Context, kind session.Kind, target int64, message *packet.Message) error {
msg, err := packet.Pack(message)
if err != nil {
return err
}

return p.gate.session.Push(kind, target, msg)
err = p.gate.session.Push(kind, target, msg)
if kind == session.User && err == session.ErrNotFoundSession {
err = p.gate.opts.locator.Rem(ctx, target, cluster.Gate, p.gate.opts.id)
if err != nil {
return err
}
}

return err
}

// Multicast 推送组播消息
func (p *provider) Multicast(kind session.Kind, targets []int64, message *packet.Message) (int64, error) {
func (p *provider) Multicast(ctx context.Context, kind session.Kind, targets []int64, message *packet.Message) (int64, error) {
if len(targets) == 0 {
return 0, nil
}
Expand All @@ -78,7 +87,7 @@ func (p *provider) Multicast(kind session.Kind, targets []int64, message *packet
}

// Broadcast 推送广播消息
func (p *provider) Broadcast(kind session.Kind, message *packet.Message) (int64, error) {
func (p *provider) Broadcast(ctx context.Context, kind session.Kind, message *packet.Message) (int64, error) {
msg, err := packet.Pack(message)
if err != nil {
return 0, err
Expand All @@ -88,6 +97,6 @@ func (p *provider) Broadcast(kind session.Kind, message *packet.Message) (int64,
}

// Disconnect 断开连接
func (p *provider) Disconnect(kind session.Kind, target int64, isForce bool) error {
func (p *provider) Disconnect(ctx context.Context, kind session.Kind, target int64, isForce bool) error {
return p.gate.session.Close(kind, target, isForce)
}
44 changes: 23 additions & 21 deletions cluster/gate/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ import (
"context"
"github.com/dobyte/due/cluster"
"github.com/dobyte/due/internal/link"
"github.com/dobyte/due/internal/router"
"github.com/dobyte/due/packet"

"github.com/dobyte/due/log"
"github.com/dobyte/due/packet"
)

var (
Expand Down Expand Up @@ -41,14 +40,7 @@ func (p *proxy) bindGate(ctx context.Context, cid, uid int64) error {
return err
}

err = p.link.Trigger(ctx, &link.TriggerArgs{
Event: cluster.Reconnect,
CID: cid,
UID: uid,
})
if err != nil && err != ErrNotFoundUserSource && err != router.ErrNotFoundEndpoint {
log.Errorf("trigger event failed, gid: %s, uid: %d, event: %v, err: %v", p.gate.opts.id, uid, cluster.Reconnect, err)
}
p.trigger(ctx, cluster.Reconnect, cid, uid)

return nil
}
Expand All @@ -57,28 +49,38 @@ func (p *proxy) bindGate(ctx context.Context, cid, uid int64) error {
func (p *proxy) unbindGate(ctx context.Context, cid, uid int64) error {
err := p.gate.opts.locator.Rem(ctx, uid, cluster.Gate, p.gate.opts.id)
if err != nil {
return err
log.Errorf("user unbind failed, gid: %d, cid: %d, uid: %d, err: %v", p.gate.opts.id, cid, uid, err)
}

err = p.link.Trigger(ctx, &link.TriggerArgs{
Event: cluster.Disconnect,
return err
}

// 触发事件
func (p *proxy) trigger(ctx context.Context, event cluster.Event, cid, uid int64) {
if err := p.link.Trigger(ctx, &link.TriggerArgs{
Event: event,
CID: cid,
UID: uid,
})
if err != nil && err != ErrNotFoundUserSource && err != router.ErrNotFoundEndpoint {
log.Errorf("trigger event failed, gid: %s, uid: %d, event: %v, err: %v", p.gate.opts.id, uid, cluster.Disconnect, err)
}); err != nil {
log.Warnf("trigger event failed, gid: %s, cid: %d, uid: %d, event: %v, err: %v", p.gate.opts.id, cid, uid, event, err)
}

return nil
}

// 投递消息
func (p *proxy) deliver(ctx context.Context, cid, uid int64, message *packet.Message) error {
return p.link.Deliver(ctx, &link.DeliverArgs{
func (p *proxy) deliver(ctx context.Context, cid, uid int64, data []byte) {
message, err := packet.Unpack(data)
if err != nil {
log.Errorf("unpack data to struct failed: %v", err)
return
}

if err = p.link.Deliver(ctx, &link.DeliverArgs{
CID: cid,
UID: uid,
Message: message,
})
}); err != nil {
log.Errorf("deliver message failed: %v", err)
}
}

// 启动监听
Expand Down
8 changes: 4 additions & 4 deletions cluster/mesh/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (m *Mesh) startRPCServer() {
for _, entity := range m.services {
err = m.rpc.RegisterService(entity.desc, entity.provider)
if err != nil {
log.Fatalf("register service failed: %v", err)
log.Fatalf("register dispatcher failed: %v", err)
}
}

Expand All @@ -137,7 +137,7 @@ func (m *Mesh) registerServiceInstances() {
for _, entity := range m.services {
id, err := xuuid.UUID()
if err != nil {
log.Fatalf("generate service id failed: %v", err)
log.Fatalf("generate dispatcher id failed: %v", err)
}

m.instances = append(m.instances, &registry.ServiceInstance{
Expand All @@ -161,7 +161,7 @@ func (m *Mesh) registerServiceInstances() {
}

if err := eg.Wait(); err != nil {
log.Fatalf("register service instance failed: %v", err)
log.Fatalf("register dispatcher instance failed: %v", err)
}
}

Expand All @@ -178,7 +178,7 @@ func (m *Mesh) deregisterServiceInstances() {
}

if err := eg.Wait(); err != nil {
log.Errorf("deregister service instance failed: %v", err)
log.Errorf("deregister dispatcher instance failed: %v", err)
}
}

Expand Down
10 changes: 8 additions & 2 deletions cluster/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,21 +169,27 @@ func (n *Node) registerServiceInstance() {
})
}

events := make([]cluster.Event, 0, len(n.events.events))
for event := range n.events.events {
events = append(events, event)
}

n.instance = &registry.ServiceInstance{
ID: n.opts.id,
Name: string(cluster.Node),
Kind: cluster.Node,
Alias: n.opts.name,
State: n.getState(),
Routes: routes,
Events: events,
Endpoint: n.rpc.Endpoint().String(),
}

ctx, cancel := context.WithTimeout(n.ctx, 10*time.Second)
err := n.opts.registry.Register(ctx, n.instance)
cancel()
if err != nil {
log.Fatalf("register service instance failed: %v", err)
log.Fatalf("register dispatcher instance failed: %v", err)
}
}

Expand All @@ -193,7 +199,7 @@ func (n *Node) deregisterServiceInstance() {
err := n.opts.registry.Deregister(ctx, n.instance)
cancel()
if err != nil {
log.Errorf("deregister service instance failed: %v", err)
log.Errorf("deregister dispatcher instance failed: %v", err)
}
}

Expand Down
Loading

0 comments on commit 6168122

Please sign in to comment.