Skip to content

Commit

Permalink
refactor: add rebalancer system actor to handle cluster topology chan…
Browse files Browse the repository at this point in the history
…ges (#522)
  • Loading branch information
Tochemey authored Nov 20, 2024
1 parent a0383fe commit 7edea0c
Show file tree
Hide file tree
Showing 23 changed files with 499 additions and 289 deletions.
214 changes: 163 additions & 51 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ import (

"github.com/tochemey/goakt/v2/address"
"github.com/tochemey/goakt/v2/discovery"
"github.com/tochemey/goakt/v2/goaktpb"
"github.com/tochemey/goakt/v2/hash"
"github.com/tochemey/goakt/v2/internal/cluster"
"github.com/tochemey/goakt/v2/internal/errorschain"
"github.com/tochemey/goakt/v2/internal/eventstream"
"github.com/tochemey/goakt/v2/internal/internalpb"
"github.com/tochemey/goakt/v2/internal/internalpb/internalpbconnect"
Expand Down Expand Up @@ -144,6 +146,12 @@ type ActorSystem interface {
setActor(actor *PID)
// supervisor return the system supervisor
getSupervisor() *PID
// getPeerStateFromCache returns the peer state from the cache
getPeerStateFromCache(address string) (*internalpb.PeerState, error)
// getSystemActorName returns the system actor's name based upon their types
getSystemActorName(nameType nameType) string
// getCluster returns the cluster engine
getCluster() cluster.Interface
}

// ActorSystem represent a collection of actors on a given node
Expand Down Expand Up @@ -222,9 +230,10 @@ type actorSystem struct {
peersStateLoopInterval time.Duration
peersCache *sync.Map
clusterConfig *ClusterConfig
redistributionChan chan *cluster.Event
rebalancingChan chan *cluster.Event

supervisor *PID
rebalancer *PID
}

// enforce compilation error when all methods of the ActorSystem interface are not implemented
Expand Down Expand Up @@ -276,14 +285,23 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
opt.Apply(system)
}

// set the host and port
if err := system.setHostPort(); err != nil {
return nil, err
}

// we need to make sure the cluster kinds are defined
if system.clusterEnabled.Load() {
if err := system.clusterConfig.Validate(); err != nil {
return nil, err
}
}

system.scheduler = newScheduler(system.logger, system.shutdownTimeout, withSchedulerCluster(system.cluster), withSchedulerRemoting(NewRemoting()))
system.scheduler = newScheduler(system.logger,
system.shutdownTimeout,
withSchedulerCluster(system.cluster),
withSchedulerRemoting(NewRemoting()))

return system, nil
}

