Skip to content

Commit

Permalink
added api for node cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
dobyte committed Oct 29, 2024
1 parent 6f85551 commit 67fc06e
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 9 deletions.
7 changes: 6 additions & 1 deletion cluster/node/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (a *Actor) Next(ctx Context) {
a.mailbox <- ctx
}

// Deliver 投递消息到Actor中进行处理
// Deliver 投递消息到当前Actor中进行处理
func (a *Actor) Deliver(uid int64, message *cluster.Message) {
req := a.scheduler.node.reqPool.Get().(*request)
req.nid = a.scheduler.node.opts.id
Expand All @@ -128,6 +128,11 @@ func (a *Actor) Deliver(uid int64, message *cluster.Message) {
a.Next(req)
}

// Push 推送消息到本地Node队列上进行处理
func (a *Actor) Push(uid int64, message *cluster.Message) {
a.scheduler.node.router.deliver("", a.scheduler.node.opts.id, a.PID(), 0, uid, message.Seq, message.Route, message.Data)
}

// Destroy 销毁Actor
func (a *Actor) Destroy() {
if !a.state.CompareAndSwap(started, destroyed) {
Expand Down
2 changes: 2 additions & 0 deletions cluster/node/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type Context interface {
Context() context.Context
// GetIP 获取客户端IP
GetIP() (string, error)
// Deliver 投递消息给节点处理
Deliver(args *cluster.DeliverArgs) error
// Reply 回复消息
Reply(message *cluster.Message) error
// Response 响应消息
Expand Down
5 changes: 5 additions & 0 deletions cluster/node/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ func (e *event) GetIP() (string, error) {
})
}

// Deliver 投递消息给节点处理
func (e *event) Deliver(args *cluster.DeliverArgs) error {
return e.node.proxy.Deliver(e.ctx, args)
}

// Reply 回复消息
func (e *event) Reply(message *cluster.Message) error {
return e.node.proxy.Push(e.ctx, &cluster.PushArgs{
Expand Down
2 changes: 1 addition & 1 deletion cluster/node/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (p *provider) Deliver(ctx context.Context, gid, nid string, cid, uid int64,
}
}

p.node.router.deliver(gid, nid, cid, uid, msg.Seq, msg.Route, msg.Buffer)
p.node.router.deliver(gid, nid, "", cid, uid, msg.Seq, msg.Route, msg.Buffer)

return nil
}
Expand Down
5 changes: 2 additions & 3 deletions cluster/node/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,8 @@ func (p *Proxy) Broadcast(ctx context.Context, args *cluster.BroadcastArgs) erro

// Deliver 投递消息给节点处理
func (p *Proxy) Deliver(ctx context.Context, args *cluster.DeliverArgs) error {
if args.NID == p.GetID() {
p.node.router.deliver("", args.NID, 0, args.UID, args.Message.Seq, args.Message.Route, args.Message.Data)
return nil
if args.NID == p.node.opts.id {
return errors.ErrIllegalOperation
}

return p.nodeLinker.Deliver(ctx, &link.DeliverArgs{
Expand Down
16 changes: 14 additions & 2 deletions cluster/node/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type request struct {
ctx context.Context // 上下文
gid string // 来源网关ID
nid string // 来源节点ID
pid string // 来源Actor ID
cid int64 // 连接ID
uid int64 // 用户ID
message *cluster.Message // 请求消息
Expand Down Expand Up @@ -271,14 +272,25 @@ func (r *request) Deliver(args *cluster.DeliverArgs) error {
// Reply 回复消息
func (r *request) Reply(message *cluster.Message) error {
switch {
case r.gid != "":
case r.gid != "": // 来源于网关
return r.node.proxy.Push(r.ctx, &cluster.PushArgs{
GID: r.gid,
Kind: session.Conn,
Target: r.cid,
Message: message,
})
case r.nid != "":
case r.pid != "": // 来源于Actor
if actor, ok := r.node.scheduler.doLoad(r.pid); ok {
actor.Deliver(r.uid, message)
return nil
} else {
return errors.ErrIllegalOperation
}
case r.nid != "": // 来源于其他Node
if r.nid == r.node.opts.id {
return nil
}

return r.node.proxy.Deliver(r.ctx, &cluster.DeliverArgs{
NID: r.nid,
UID: r.uid,
Expand Down
3 changes: 2 additions & 1 deletion cluster/node/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,11 @@ func (r *Router) Group(groups ...func(group *RouterGroup)) *RouterGroup {
return group
}

func (r *Router) deliver(gid, nid string, cid, uid int64, seq, route int32, data interface{}) {
func (r *Router) deliver(gid, nid, pid string, cid, uid int64, seq, route int32, data interface{}) {
req := r.node.reqPool.Get().(*request)
req.gid = gid
req.nid = nid
req.pid = pid
req.cid = cid
req.uid = uid
req.message.Seq = seq
Expand Down
7 changes: 6 additions & 1 deletion cluster/node/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,12 @@ func (s *Scheduler) remove(kind, id string) (*Actor, bool) {

// 加载Actor
func (s *Scheduler) load(kind, id string) (*Actor, bool) {
if actor, ok := s.actors.Load(kind + "/" + id); ok {
return s.doLoad(kind + "/" + id)
}

// 执行加载Actor
func (s *Scheduler) doLoad(pid string) (*Actor, bool) {
if actor, ok := s.actors.Load(pid); ok {
return actor.(*Actor), true
}

Expand Down

0 comments on commit 67fc06e

Please sign in to comment.