diff --git a/remote/endpoint_manager.go b/remote/endpoint_manager.go index 034a23ce..90edfdc3 100644 --- a/remote/endpoint_manager.go +++ b/remote/endpoint_manager.go @@ -31,7 +31,11 @@ func (el *endpointLazy) connect() { el.manager.remote.actorSystem.Logger().Debug("connecting to remote address", slog.String("address", el.address)) em := el.manager system := em.remote.actorSystem - rst, _ := system.Root.RequestFuture(em.endpointSupervisor, el.address, -1).Result() + rst, err := system.Root.RequestFuture(em.endpointSupervisor, el.address, -1).Result() + if err != nil { + system.Logger().Error("failed to connect to remote address", slog.String("address", el.address), slog.Any("error", err)) + return + } ep := rst.(*endpoint) el.Set(ep) } @@ -42,8 +46,8 @@ func (el *endpointLazy) Set(ep *endpoint) { func (el *endpointLazy) Get() *endpoint { el.once.Do(el.connect) - ep := el.endpoint.Load() - return ep.(*endpoint) + ep, _ := el.endpoint.Load().(*endpoint) + return ep } type endpoint struct { @@ -167,6 +171,10 @@ func (em *endpointManager) endpointEvent(evn interface{}) { em.removeEndpoint(msg) case *EndpointConnectedEvent: endpoint := em.ensureConnected(msg.Address) + if endpoint == nil { + em.remote.Logger().Error("EndpointManager failed to handle endpoint connected event", slog.String("address", msg.Address)) + return + } em.remote.actorSystem.Root.Send(endpoint.watcher, msg) } } @@ -177,6 +185,16 @@ func (em *endpointManager) remoteTerminate(msg *remoteTerminate) { } address := msg.Watchee.Address endpoint := em.ensureConnected(address) + if endpoint == nil { + terminated := &actor.Terminated{ + Who: msg.Watchee, + Why: actor.TerminatedReason_Stopped, + } + if ref, ok := em.remote.actorSystem.ProcessRegistry.GetLocal(msg.Watcher.Id); ok { + ref.SendSystemMessage(msg.Watcher, terminated) + } + return + } em.remote.actorSystem.Root.Send(endpoint.watcher, msg) } @@ -186,6 +204,16 @@ func (em *endpointManager) remoteWatch(msg *remoteWatch) { } address := msg.Watchee.Address endpoint := em.ensureConnected(address) + if endpoint == nil { + terminated := &actor.Terminated{ + Who: msg.Watchee, + Why: actor.TerminatedReason_AddressTerminated, + } + if ref, ok := em.remote.actorSystem.ProcessRegistry.GetLocal(msg.Watcher.Id); ok { + ref.SendSystemMessage(msg.Watcher, terminated) + } + return + } em.remote.actorSystem.Root.Send(endpoint.watcher, msg) } @@ -195,6 +223,9 @@ func (em *endpointManager) remoteUnwatch(msg *remoteUnwatch) { } address := msg.Watchee.Address endpoint := em.ensureConnected(address) + if endpoint == nil { + return + } em.remote.actorSystem.Root.Send(endpoint.watcher, msg) } @@ -210,6 +241,14 @@ func (em *endpointManager) remoteDeliver(msg *remoteDeliver) { } address := msg.target.Address endpoint := em.ensureConnected(address) + if endpoint == nil { + em.remote.actorSystem.EventStream.Publish(&actor.DeadLetterEvent{ + PID: msg.target, + Message: msg.message, + Sender: msg.sender, + }) + return + } em.remote.actorSystem.Root.Send(endpoint.writer, msg) } @@ -220,7 +259,11 @@ func (em *endpointManager) ensureConnected(address string) *endpoint { e, _ = em.connections.LoadOrStore(address, el) } el := e.(*endpointLazy) - return el.Get() + ep := el.Get() + if ep == nil { + em.connections.Delete(address) + } + return ep } // func (em *endpointManager) ensureConnected(address string) *endpoint { @@ -252,6 +295,9 @@ func (em *endpointManager) removeEndpoint(msg *EndpointTerminatedEvent) { if le.unloaded.CompareAndSwap(false, true) { em.connections.Delete(msg.Address) ep := le.Get() + if ep == nil { + return + } em.remote.Logger().Debug("Sending EndpointTerminatedEvent to EndpointWatcher and EndpointWriter", slog.String("address", msg.Address)) em.remote.actorSystem.Root.Send(ep.watcher, msg) em.remote.actorSystem.Root.Send(ep.writer, msg)