Expand Down Expand Up @@ -631,28 +649,19 @@ func (x *actorSystem) RemoteActor(ctx context.Context, actorName string) (addr *
// Start starts the actor system
func (x *actorSystem) Start(ctx context.Context) error {
x.started.Store(true)

if x.remotingEnabled.Load() {
x.enableRemoting(ctx)
}

if x.clusterEnabled.Load() {
if err := x.enableClustering(ctx); err != nil {
return err
}
if err := errorschain.
New(errorschain.ReturnFirst()).
AddError(x.createSystemSupervisor(ctx)).
AddError(x.createRebalancer(ctx)).
AddError(x.enableRemoting(ctx)).
AddError(x.enableClustering(ctx)).
Error(); err != nil {
// reset the start
x.started.Store(false)
return err
}

x.scheduler.Start(ctx)

actorName := x.getSystemActorName(supervisorType)
pid, err := x.configPID(ctx, actorName, newSystemSupervisor(x.logger))
if err != nil {
return fmt.Errorf("actor=%s failed to start system supervisor: %w", actorName, err)
}

x.supervisor = pid
x.setActor(pid)

go x.janitor()

x.logger.Infof("%s started..:)", x.name)
Expand Down Expand Up @@ -699,7 +708,7 @@ func (x *actorSystem) Stop(ctx context.Context) error {
close(x.actorsChan)
x.clusterSyncStopSig <- types.Unit{}
x.clusterEnabled.Store(false)
close(x.redistributionChan)
close(x.rebalancingChan)
}

// stop the supervisor actor
Expand Down Expand Up @@ -1044,6 +1053,22 @@ func (x *actorSystem) getSupervisor() *PID {
return supervisor
}

// getPeerStateFromCache looks up the serialized peer state cached for the
// given peer address and deserializes it into a PeerState message.
// It returns ErrPeerNotFound when no entry exists for the address.
func (x *actorSystem) getPeerStateFromCache(address string) (*internalpb.PeerState, error) {
	x.locker.Lock()
	raw, found := x.peersCache.Load(address)
	x.locker.Unlock()

	if !found {
		return nil, ErrPeerNotFound
	}

	state := new(internalpb.PeerState)
	if err := proto.Unmarshal(raw.([]byte), state); err != nil {
		return nil, err
	}
	return state, nil
}

// setActor implements ActorSystem.
func (x *actorSystem) setActor(actor *PID) {
x.actors.Set(actor)
Expand All @@ -1058,6 +1083,10 @@ func (x *actorSystem) setActor(actor *PID) {
// enableClustering enables clustering. When clustering is enabled remoting is also enabled to facilitate remote
// communication
func (x *actorSystem) enableClustering(ctx context.Context) error {
if !x.clusterEnabled.Load() {
return nil
}

x.logger.Info("enabling clustering...")

if !x.remotingEnabled.Load() {
Expand Down Expand Up @@ -1108,7 +1137,7 @@ func (x *actorSystem) enableClustering(ctx context.Context) error {
x.locker.Lock()
x.cluster = clusterEngine
x.clusterEventsChan = clusterEngine.Events()
x.redistributionChan = make(chan *cluster.Event, 1)
x.rebalancingChan = make(chan *cluster.Event, 1)
for _, kind := range x.clusterConfig.Kinds() {
x.registry.Register(kind)
x.logger.Infof("cluster kind=(%s) registered", types.TypeName(kind))
Expand All @@ -1118,24 +1147,18 @@ func (x *actorSystem) enableClustering(ctx context.Context) error {
go x.clusterEventsLoop()
go x.replicationLoop()
go x.peersStateLoop()
go x.redistributionLoop()
go x.rebalancingLoop()

x.logger.Info("clustering enabled...:)")
return nil
}

// enableRemoting enables the remoting service to handle remote messaging
func (x *actorSystem) enableRemoting(ctx context.Context) {
x.logger.Info("enabling remoting...")

remotingHost, remotingPort, err := tcp.GetHostPort(fmt.Sprintf("%s:%d", x.host, x.port))
if err != nil {
x.logger.Panic(fmt.Errorf("failed to resolve remoting TCP address: %w", err))
func (x *actorSystem) enableRemoting(ctx context.Context) error {
if !x.remotingEnabled.Load() {
return nil
}

x.host = remotingHost
x.port = int32(remotingPort)

x.logger.Info("enabling remoting...")
remotingServicePath, remotingServiceHandler := internalpbconnect.NewRemotingServiceHandler(x)
clusterServicePath, clusterServiceHandler := internalpbconnect.NewClusterServiceHandler(x)

Expand All @@ -1144,13 +1167,14 @@ func (x *actorSystem) enableRemoting(ctx context.Context) {
mux.Handle(clusterServicePath, clusterServiceHandler)

x.locker.Lock()
defer x.locker.Unlock()

// configure the appropriate server
if err := x.configureServer(ctx, mux); err != nil {
x.locker.Unlock()
x.logger.Panic(fmt.Errorf("failed enable remoting: %w", err))
return
x.logger.Error(fmt.Errorf("failed enable remoting: %w", err))
return err
}
x.locker.Unlock()

go func() {
if err := x.startHTTPServer(); err != nil {
Expand All @@ -1162,6 +1186,7 @@ func (x *actorSystem) enableRemoting(ctx context.Context) {

x.remoting = NewRemoting()
x.logger.Info("remoting enabled...:)")
return nil
}

// reset the actor system
Expand Down Expand Up @@ -1234,7 +1259,7 @@ func (x *actorSystem) clusterEventsLoop() {
x.eventsStream.Publish(eventsTopic, message)
x.logger.Debugf("cluster event=(%s) successfully published by node=(%s)", event.Type, x.name)
}
x.redistributionChan <- event
x.rebalancingChan <- event
}
}
}
Expand Down Expand Up @@ -1306,18 +1331,38 @@ func (x *actorSystem) peersStateLoop() {
x.logger.Info("peers state synchronization has stopped...")
}

func (x *actorSystem) redistributionLoop() {
for event := range x.redistributionChan {
// rebalancingLoop help perform cluster rebalancing
func (x *actorSystem) rebalancingLoop() {
for event := range x.rebalancingChan {
if x.InCluster() {
// check for cluster rebalancing
if err := x.redistribute(context.Background(), event); err != nil {
x.logger.Errorf("cluster rebalancing failed: %v", err)
// TODO: panic or retry or shutdown actor system
// get peer state
peerState, err := x.nodeLeftStateFromEvent(event)
if err != nil {
x.logger.Error(err)
continue
}

ctx := context.Background()
if !x.shouldRebalance(ctx, peerState) {
continue
}

message := &internalpb.Rebalance{PeerState: peerState}
if err := x.supervisor.Tell(ctx, x.rebalancer, message); err != nil {
x.logger.Error(err)
}
}
}
}

// shouldRebalance reports whether the current node can perform the cluster
// rebalancing: the departed peer must have a non-empty state with actors to
// redistribute, and this node must be the cluster leader.
func (x *actorSystem) shouldRebalance(ctx context.Context, peerState *internalpb.PeerState) bool {
	if peerState == nil || len(peerState.GetActors()) == 0 {
		return false
	}
	if proto.Equal(peerState, new(internalpb.PeerState)) {
		return false
	}
	return x.cluster.IsLeader(ctx)
}

// processPeerState processes a given peer synchronization record.
func (x *actorSystem) processPeerState(ctx context.Context, peer *cluster.Peer) error {
peerAddress := net.JoinHostPort(peer.Host, strconv.Itoa(peer.Port))
Expand Down Expand Up @@ -1360,7 +1405,6 @@ func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor, o
withInitMaxRetries(x.actorInitMaxRetries),
withCustomLogger(x.logger),
withActorSystem(x),
withSupervisorDirective(x.supervisorDirective),
withEventsStream(x.eventsStream),
withInitTimeout(x.actorInitTimeout),
}
Expand All @@ -1371,12 +1415,20 @@ func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor, o
pidOpts = append(pidOpts, withMailbox(spawnConfig.mailbox))
}

// set the supervisor directive
if spawnConfig.supervisorDirective != nil {
pidOpts = append(pidOpts, withSupervisorDirective(spawnConfig.supervisorDirective))
} else {
// use the system-wide supervisor directive
pidOpts = append(pidOpts, withSupervisorDirective(x.supervisorDirective))
}

// enable stash
if x.stashEnabled {
pidOpts = append(pidOpts, withStash())
}

// disable passivation for system supervisor
// disable passivation for system actor
if isSystemName(name) {
pidOpts = append(pidOpts, withPassivationDisabled())
} else {
Expand All @@ -1396,7 +1448,12 @@ func (x *actorSystem) configPID(ctx context.Context, name string, actor Actor, o
return pid, nil
}

// getSystemActorName returns the system supervisor name
// getCluster returns the cluster engine backing this actor system.
// The returned value is nil when clustering has not been enabled.
// NOTE(review): nil-when-disabled is assumed from the struct zero value — confirm callers guard it.
func (x *actorSystem) getCluster() cluster.Interface {
	return x.cluster
}

// getSystemActorName returns the system actor name
func (x *actorSystem) getSystemActorName(nameType nameType) string {
if x.remotingEnabled.Load() {
return fmt.Sprintf(
Expand All @@ -1416,10 +1473,6 @@ func (x *actorSystem) getSystemActorName(nameType nameType) string {
)
}

// isSystemName reports whether the given actor name is a reserved system
// actor name, i.e. carries the system name prefix.
func isSystemName(name string) bool {
	return strings.HasPrefix(name, systemNamePrefix)
}

// actorAddress returns the actor path provided the actor name
func (x *actorSystem) actorAddress(name string) *address.Address {
return address.New(name, x.name, x.host, int(x.port))
Expand Down Expand Up @@ -1458,6 +1511,65 @@ func (x *actorSystem) configureServer(ctx context.Context, mux *nethttp.ServeMux
return nil
}

// nodeLeftStateFromEvent extracts the departed peer's cached state from a
// NodeLeft cluster event. For any other event type it returns (nil, nil),
// signalling that there is nothing to rebalance.
func (x *actorSystem) nodeLeftStateFromEvent(event *cluster.Event) (*internalpb.PeerState, error) {
	if event.Type != cluster.NodeLeft {
		return nil, nil
	}

	left := new(goaktpb.NodeLeft)
	if err := event.Payload.UnmarshalTo(left); err != nil {
		return nil, err
	}
	return x.getPeerStateFromCache(left.GetAddress())
}

// setHostPort resolves the configured host/port pair into a concrete TCP
// host and port and stores them back on the actor system.
func (x *actorSystem) setHostPort() error {
	addr := fmt.Sprintf("%s:%d", x.host, x.port)
	host, port, err := tcp.GetHostPort(addr)
	if err != nil {
		return err
	}

	x.host = host
	x.port = int32(port)
	return nil
}

// createSystemSupervisor spawns the system supervisor actor and registers
// it with the actor system.
func (x *actorSystem) createSystemSupervisor(ctx context.Context) error {
	name := x.getSystemActorName(supervisorType)
	pid, err := x.configPID(ctx, name, newSystemSupervisor(x.logger))
	if err != nil {
		return fmt.Errorf("actor=%s failed to start system supervisor: %w", name, err)
	}

	x.supervisor = pid
	x.setActor(pid)
	return nil
}

// createRebalancer spawns the cluster rebalancer system actor, registers it
// with the actor system and places it under the system supervisor's watch.
// The Resume directive keeps it alive across transient rebalancing failures.
func (x *actorSystem) createRebalancer(ctx context.Context) error {
	name := x.getSystemActorName(rebalancerType)
	pid, err := x.configPID(ctx, name, newRebalancer(x.reflection), WithSupervisor(NewResumeDirective()))
	if err != nil {
		return fmt.Errorf("actor=%s failed to start cluster rebalancer: %w", name, err)
	}

	x.rebalancer = pid
	x.setActor(pid)
	x.supervisor.Watch(pid)
	return nil
}

// isSystemName reports whether the given actor name is a reserved system
// actor name, i.e. carries the system name prefix.
func isSystemName(name string) bool {
	return strings.HasPrefix(name, systemNamePrefix)
}

// getServer creates an instance of http server
func getServer(ctx context.Context, address string) *nethttp.Server {
return &nethttp.Server{
Expand Down
Loading

0 comments on commit 7edea0c

Please sign in to comment.