Skip to content

Commit

Permalink
added rolling-update feature
Browse files Browse the repository at this point in the history
  • Loading branch information
dobyte committed Nov 19, 2024
1 parent c51b68d commit 86f7e7a
Show file tree
Hide file tree
Showing 16 changed files with 328 additions and 71 deletions.
10 changes: 5 additions & 5 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
Shut State = iota // 关闭(节点已经关闭,无法正常访问该节点)
Work // 工作(节点正常工作,可以分配更多玩家到该节点)
Busy // 繁忙(节点资源紧张,不建议分配更多玩家到该节点上)
Hang // 挂起(节点即将关闭,正处于资源回收中)
Hang // 挂起(节点即将销毁,正处于资源回收中)
)

// State 集群实例状态
Expand Down Expand Up @@ -73,9 +73,9 @@ func (e Event) String() string {
}

const (
Init Hook = iota // 初始化组件
Init Hook = iota // 初始组件
Start // 启动组件
Restart // 重启组件
Close // 关闭组件
Destroy // 销毁组件
)

Expand All @@ -86,8 +86,8 @@ func (h Hook) String() string {
switch h {
case Start:
return "start"
case Restart:
return "restart"
case Close:
return "close"
case Destroy:
return "destroy"
default:
Expand Down
8 changes: 7 additions & 1 deletion cluster/node/actor_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package node
type actorOptions struct {
id string // Actor编号
args []any // 传递到Processor中的参数
wait bool // 是否需要等待
}

type ActorOption func(o *actorOptions)

func defaultActorOptions() *actorOptions {
return &actorOptions{}
return &actorOptions{wait: true}
}

// WithActorID 设置Actor编号
Expand All @@ -20,3 +21,8 @@ func WithActorID(id string) ActorOption {
func WithActorArgs(args ...any) ActorOption {
return func(o *actorOptions) { o.args = append(o.args, args...) }
}

// WithActorNonWait 设置Actor无需等待属性(Node组件关关闭时无需等待此Actor结束)
func WithActorNonWait() ActorOption {
return func(o *actorOptions) { o.wait = false }
}
26 changes: 23 additions & 3 deletions cluster/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Node struct {
fnChan chan func()
scheduler *Scheduler
transporter transport.Server
wg *sync.WaitGroup
rw sync.RWMutex
hooks map[cluster.Hook][]HookHandler
}
Expand All @@ -63,6 +64,7 @@ func NewNode(opts ...Option) *Node {
n.instances = make([]*registry.ServiceInstance, 0)
n.fnChan = make(chan func(), 4096)
n.state.Store(int32(cluster.Shut))
n.wg = &sync.WaitGroup{}
n.evtPool = &sync.Pool{New: func() interface{} {
return &event{
ctx: context.Background(),
Expand Down Expand Up @@ -112,7 +114,7 @@ func (n *Node) Init() {

// Start 启动节点
func (n *Node) Start() {
if n.state.Swap(int32(cluster.Work)) != int32(cluster.Shut) {
if !n.state.CompareAndSwap(int32(cluster.Shut), int32(cluster.Work)) {
return
}

Expand All @@ -131,9 +133,24 @@ func (n *Node) Start() {
n.runHookFunc(cluster.Start)
}

// Close 关闭节点
func (n *Node) Close() {
if !n.state.CompareAndSwap(int32(cluster.Work), int32(cluster.Hang)) {
if !n.state.CompareAndSwap(int32(cluster.Busy), int32(cluster.Hang)) {
return
}
}

n.registerServiceInstances()

n.runHookFunc(cluster.Close)

n.wg.Wait()
}

// Destroy 销毁节点服务器
func (n *Node) Destroy() {
if n.state.Swap(int32(cluster.Shut)) == int32(cluster.Shut) {
if !n.state.CompareAndSwap(int32(cluster.Hang), int32(cluster.Shut)) {
return
}

Expand Down Expand Up @@ -182,7 +199,10 @@ func (n *Node) dispatch() {
if !ok {
return
}
xcall.Call(handle)
xcall.Call(func() {
handle()
n.wg.Done()
})
}
}
}()
Expand Down
33 changes: 27 additions & 6 deletions cluster/node/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,22 +125,42 @@ func (p *Proxy) UnbindGate(ctx context.Context, uid int64) error {
// 单个用户可以绑定到多个节点服务器上,相同名称的节点服务器只能绑定一个,多次绑定会到相同名称的节点服务器会覆盖之前的绑定。
// 绑定操作会通过发布订阅方式同步到网关服务器和其他相关节点服务器上。
func (p *Proxy) BindNode(ctx context.Context, uid int64, nameAndNID ...string) error {
name, nid := p.node.opts.name, p.node.opts.id

if len(nameAndNID) >= 2 && nameAndNID[0] != "" && nameAndNID[1] != "" {
return p.nodeLinker.Bind(ctx, uid, nameAndNID[0], nameAndNID[1])
} else {
return p.nodeLinker.Bind(ctx, uid, p.node.opts.name, p.node.opts.id)
name, nid = nameAndNID[0], nameAndNID[1]
}

if err := p.nodeLinker.Bind(ctx, uid, name, nid); err != nil {
return err
}

if nid == p.node.opts.id {
p.node.wg.Add(1)
}

return nil
}

// UnbindNode 解绑节点
// 解绑时会对对应名称的节点服务器进行解绑,解绑时会对解绑节点ID进行校验,不匹配则解绑失败。
// 解绑操作会通过发布订阅方式同步到网关服务器和其他相关节点服务器上。
func (p *Proxy) UnbindNode(ctx context.Context, uid int64, nameAndNID ...string) error {
name, nid := p.node.opts.name, p.node.opts.id

if len(nameAndNID) >= 2 && nameAndNID[0] != "" && nameAndNID[1] != "" {
return p.nodeLinker.Unbind(ctx, uid, nameAndNID[0], nameAndNID[1])
} else {
return p.nodeLinker.Unbind(ctx, uid, p.node.opts.name, p.node.opts.id)
name, nid = nameAndNID[0], nameAndNID[1]
}

if err := p.nodeLinker.Unbind(ctx, uid, name, nid); err != nil {
return err
}

if nid == p.node.opts.id {
p.node.wg.Done()
}

return nil
}

// LocateGate 定位用户所在网关
Expand Down Expand Up @@ -259,6 +279,7 @@ func (p *Proxy) Deliver(ctx context.Context, args *cluster.DeliverArgs) error {

// Invoke 调用函数(线程安全)
func (p *Proxy) Invoke(fn func()) {
p.node.wg.Add(1)
p.node.fnChan <- fn
}

Expand Down
12 changes: 11 additions & 1 deletion cluster/node/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (s *Scheduler) spawn(creator Creator, opts ...ActorOption) (*Actor, error)
return nil, errors.ErrActorExists
}

if act.opts.wait {
s.node.wg.Add(1)
}

act.processor.Init()

if _, ok := s.kinds.Load(act.Kind()); !ok {
Expand Down Expand Up @@ -74,7 +78,13 @@ func (s *Scheduler) kill(kind, id string) bool {
return false
}

return act.destroy()
ok = act.destroy()

if act.opts.wait {
s.node.wg.Done()
}

return ok
}

// 移除Actor
Expand Down
8 changes: 4 additions & 4 deletions component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ type Component interface {
Init()
// Start 启动组件
Start()
// Restart 重启组件
Restart()
// Close 关闭组件
Close()
// Destroy 销毁组件
Destroy()
}
Expand All @@ -25,8 +25,8 @@ func (b *Base) Init() {}
// Start 启动组件
func (b *Base) Start() {}

// Restart 重启组件
func (b *Base) Restart() {}
// Close 关闭组件
func (b *Base) Close() {}

// Destroy 销毁组件
func (b *Base) Destroy() {}
87 changes: 69 additions & 18 deletions container.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,31 @@
package due

import (
"context"
"github.com/dobyte/due/v2/component"
"github.com/dobyte/due/v2/config"
"github.com/dobyte/due/v2/core/info"
"github.com/dobyte/due/v2/etc"
"github.com/dobyte/due/v2/eventbus"
"github.com/dobyte/due/v2/log"
"github.com/dobyte/due/v2/task"
"github.com/dobyte/due/v2/utils/xcall"
"github.com/dobyte/due/v2/utils/xfile"
"os"
"os/signal"
"runtime"
"strconv"
"syscall"
"time"
)

type Container struct {
sig chan os.Signal
components []component.Component
}

// NewContainer 创建一个容器
func NewContainer() *Container {
return &Container{sig: make(chan os.Signal)}
return &Container{}
}

// Add 添加组件
Expand All @@ -33,37 +35,79 @@ func (c *Container) Add(components ...component.Component) {

// Serve 启动容器
func (c *Container) Serve() {
c.doSaveProcessID()

c.doPrintFrameworkInfo()

c.doInitComponents()

c.doStartComponents()

c.doWaitSystemSignal()

c.doCloseComponents()

c.doDestroyComponents()

c.doClearModules()
}

// 初始化所有组件
func (c *Container) doInitComponents() {
for _, comp := range c.components {
comp.Init()
}
}

info.PrintFrameworkInfo()
// 启动所有组件
func (c *Container) doStartComponents() {
for _, comp := range c.components {
comp.Start()
}
}

info.PrintGlobalInfo()
// 关闭所有组件
func (c *Container) doCloseComponents() {
g := xcall.NewGoroutines()

for _, comp := range c.components {
comp.Start()
g.Add(comp.Close)
}

c.doSavePID()
g.Run(context.Background(), etc.Get("etc.shutdownTimeout", 0).Duration())
}

// 销毁所有组件
func (c *Container) doDestroyComponents() {
g := xcall.NewGoroutines()

for _, comp := range c.components {
g.Add(comp.Destroy)
}

g.Run(context.Background(), 5*time.Second)
}

// 等待系统信号
func (c *Container) doWaitSystemSignal() {
sig := make(chan os.Signal)

switch runtime.GOOS {
case `windows`:
signal.Notify(c.sig, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM)
signal.Notify(sig, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM)
default:
signal.Notify(c.sig, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGABRT, syscall.SIGKILL, syscall.SIGTERM)
signal.Notify(sig, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGABRT, syscall.SIGKILL, syscall.SIGTERM)
}

sig := <-c.sig

log.Warnf("process got signal %v, container will close", sig)
s := <-sig

signal.Stop(c.sig)
signal.Stop(sig)

for _, comp := range c.components {
comp.Destroy()
}
log.Warnf("process got signal %v, container will close", s)
}

// 清理所有模块
func (c *Container) doClearModules() {
if err := eventbus.Close(); err != nil {
log.Warnf("eventbus close failed: %v", err)
}
Expand All @@ -77,14 +121,21 @@ func (c *Container) Serve() {
log.Close()
}

func (c *Container) doSavePID() {
// 保存进程号
func (c *Container) doSaveProcessID() {
filename := etc.Get("etc.pid").String()
if filename == "" {
return
}

err := xfile.WriteFile(filename, []byte(strconv.Itoa(syscall.Getpid())))
if err != nil {
if err := xfile.WriteFile(filename, []byte(strconv.Itoa(syscall.Getpid()))); err != nil {
log.Fatalf("pid save failed: %v", err)
}
}

// 打印框架信息
func (c *Container) doPrintFrameworkInfo() {
info.PrintFrameworkInfo()

info.PrintGlobalInfo()
}
Loading

0 comments on commit 86f7e7a

Please sign in to comment.