Skip to content

Commit

Permalink
perf: add sync loop to message processing (#491)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Oct 12, 2024
1 parent b888355 commit 0bd7756
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 11 deletions.
14 changes: 9 additions & 5 deletions actors/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func Ask(ctx context.Context, to *PID, message proto.Message, timeout time.Durat
return nil, ErrDead
}

receiveContext, err := toReceiveContext(ctx, to, message)
receiveContext, err := toReceiveContext(ctx, to, message, false)
if err != nil {
return nil, err
}
Expand All @@ -71,7 +71,7 @@ func Tell(ctx context.Context, to *PID, message proto.Message) error {
return ErrDead
}

receiveContext, err := toReceiveContext(ctx, to, message)
receiveContext, err := toReceiveContext(ctx, to, message, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -416,15 +416,19 @@ func RemoteSpawn(ctx context.Context, host string, port int, name, actorType str
}

// toReceiveContext creates a ReceiveContext provided a message and a receiver
func toReceiveContext(ctx context.Context, to *PID, message proto.Message) (*ReceiveContext, error) {
func toReceiveContext(ctx context.Context, to *PID, message proto.Message, async bool) (*ReceiveContext, error) {
switch msg := message.(type) {
case *internalpb.RemoteMessage:
actual, err := msg.GetMessage().UnmarshalNew()
if err != nil {
return nil, ErrInvalidRemoteMessage(err)
}
return newReceiveContext(ctx, NoSender, to, actual).withRemoteSender(address.From(msg.GetSender())), nil
receiveContext := contextFromPool()
receiveContext.build(ctx, NoSender, to, actual, async)
return receiveContext.withRemoteSender(address.From(msg.GetSender())), nil
default:
return newReceiveContext(ctx, NoSender, to, message).withRemoteSender(address.From(address.NoSender)), nil
receiveContext := contextFromPool()
receiveContext.build(ctx, NoSender, to, message, async)
return receiveContext.withRemoteSender(address.From(address.NoSender)), nil
}
}
26 changes: 22 additions & 4 deletions actors/pid.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,9 @@ func newPID(ctx context.Context, address *address.Address, actor Actor, opts ...
go pid.passivationLoop()
}

pid.doReceive(newReceiveContext(ctx, NoSender, pid, new(goaktpb.PostStart)))
receiveContext := contextFromPool()
receiveContext.build(ctx, NoSender, pid, new(goaktpb.PostStart), true)
pid.doReceive(receiveContext)

return pid, nil
}
Expand Down Expand Up @@ -545,7 +547,9 @@ func (pid *PID) Ask(ctx context.Context, to *PID, message proto.Message) (respon
return nil, ErrDead
}

receiveContext := newReceiveContext(ctx, pid, to, message)
receiveContext := contextFromPool()
receiveContext.build(ctx, pid, to, message, false)

to.doReceive(receiveContext)
timeout := pid.askTimeout.Load()

Expand All @@ -564,7 +568,11 @@ func (pid *PID) Tell(ctx context.Context, to *PID, message proto.Message) error
if !to.IsRunning() {
return ErrDead
}
to.doReceive(newReceiveContext(ctx, pid, to, message))

receiveContext := contextFromPool()
receiveContext.build(ctx, pid, to, message, true)

to.doReceive(receiveContext)
return nil
}

Expand Down Expand Up @@ -1063,9 +1071,15 @@ func (pid *PID) receiveLoop() {
case <-pid.receiveStopSignal:
return
case <-pid.receiveSignal:
var received *ReceiveContext
if received != nil {
returnToPool(received)
received = nil
}

// Process all messages in the queue one by one
for {
received := pid.mailbox.Dequeue()
received = pid.mailbox.Dequeue()
if received == nil {
// If no more messages, stop processing
pid.processingMessages.Store(int32(idle))
Expand Down Expand Up @@ -1469,6 +1483,10 @@ func (pid *PID) supervise(cid *PID, watcher *watcher) {
pid.logger.Debugf("stop watching cid=(%s)", cid.ID())
return
case err := <-watcher.ErrChan:
// skip dead error
if errors.Is(err, ErrDead) {
return
}
pid.logger.Errorf("child actor=(%s) is failing: Err=%v", cid.ID(), err)
switch directive := pid.supervisorDirective.(type) {
case *StopDirective:
Expand Down
2 changes: 1 addition & 1 deletion actors/pid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2181,7 +2181,7 @@ func TestPipeTo(t *testing.T) {

wg.Wait()

lib.Pause(time.Second)
lib.Pause(2 * time.Second)

require.EqualValues(t, 3, pid1.ProcessedCount()-1)
require.Zero(t, pid2.ProcessedCount())
Expand Down
45 changes: 44 additions & 1 deletion actors/receive_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package actors

import (
"context"
"sync"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
Expand All @@ -35,6 +36,24 @@ import (
"github.com/tochemey/goakt/v2/goaktpb"
)

// pool holds a pool of ReceiveContext
var pool = sync.Pool{
New: func() interface{} {
return new(ReceiveContext)
},
}

// contextFromPool retrieves a message from the pool
func contextFromPool() *ReceiveContext {
return pool.Get().(*ReceiveContext)
}

// returnToPool sends the message context back to the pool
func returnToPool(receiveContext *ReceiveContext) {
receiveContext.reset()
pool.Put(receiveContext)
}

// ReceiveContext is the context that is used by the actor to receive messages
type ReceiveContext struct {
ctx context.Context
Expand All @@ -58,6 +77,29 @@ func newReceiveContext(ctx context.Context, from, to *PID, message proto.Message
}
}

// build sets the necessary fields of ReceiveContext
func (c *ReceiveContext) build(ctx context.Context, from, to *PID, message proto.Message, async bool) *ReceiveContext {
c.ctx = ctx
c.sender = from
c.self = to
c.message = message

if async {
return c
}
c.response = make(chan proto.Message, 1)
return c
}

// reset resets the fields of ReceiveContext
func (c *ReceiveContext) reset() {
var pid *PID
c.message = nil
c.self = pid
c.sender = pid
c.err = nil
}

// withRemoteSender set the remote sender for a given context
func (c *ReceiveContext) withRemoteSender(remoteSender *address.Address) *ReceiveContext {
c.remoteSender = remoteSender
Expand Down Expand Up @@ -333,7 +375,8 @@ func (c *ReceiveContext) Forward(to *PID) {

if to.IsRunning() {
ctx := context.WithoutCancel(c.ctx)
receiveContext := newReceiveContext(ctx, sender, to, message)
receiveContext := contextFromPool()
receiveContext.build(ctx, sender, to, message, true)
to.doReceive(receiveContext)
}
}
Expand Down

0 comments on commit 0bd7756

Please sign in to comment.