From 0bd7756fc9905ebf3c9b073d13d370a3b3a91506 Mon Sep 17 00:00:00 2001 From: Arsene Date: Sat, 12 Oct 2024 15:00:15 +0100 Subject: [PATCH] perf: add sync loop to message processing (#491) --- actors/api.go | 14 +++++++----- actors/pid.go | 26 ++++++++++++++++++---- actors/pid_test.go | 2 +- actors/receive_context.go | 45 ++++++++++++++++++++++++++++++++++++++- 4 files changed, 76 insertions(+), 11 deletions(-) diff --git a/actors/api.go b/actors/api.go index 0b066460..cf3dcce2 100644 --- a/actors/api.go +++ b/actors/api.go @@ -47,7 +47,7 @@ func Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Durat return nil, ErrDead } - receiveContext, err := toReceiveContext(ctx, to, message) + receiveContext, err := toReceiveContext(ctx, to, message, false) if err != nil { return nil, err } @@ -71,7 +71,7 @@ func Tell(ctx context.Context, to *PID, message proto.Message) error { return ErrDead } - receiveContext, err := toReceiveContext(ctx, to, message) + receiveContext, err := toReceiveContext(ctx, to, message, true) if err != nil { return err } @@ -416,15 +416,19 @@ func RemoteSpawn(ctx context.Context, host string, port int, name, actorType str } // toReceiveContext creates a ReceiveContext provided a message and a receiver -func toReceiveContext(ctx context.Context, to *PID, message proto.Message) (*ReceiveContext, error) { +func toReceiveContext(ctx context.Context, to *PID, message proto.Message, async bool) (*ReceiveContext, error) { switch msg := message.(type) { case *internalpb.RemoteMessage: actual, err := msg.GetMessage().UnmarshalNew() if err != nil { return nil, ErrInvalidRemoteMessage(err) } - return newReceiveContext(ctx, NoSender, to, actual).withRemoteSender(address.From(msg.GetSender())), nil + receiveContext := contextFromPool() + receiveContext.build(ctx, NoSender, to, actual, async) + return receiveContext.withRemoteSender(address.From(msg.GetSender())), nil default: - return newReceiveContext(ctx, NoSender, to, message).withRemoteSender(address.From(address.NoSender)), nil + receiveContext := contextFromPool() + receiveContext.build(ctx, NoSender, to, message, async) + return receiveContext.withRemoteSender(address.From(address.NoSender)), nil } } diff --git a/actors/pid.go b/actors/pid.go index 41d9580f..2a04568d 100644 --- a/actors/pid.go +++ b/actors/pid.go @@ -239,7 +239,9 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ... go pid.passivationLoop() } - pid.doReceive(newReceiveContext(ctx, NoSender, pid, new(goaktpb.PostStart))) + receiveContext := contextFromPool() + receiveContext.build(ctx, NoSender, pid, new(goaktpb.PostStart), true) + pid.doReceive(receiveContext) return pid, nil } @@ -545,7 +547,9 @@ func (pid *PID) Ask(ctx context.Context, to *PID, message proto.Message) (respon return nil, ErrDead } - receiveContext := newReceiveContext(ctx, pid, to, message) + receiveContext := contextFromPool() + receiveContext.build(ctx, pid, to, message, false) + to.doReceive(receiveContext) timeout := pid.askTimeout.Load() @@ -564,7 +568,11 @@ func (pid *PID) Tell(ctx context.Context, to *PID, message proto.Message) error if !to.IsRunning() { return ErrDead } - to.doReceive(newReceiveContext(ctx, pid, to, message)) + + receiveContext := contextFromPool() + receiveContext.build(ctx, pid, to, message, true) + + to.doReceive(receiveContext) return nil } @@ -1063,9 +1071,15 @@ func (pid *PID) receiveLoop() { case <-pid.receiveStopSignal: return case <-pid.receiveSignal: + var received *ReceiveContext + if received != nil { + returnToPool(received) + received = nil + } + // Process all messages in the queue one by one for { - received := pid.mailbox.Dequeue() + received = pid.mailbox.Dequeue() if received == nil { // If no more messages, stop processing pid.processingMessages.Store(int32(idle)) @@ -1469,6 +1483,10 @@ func (pid *PID) supervise(cid *PID, watcher *watcher) { pid.logger.Debugf("stop watching cid=(%s)", cid.ID()) return case err := <-watcher.ErrChan: + // skip dead error + if errors.Is(err, ErrDead) { + return + } pid.logger.Errorf("child actor=(%s) is failing: Err=%v", cid.ID(), err) switch directive := pid.supervisorDirective.(type) { case *StopDirective: diff --git a/actors/pid_test.go b/actors/pid_test.go index 9cb44eff..7f9ea077 100644 --- a/actors/pid_test.go +++ b/actors/pid_test.go @@ -2181,7 +2181,7 @@ func TestPipeTo(t *testing.T) { wg.Wait() - lib.Pause(time.Second) + lib.Pause(2 * time.Second) require.EqualValues(t, 3, pid1.ProcessedCount()-1) require.Zero(t, pid2.ProcessedCount()) diff --git a/actors/receive_context.go b/actors/receive_context.go index 8bce9cf5..0423121b 100644 --- a/actors/receive_context.go +++ b/actors/receive_context.go @@ -26,6 +26,7 @@ package actors import ( "context" + "sync" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" @@ -35,6 +36,24 @@ import ( "github.com/tochemey/goakt/v2/goaktpb" ) +// pool holds a pool of ReceiveContext +var pool = sync.Pool{ + New: func() interface{} { + return new(ReceiveContext) + }, +} + +// contextFromPool retrieves a message from the pool +func contextFromPool() *ReceiveContext { + return pool.Get().(*ReceiveContext) +} + +// returnToPool sends the message context back to the pool +func returnToPool(receiveContext *ReceiveContext) { + receiveContext.reset() + pool.Put(receiveContext) +} + // ReceiveContext is the context that is used by the actor to receive messages type ReceiveContext struct { ctx context.Context @@ -58,6 +77,29 @@ func newReceiveContext(ctx context.Context, from, to *PID, message proto.Message } } +// build sets the necessary fields of ReceiveContext +func (c *ReceiveContext) build(ctx context.Context, from, to *PID, message proto.Message, async bool) *ReceiveContext { + c.ctx = ctx + c.sender = from + c.self = to + c.message = message + + if async { + return c + } + c.response = make(chan proto.Message, 1) + return c +} + +// reset resets the fields of ReceiveContext +func (c *ReceiveContext) reset() { + var pid *PID + c.message = nil + c.self = pid + c.sender = pid + c.err = nil +} + // withRemoteSender set the remote sender for a given context func (c *ReceiveContext) withRemoteSender(remoteSender *address.Address) *ReceiveContext { c.remoteSender = remoteSender @@ -333,7 +375,8 @@ func (c *ReceiveContext) Forward(to *PID) { if to.IsRunning() { ctx := context.WithoutCancel(c.ctx) - receiveContext := newReceiveContext(ctx, sender, to, message) + receiveContext := contextFromPool() + receiveContext.build(ctx, sender, to, message, true) to.doReceive(receiveContext) } }