diff --git a/actors/pid.go b/actors/pid.go index 95078f65..e32bba27 100644 --- a/actors/pid.go +++ b/actors/pid.go @@ -549,8 +549,7 @@ func (x *pid) SpawnChild(ctx context.Context, name string, actor Actor) (PID, er return cid, nil } - x.rwLocker.Lock() - defer x.rwLocker.Unlock() + x.rwLocker.RLock() // create the child actor options child inherit parent's options opts := []pidOption{ @@ -585,16 +584,20 @@ func (x *pid) SpawnChild(ctx context.Context, name string, actor Actor) (PID, er if err != nil { span.SetStatus(codes.Error, "SpawnChild") span.RecordError(err) + x.rwLocker.RUnlock() return nil, err } x.children.set(cid) - x.Watch(cid) + eventsStream := x.eventsStream + x.rwLocker.RUnlock() span.SetStatus(codes.Ok, "SpawnChild") - if x.eventsStream != nil { - x.eventsStream.Publish(eventsTopic, &goaktpb.ActorChildCreated{ + x.Watch(cid) + + if eventsStream != nil { + eventsStream.Publish(eventsTopic, &goaktpb.ActorChildCreated{ Address: cid.ActorPath().RemoteAddress(), CreatedAt: timestamppb.Now(), Parent: x.ActorPath().RemoteAddress(), diff --git a/actors/pid_test.go b/actors/pid_test.go index b416cf77..a992e6b9 100644 --- a/actors/pid_test.go +++ b/actors/pid_test.go @@ -42,6 +42,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/tochemey/goakt/v2/goaktpb" + "github.com/tochemey/goakt/v2/internal/eventstream" "github.com/tochemey/goakt/v2/log" "github.com/tochemey/goakt/v2/telemetry" "github.com/tochemey/goakt/v2/test/data/testpb" @@ -1024,6 +1025,58 @@ func TestSpawnChild(t *testing.T) { err = parent.Shutdown(ctx) assert.NoError(t, err) }) + t.Run("With child created event published", func(t *testing.T) { + // create a test context + ctx := context.TODO() + // create the actor path + actorPath := NewPath("Parent", NewAddress("sys", "host", 1)) + + eventsStream := eventstream.New() + + // add a subscriber + subsriber := eventsStream.AddSubscriber() + eventsStream.Subscribe(subsriber, eventsTopic) + + // create the parent actor + parent, err := newPID(ctx, actorPath, + newSupervisor(), + withInitMaxRetries(1), + withCustomLogger(log.DiscardLogger), + withEventsStream(eventsStream), + withAskTimeout(replyTimeout)) + + require.NoError(t, err) + assert.NotNil(t, parent) + + // create the child actor + child, err := parent.SpawnChild(ctx, "SpawnChild", newSupervised()) + assert.NoError(t, err) + assert.NotNil(t, child) + + assert.Len(t, parent.Children(), 1) + + time.Sleep(time.Second) + + var events []*goaktpb.ActorChildCreated + for message := range subsriber.Iterator() { + // get the event payload + payload := message.Payload() + switch msg := payload.(type) { + case *goaktpb.ActorChildCreated: + events = append(events, msg) + } + } + + require.NotEmpty(t, events) + require.Len(t, events, 1) + + event := events[0] + assert.True(t, proto.Equal(parent.ActorPath().RemoteAddress(), event.GetParent())) + + //stop the actor + err = parent.Shutdown(ctx) + assert.NoError(t, err) + }) } func TestPoisonPill(t *testing.T) { ctx := context.TODO()