Skip to content

Commit

Permalink
refactor: remove unecessary comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Apr 2, 2024
1 parent 85ed0f0 commit 396f843
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 78 deletions.
32 changes: 4 additions & 28 deletions actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,46 +80,36 @@ func newActor[T State](behavior EntityBehavior[T], eventsStore eventstore.Events
// PreStart pre-starts the actor
// At this stage we connect to the various stores
func (entity *actor[T]) PreStart(ctx context.Context) error {
// add a span context
spanCtx, span := telemetry.SpanContext(ctx, "PreStart")
defer span.End()
// acquire the lock
entity.mu.Lock()
// release lock when done
defer entity.mu.Unlock()

// connect to the various stores
if entity.eventsStore == nil {
return errors.New("events store is not defined")
}

// call the connect method of the journal store
if err := entity.eventsStore.Ping(spanCtx); err != nil {
return fmt.Errorf("failed to connect to the events store: %v", err)
}

// check whether there is a snapshot to recover from
if err := entity.recoverFromSnapshot(spanCtx); err != nil {
return errors.Wrap(err, "failed to recover from snapshot")
}
return nil
}

// Receive processes any message dropped into the actor mailbox.
func (entity *actor[T]) Receive(ctx actors.ReceiveContext) {
// add a span context
_, span := telemetry.SpanContext(ctx.Context(), "Receive")
defer span.End()

// acquire the lock
entity.mu.Lock()
// release lock when done
defer entity.mu.Unlock()

// grab the command sent
switch command := ctx.Message().(type) {
case *goaktpb.PostStart:
// TODO: handle this properly
if err := entity.recoverFromSnapshot(ctx.Context()); err != nil {
ctx.Err(errors.Wrap(err, "failed to recover from snapshot"))
}
case *goaktpb.PostStop:
// TODO: handle this properly. Remove this from Go-Akt
case *goaktpb.PreStart:
Expand All @@ -133,13 +123,10 @@ func (entity *actor[T]) Receive(ctx actors.ReceiveContext) {

// PostStop prepares the actor to gracefully shutdown
func (entity *actor[T]) PostStop(ctx context.Context) error {
// add a span context
_, span := telemetry.SpanContext(ctx, "PostStop")
defer span.End()

// acquire the lock
entity.mu.Lock()
// release lock when done
defer entity.mu.Unlock()

return nil
Expand All @@ -148,61 +135,51 @@ func (entity *actor[T]) PostStop(ctx context.Context) error {
// recoverFromSnapshot reset the persistent actor to the latest snapshot in case there is one
// this is vital when the entity actor is restarting.
func (entity *actor[T]) recoverFromSnapshot(ctx context.Context) error {
// add a span context
spanCtx, span := telemetry.SpanContext(ctx, "RecoverFromSnapshot")
defer span.End()

// check whether there is a snapshot to recover from
event, err := entity.eventsStore.GetLatestEvent(spanCtx, entity.ID())
// handle the error
if err != nil {
return errors.Wrap(err, "failed to recover the latest journal")
}

// we do have the latest state just recover from it
if event != nil {
// set the current state
currentState := entity.InitialState()
if err := event.GetResultingState().UnmarshalTo(currentState); err != nil {
return errors.Wrap(err, "failed unmarshal the latest state")
}
entity.currentState = currentState

// set the event counter
entity.eventsCounter.Store(event.GetSequenceNumber())
return nil
}

// in case there is no snapshot
entity.currentState = entity.InitialState()
return nil
}

// sendErrorReply sends an error as a reply message
func (entity *actor[T]) sendErrorReply(ctx actors.ReceiveContext, err error) {
// create a new error reply
reply := &egopb.CommandReply{
Reply: &egopb.CommandReply_ErrorReply{
ErrorReply: &egopb.ErrorReply{
Message: err.Error(),
},
},
}
// send the response

ctx.Response(reply)
}

// getStateAndReply returns the current state of the entity
func (entity *actor[T]) getStateAndReply(ctx actors.ReceiveContext) {
// let us fetch the latest journal
latestEvent, err := entity.eventsStore.GetLatestEvent(ctx.Context(), entity.ID())
// handle the error
if err != nil {
entity.sendErrorReply(ctx, err)
return
}

// reply with the state unmarshalled
resultingState := latestEvent.GetResultingState()
reply := &egopb.CommandReply{
Reply: &egopb.CommandReply_StateReply{
Expand All @@ -215,7 +192,6 @@ func (entity *actor[T]) getStateAndReply(ctx actors.ReceiveContext) {
},
}

// send the response
ctx.Response(reply)
}

Expand Down
37 changes: 10 additions & 27 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ type Engine struct {

// NewEngine creates an instance of Engine
func NewEngine(name string, eventsStore eventstore.EventsStore, opts ...Option) *Engine {
// create an instance of ego
e := &Engine{
name: name,
eventsStore: eventsStore,
Expand All @@ -70,7 +69,7 @@ func NewEngine(name string, eventsStore eventstore.EventsStore, opts ...Option)
telemetry: telemetry.New(),
eventStream: eventstream.New(),
}
// apply the various options

for _, opt := range opts {
opt.Apply(e)
}
Expand All @@ -81,7 +80,6 @@ func NewEngine(name string, eventsStore eventstore.EventsStore, opts ...Option)

// Start starts the ego engine
func (x *Engine) Start(ctx context.Context) error {
// create a variable to hold the options
opts := []actors.Option{
actors.WithLogger(x.logger),
actors.WithPassivationDisabled(),
Expand All @@ -90,56 +88,49 @@ func (x *Engine) Start(ctx context.Context) error {
actors.WithTelemetry(x.telemetry),
actors.WithSupervisorStrategy(actors.StopDirective),
}
// set the remaining options

if x.enableCluster.Load() {
opts = append(opts, actors.WithClustering(
discovery.NewServiceDiscovery(x.discoveryProvider, x.discoveryConfig),
x.partitionsCount))
}

var err error
// create the actor system that will empower the entities
x.actorSystem, err = actors.NewActorSystem(x.name, opts...)
// handle the error
if err != nil {
// log the error
x.logger.Error(errors.Wrap(err, "failed to create the ego actor system"))
return err
}
// start the actor system

if err := x.actorSystem.Start(ctx); err != nil {
return err
}
// set the started to true

x.started.Store(true)

return nil
}

// AddProjection add a projection to the running eGo engine and start it
func (x *Engine) AddProjection(ctx context.Context, name string, handler projection.Handler, offsetStore offsetstore.OffsetStore, opts ...projection.Option) error {
// add a span context
spanCtx, span := egotel.SpanContext(ctx, "AddProjection")
defer span.End()

// first check whether the ego engine has started or not
if !x.started.Load() {
return errors.New("eGo engine has not started")
}
// create the projection actor

actor := projection.New(name, handler, x.eventsStore, offsetStore, opts...)
// define variables to hold projection actor ref and error

var pid actors.PID
var err error
// spawn the actor

if pid, err = x.actorSystem.Spawn(spanCtx, name, actor); err != nil {
// add some error logging
x.logger.Error(errors.Wrapf(err, "failed to register the projection=(%s)", name))
return err
}
// start the projection

if err := actors.Tell(spanCtx, pid, projection.Start); err != nil {
// add some error logging
x.logger.Error(errors.Wrapf(err, "failed to start the projection=(%s)", name))
return err
}
Expand All @@ -149,33 +140,25 @@ func (x *Engine) AddProjection(ctx context.Context, name string, handler project

// Stop stops the ego engine
func (x *Engine) Stop(ctx context.Context) error {
// set the started to false
x.started.Store(false)
// close the event stream
x.eventStream.Close()
// stop the actor system and return the possible error
return x.actorSystem.Stop(ctx)
}

// Subscribe creates an events subscriber
func (x *Engine) Subscribe(ctx context.Context) (eventstream.Subscriber, error) {
// add a span context
_, span := egotel.SpanContext(ctx, "Subscribe")
defer span.End()

// first check whether the ego engine has started or not
if !x.started.Load() {
return nil, errors.New("eGo engine has not started")
}
// create the subscriber

subscriber := x.eventStream.AddSubscriber()
// subscribe to all the topics
for i := 0; i < int(x.partitionsCount); i++ {
// create the topic
topic := fmt.Sprintf(eventsTopic, i)
// subscribe to the topic
x.eventStream.Subscribe(subscriber, topic)
}
// return the subscriber

return subscriber, nil
}
27 changes: 7 additions & 20 deletions entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,15 @@ type Entity[T State] struct {

// NewEntity creates an instance of Entity
func NewEntity[T State](ctx context.Context, behavior EntityBehavior[T], engine *Engine) (*Entity[T], error) {
// check whether the ego engine is defined
if engine == nil {
return nil, ErrEngineRequired
}
// check whether the eGo engine has started or not

if !engine.started.Load() {
return nil, ErrEngineNotStarted
}

// create the instance of the actor
pid, err := engine.actorSystem.Spawn(ctx, behavior.ID(), newActor(behavior, engine.eventsStore, engine.eventStream))
// return the error in case there is one
if err != nil {
return nil, err
}
Expand All @@ -77,30 +74,23 @@ func NewEntity[T State](ctx context.Context, behavior EntityBehavior[T], engine
// 2. nil when there is no resulting state or no event persisted
// 3. an error in case of error
func (x Entity[T]) SendCommand(ctx context.Context, command Command) (resultingState T, revision uint64, err error) {
// define a nil state
var nilT T
var nilOfT T

// check whether the underlying actor is set and running
if x.actor == nil || !x.actor.IsRunning() {
return nilT, 0, ErrUndefinedEntity
return nilOfT, 0, ErrUndefinedEntity
}

// send the command to the actor
reply, err := actors.Ask(ctx, x.actor, command, time.Second)
// handle the error
if err != nil {
return nilT, 0, err
return nilOfT, 0, err
}

// cast the reply to a command reply because that is the expected return type
commandReply, ok := reply.(*egopb.CommandReply)
// when casting is successful
if ok {
// parse the command reply and return the appropriate responses
return parseCommandReply[T](commandReply)
}
// casting failed
return nilT, 0, errors.New("failed to parse command reply")

return nilOfT, 0, errors.New("failed to parse command reply")
}

// parseCommandReply parses the command reply
Expand All @@ -109,17 +99,14 @@ func parseCommandReply[T State](reply *egopb.CommandReply) (T, uint64, error) {
state T
err error
)
// parse the command reply

switch r := reply.GetReply().(type) {
case *egopb.CommandReply_StateReply:
// unmarshal the state
msg, err := r.StateReply.GetState().UnmarshalNew()
// return the error in case there is one
if err != nil {
return state, 0, err
}

// unpack the state properly
switch v := msg.(type) {
case T:
return v, r.StateReply.GetSequenceNumber(), nil
Expand Down
2 changes: 2 additions & 0 deletions projection/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func (x *Projection) Receive(ctx actors.ReceiveContext) {
switch ctx.Message().(type) {
case *goaktpb.PostStart:
x.runner.Run(ctx.Context())
default:
ctx.Unhandled()
}
}

Expand Down
3 changes: 0 additions & 3 deletions projection/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,9 @@ func NewDiscardHandler(logger log.Logger) *DiscardHandler {

// Handle handles the events consumed
func (x *DiscardHandler) Handle(_ context.Context, persistenceID string, event *anypb.Any, state *anypb.Any, revision uint64) error {
// add some logging information
x.logger.Debugf("handling event=(%s) revision=(%d) with resulting state=(%s) of persistenceId=(%s)",
event.GetTypeUrl(), revision, state.GetTypeUrl(), persistenceID)
// increase the counter
x.eventsCounter.Inc()
// return successful process
return nil
}

Expand Down

0 comments on commit 396f843

Please sign in to comment.