From 8959214f068ff41972680272952c5ef91a7fe5b1 Mon Sep 17 00:00:00 2001 From: Arsene Date: Mon, 16 Dec 2024 15:19:44 +0000 Subject: [PATCH] feat: add durable state actor (#100) --- Earthfile | 5 +- behavior.go | 35 +- durable_state_actor.go | 233 +++++++++++ durable_state_actor_test.go | 384 ++++++++++++++++++ egopb/ego.pb.go | 153 ++++++- engine.go | 79 +++- engine_test.go | 256 +++++++++++- actor.go => event_sourced_actor.go | 54 ++- actor_test.go => event_sourced_actor_test.go | 36 +- example/main.go | 6 +- go.mod | 4 +- helper_test.go | 69 +++- internal/errorschain/errorschain.go | 88 ++++ internal/errorschain/errorschain_test.go | 53 +++ .../events_store.go | 2 +- mocks/persistence/state_store.go | 261 ++++++++++++ offsetstore/{iface.go => offset_store.go} | 0 option.go | 9 + .../iface.go => persistence/events_store.go | 4 +- persistence/state_store.go | 45 ++ .../eventstore}/memory/memory.go | 4 +- .../eventstore}/memory/memory_test.go | 4 +- .../eventstore}/memory/schemas.go | 0 .../eventstore}/postgres/config.go | 0 .../eventstore}/postgres/helper_test.go | 0 .../eventstore}/postgres/postgres.go | 4 +- .../eventstore}/postgres/postgres_test.go | 4 +- .../eventstore}/postgres/row.go | 0 .../eventstore}/postgres/schema_utils.go | 0 .../eventstore}/postgres/schema_utils_test.go | 0 plugins/statestore/memory/memory.go | 110 +++++ plugins/statestore/postgres/config.go | 35 ++ plugins/statestore/postgres/postgres.go | 218 ++++++++++ plugins/statestore/postgres/row.go | 82 ++++ plugins/statestore/postgres/schema_utils.go | 67 +++ projection/projection.go | 4 +- projection/projection_test.go | 2 +- projection/runner.go | 6 +- projection/runner_test.go | 4 +- protos/ego/v3/ego.proto | 15 + readme.md | 287 +++++++------ resources/durablestore_postgres.sql | 35 ++ 42 files changed, 2381 insertions(+), 276 deletions(-) create mode 100644 durable_state_actor.go create mode 100644 durable_state_actor_test.go rename actor.go => event_sourced_actor.go (81%) rename actor_test.go => event_sourced_actor_test.go (94%) create mode 100644 internal/errorschain/errorschain.go create mode 100644 internal/errorschain/errorschain_test.go rename mocks/{eventstore => persistence}/events_store.go (99%) create mode 100644 mocks/persistence/state_store.go rename offsetstore/{iface.go => offset_store.go} (100%) rename eventstore/iface.go => persistence/events_store.go (97%) create mode 100644 persistence/state_store.go rename {eventstore => plugins/eventstore}/memory/memory.go (99%) rename {eventstore => plugins/eventstore}/memory/memory_test.go (98%) rename {eventstore => plugins/eventstore}/memory/schemas.go (100%) rename {eventstore => plugins/eventstore}/postgres/config.go (100%) rename {eventstore => plugins/eventstore}/postgres/helper_test.go (100%) rename {eventstore => plugins/eventstore}/postgres/postgres.go (99%) rename {eventstore => plugins/eventstore}/postgres/postgres_test.go (99%) rename {eventstore => plugins/eventstore}/postgres/row.go (100%) rename {eventstore => plugins/eventstore}/postgres/schema_utils.go (100%) rename {eventstore => plugins/eventstore}/postgres/schema_utils_test.go (100%) create mode 100644 plugins/statestore/memory/memory.go create mode 100644 plugins/statestore/postgres/config.go create mode 100644 plugins/statestore/postgres/postgres.go create mode 100644 plugins/statestore/postgres/row.go create mode 100644 plugins/statestore/postgres/schema_utils.go create mode 100644 resources/durablestore_postgres.sql diff --git a/Earthfile b/Earthfile index 68517a3..ebf27a9 100644 --- a/Earthfile +++ b/Earthfile @@ -82,7 +82,7 @@ local-test: FROM +vendor WITH DOCKER --pull postgres:11 - RUN go-acc ./... -o coverage.out --ignore egopb,test,example,mocks -- -mod=vendor -race -v + RUN go-acc ./... -o coverage.out --ignore egopb,test,example,mocks -- -mod=vendor -timeout 0 -race -v END SAVE ARTIFACT coverage.out AS LOCAL coverage.out @@ -92,7 +92,8 @@ mock: FROM +code # generate the mocks - RUN mockery --dir eventstore --name EventsStore --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/eventstore --case snake + RUN mockery --dir persistence --all --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/persistence --case snake RUN mockery --dir offsetstore --name OffsetStore --keeptree --exported=true --with-expecter=true --inpackage=true --disable-version-string=true --output ./mocks/offsetstore --case snake + SAVE ARTIFACT ./mocks mocks AS LOCAL mocks \ No newline at end of file diff --git a/behavior.go b/behavior.go index 30fd1a7..9e4513f 100644 --- a/behavior.go +++ b/behavior.go @@ -34,8 +34,8 @@ type Command proto.Message type Event proto.Message type State proto.Message -// EntityBehavior defines an event sourced behavior when modeling a CQRS EntityBehavior. -type EntityBehavior interface { +// EventSourcedBehavior defines an event sourced behavior when modeling a CQRS EventSourcedBehavior. +type EventSourcedBehavior interface { // ID defines the id that will be used in the event journal. // This helps track the entity in the events store. ID() string @@ -46,7 +46,7 @@ type EntityBehavior interface { // which validations must be applied, and finally, which events will be persisted if any. When there is no event to be persisted a nil can // be returned as a no-op. Command handlers are the meat of the event sourced actor. // They encode the business rules of your event sourced actor and act as a guardian of the event sourced actor consistency. - // The command eventSourcedHandler must first validate that the incoming command can be applied to the current model state. + // The command must first validate that the incoming command can be applied to the current model state. // Any decision should be solely based on the data passed in the commands and the state of the Behavior. // In case of successful validation, one or more events expressing the mutations are persisted. // Once the events are persisted, they are applied to the state producing a new valid state. @@ -58,3 +58,32 @@ type EntityBehavior interface { // Event handlers must be pure functions as they will be used when instantiating the event sourced actor and replaying the event journal. HandleEvent(ctx context.Context, event Event, priorState State) (state State, err error) } + +// DurableStateBehavior represents a type of Actor that persists its full state after processing each command instead of using event sourcing. +// This type of Actor keeps its current state in memory during command handling and based upon the command response +// persists its full state into a durable store. The store can be a SQL or NoSQL database. +// The whole concept is given the current state of the actor and a command produce a new state with a higher version as shown in this diagram: (State, Command) => State +// DurableStateBehavior reacts to commands which result in a new version of the actor state. Only the latest version of the actor state is +// persisted to the durable store. There is no concept of history regarding the actor state since this is not an event sourced actor. +// However, one can rely on the version number of the actor state and exactly know how the actor state has evolved overtime. +// State actor version number are numerically incremented by the command handler which means it is imperative that the newer version of the state is greater than the current version by one. +// +// DurableStateBehavior will attempt to recover its state whenever available from the durable state. +// During a normal shutdown process, it will persist its current state to the durable store prior to shutting down. +// This behavior help maintain some consistency across the actor state evolution. +type DurableStateBehavior interface { + // ID defines the id that will be used in the event journal. + // This helps track the entity in the events store. + ID() string + // InitialState returns the durable state actor initial state. + // This is set as the initial state when there are no snapshots found the entity + InitialState() State + // HandleCommand processes every command sent to the DurableStateBehavior. One needs to use the command, the priorVersion and the priorState sent to produce a newState and newVersion. + // This defines how to handle each incoming command, which validations must be applied, and finally, whether a resulting state will be persisted depending upon the response. + // They encode the business rules of your durable state actor and act as a guardian of the actor consistency. + // The command handler must first validate that the incoming command can be applied to the current model state. + // Any decision should be solely based on the data passed in the command, the priorVersion and the priorState. + // In case of successful validation and processing , the new state will be stored in the durable store depending upon response. + // The actor state will be updated with the newState only if the newVersion is 1 more than the already existing state. + HandleCommand(ctx context.Context, command Command, priorVersion uint64, priorState State) (newState State, newVersion uint64, err error) +} diff --git a/durable_state_actor.go b/durable_state_actor.go new file mode 100644 index 0000000..dd5a57e --- /dev/null +++ b/durable_state_actor.go @@ -0,0 +1,233 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package ego + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/tochemey/goakt/v2/actors" + "github.com/tochemey/goakt/v2/goaktpb" + "golang.org/x/sync/errgroup" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/tochemey/ego/v3/egopb" + "github.com/tochemey/ego/v3/eventstream" + "github.com/tochemey/ego/v3/internal/errorschain" + "github.com/tochemey/ego/v3/persistence" +) + +var ( + statesTopic = "topic.states.%d" +) + +// durableStateActor is a durable state based actor +type durableStateActor struct { + DurableStateBehavior + stateStore persistence.StateStore + currentState State + currentVersion uint64 + lastCommandTime time.Time + eventsStream eventstream.Stream + actorSystem actors.ActorSystem +} + +// implements the actors.Actor interface +var _ actors.Actor = (*durableStateActor)(nil) + +// newDurableStateActor creates an instance of actor provided the DurableStateBehavior +func newDurableStateActor(behavior DurableStateBehavior, stateStore persistence.StateStore, eventsStream eventstream.Stream) *durableStateActor { + return &durableStateActor{ + stateStore: stateStore, + eventsStream: eventsStream, + DurableStateBehavior: behavior, + } +} + +// PreStart pre-starts the actor +func (entity *durableStateActor) PreStart(ctx context.Context) error { + return errorschain. + New(errorschain.ReturnFirst()). + AddError(entity.durableStateRequired()). + AddError(entity.stateStore.Ping(ctx)). + AddError(entity.recoverFromStore(ctx)). + Error() +} + +// Receive processes any message dropped into the actor mailbox. +func (entity *durableStateActor) Receive(ctx *actors.ReceiveContext) { + switch command := ctx.Message().(type) { + case *goaktpb.PostStart: + entity.actorSystem = ctx.ActorSystem() + case *egopb.GetStateCommand: + entity.sendStateReply(ctx) + default: + entity.processCommand(ctx, command) + } +} + +// PostStop prepares the actor to gracefully shutdown +func (entity *durableStateActor) PostStop(ctx context.Context) error { + return errorschain. + New(errorschain.ReturnFirst()). + AddError(entity.stateStore.Ping(ctx)). + AddError(entity.persistStateAndPublish(ctx)). + Error() +} + +// recoverFromStore reset the persistent actor to the latest state in case there is one +// this is vital when the entity actor is restarting. +func (entity *durableStateActor) recoverFromStore(ctx context.Context) error { + durableState, err := entity.stateStore.GetLatestState(ctx, entity.ID()) + if err != nil { + return fmt.Errorf("failed unmarshal the latest state: %w", err) + } + + if durableState != nil && proto.Equal(durableState, new(egopb.DurableState)) { + currentState := entity.InitialState() + if err := durableState.GetResultingState().UnmarshalTo(currentState); err != nil { + return fmt.Errorf("failed unmarshal the latest state: %w", err) + } + + entity.currentState = currentState + entity.currentVersion = durableState.GetVersionNumber() + return nil + } + + entity.currentState = entity.InitialState() + return nil +} + +// processCommand processes the incoming command +func (entity *durableStateActor) processCommand(receiveContext *actors.ReceiveContext, command Command) { + ctx := receiveContext.Context() + newState, newVersion, err := entity.HandleCommand(ctx, command, entity.currentVersion, entity.currentState) + if err != nil { + entity.sendErrorReply(receiveContext, err) + return + } + + // check whether the pre-conditions have met + if err := entity.checkPreconditions(newState, newVersion); err != nil { + entity.sendErrorReply(receiveContext, err) + return + } + + // set the current state with the newState + entity.currentState = newState + entity.lastCommandTime = timestamppb.Now().AsTime() + entity.currentVersion = newVersion + + if err := entity.persistStateAndPublish(ctx); err != nil { + entity.sendErrorReply(receiveContext, err) + return + } + + entity.sendStateReply(receiveContext) +} + +// sendStateReply sends a state reply message +func (entity *durableStateActor) sendStateReply(ctx *actors.ReceiveContext) { + state, _ := anypb.New(entity.currentState) + ctx.Response(&egopb.CommandReply{ + Reply: &egopb.CommandReply_StateReply{ + StateReply: &egopb.StateReply{ + PersistenceId: entity.ID(), + State: state, + SequenceNumber: entity.currentVersion, + Timestamp: entity.lastCommandTime.Unix(), + }, + }, + }) +} + +// sendErrorReply sends an error as a reply message +func (entity *durableStateActor) sendErrorReply(ctx *actors.ReceiveContext, err error) { + ctx.Response(&egopb.CommandReply{ + Reply: &egopb.CommandReply_ErrorReply{ + ErrorReply: &egopb.ErrorReply{ + Message: err.Error(), + }, + }, + }) +} + +// checkAndSetPreconditions validates the newState and the newVersion +func (entity *durableStateActor) checkPreconditions(newState State, newVersion uint64) error { + currentState := entity.currentState + currentStateType := currentState.ProtoReflect().Descriptor().FullName() + latestStateType := newState.ProtoReflect().Descriptor().FullName() + if currentStateType != latestStateType { + return fmt.Errorf("mismatch state types: %s != %s", currentStateType, latestStateType) + } + + proceed := int(math.Abs(float64(newVersion-entity.currentVersion))) == 1 + if !proceed { + return fmt.Errorf("%s received version=(%d) while current version is (%d)", + entity.ID(), + newVersion, + entity.currentVersion) + } + return nil +} + +// checks whether the durable state store is set or not +func (entity *durableStateActor) durableStateRequired() error { + if entity.stateStore == nil { + return ErrDurableStateStoreRequired + } + return nil +} + +// persistState persists the actor state +func (entity *durableStateActor) persistStateAndPublish(ctx context.Context) error { + resultingState, _ := anypb.New(entity.currentState) + shardNumber := entity.actorSystem.GetPartition(entity.ID()) + topic := fmt.Sprintf(statesTopic, shardNumber) + + durableState := &egopb.DurableState{ + PersistenceId: entity.ID(), + VersionNumber: entity.currentVersion, + ResultingState: resultingState, + Timestamp: entity.lastCommandTime.Unix(), + Shard: shardNumber, + } + + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + entity.eventsStream.Publish(topic, durableState) + return nil + }) + + eg.Go(func() error { + return entity.stateStore.WriteState(ctx, durableState) + }) + + return eg.Wait() +} diff --git a/durable_state_actor_test.go b/durable_state_actor_test.go new file mode 100644 index 0000000..c6030e2 --- /dev/null +++ b/durable_state_actor_test.go @@ -0,0 +1,384 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package ego + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tochemey/goakt/v2/actors" + "github.com/tochemey/goakt/v2/log" + "google.golang.org/protobuf/proto" + + "github.com/tochemey/ego/v3/egopb" + "github.com/tochemey/ego/v3/eventstream" + "github.com/tochemey/ego/v3/internal/lib" + "github.com/tochemey/ego/v3/internal/postgres" + "github.com/tochemey/ego/v3/plugins/statestore/memory" + pgstore "github.com/tochemey/ego/v3/plugins/statestore/postgres" + testpb "github.com/tochemey/ego/v3/test/data/pb/v3" +) + +func TestDurableStateBehavior(t *testing.T) { + t.Run("with state reply", func(t *testing.T) { + ctx := context.TODO() + // create an actor system + actorSystem, err := actors.NewActorSystem("TestActorSystem", + actors.WithPassivationDisabled(), + actors.WithLogger(log.DiscardLogger), + actors.WithActorInitMaxRetries(3)) + require.NoError(t, err) + assert.NotNil(t, actorSystem) + + // start the actor system + err = actorSystem.Start(ctx) + require.NoError(t, err) + + lib.Pause(time.Second) + + durableStore := memory.NewStateStore() + // create a persistence id + persistenceID := uuid.NewString() + behavior := NewAccountDurableStateBehavior(persistenceID) + err = durableStore.Connect(ctx) + require.NoError(t, err) + + // create an instance of events stream + eventStream := eventstream.New() + + actor := newDurableStateActor(behavior, durableStore, eventStream) + pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor) + require.NotNil(t, pid) + + lib.Pause(time.Second) + + var command proto.Message + + command = &testpb.CreateAccount{AccountBalance: 500.00} + // send the command to the actor + reply, err := actors.Ask(ctx, pid, command, 5*time.Second) + require.NoError(t, err) + require.NotNil(t, reply) + require.IsType(t, new(egopb.CommandReply), reply) + + commandReply := reply.(*egopb.CommandReply) + require.IsType(t, new(egopb.CommandReply_StateReply), commandReply.GetReply()) + + state := commandReply.GetReply().(*egopb.CommandReply_StateReply) + require.EqualValues(t, 1, state.StateReply.GetSequenceNumber()) + + // marshal the resulting state + resultingState := new(testpb.Account) + err = state.StateReply.GetState().UnmarshalTo(resultingState) + require.NoError(t, err) + + expected := &testpb.Account{ + AccountId: persistenceID, + AccountBalance: 500.00, + } + require.True(t, proto.Equal(expected, resultingState)) + + // send another command to credit the balance + command = &testpb.CreditAccount{ + AccountId: persistenceID, + Balance: 250, + } + reply, err = actors.Ask(ctx, pid, command, 5*time.Second) + require.NoError(t, err) + require.NotNil(t, reply) + require.IsType(t, new(egopb.CommandReply), reply) + + commandReply = reply.(*egopb.CommandReply) + require.IsType(t, new(egopb.CommandReply_StateReply), commandReply.GetReply()) + + state = commandReply.GetReply().(*egopb.CommandReply_StateReply) + assert.EqualValues(t, 2, state.StateReply.GetSequenceNumber()) + + // marshal the resulting state + resultingState = new(testpb.Account) + err = state.StateReply.GetState().UnmarshalTo(resultingState) + require.NoError(t, err) + + expected = &testpb.Account{ + AccountId: persistenceID, + AccountBalance: 750.00, + } + require.True(t, proto.Equal(expected, resultingState)) + + // stop the actor system + err = actorSystem.Stop(ctx) + require.NoError(t, err) + + err = durableStore.Disconnect(ctx) + require.NoError(t, err) + + lib.Pause(time.Second) + eventStream.Close() + }) + t.Run("with error reply", func(t *testing.T) { + ctx := context.TODO() + + // create an actor system + actorSystem, err := actors.NewActorSystem("TestActorSystem", + actors.WithPassivationDisabled(), + actors.WithLogger(log.DiscardLogger), + actors.WithActorInitMaxRetries(3)) + require.NoError(t, err) + assert.NotNil(t, actorSystem) + + // start the actor system + err = actorSystem.Start(ctx) + require.NoError(t, err) + + lib.Pause(time.Second) + + durableStore := memory.NewStateStore() + // create a persistence id + persistenceID := uuid.NewString() + // create the persistence behavior + behavior := NewAccountDurableStateBehavior(persistenceID) + + err = durableStore.Connect(ctx) + require.NoError(t, err) + + lib.Pause(time.Second) + + // create an instance of events stream + eventStream := eventstream.New() + + // create the persistence actor using the behavior previously created + persistentActor := newDurableStateActor(behavior, durableStore, eventStream) + // spawn the actor + pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor) + require.NotNil(t, pid) + + lib.Pause(time.Second) + + var command proto.Message + + command = &testpb.CreateAccount{AccountBalance: 500.00} + // send the command to the actor + reply, err := actors.Ask(ctx, pid, command, time.Second) + require.NoError(t, err) + require.NotNil(t, reply) + require.IsType(t, new(egopb.CommandReply), reply) + + commandReply := reply.(*egopb.CommandReply) + require.IsType(t, new(egopb.CommandReply_StateReply), commandReply.GetReply()) + + state := commandReply.GetReply().(*egopb.CommandReply_StateReply) + assert.EqualValues(t, 1, state.StateReply.GetSequenceNumber()) + + // marshal the resulting state + resultingState := new(testpb.Account) + err = state.StateReply.GetState().UnmarshalTo(resultingState) + require.NoError(t, err) + + expected := &testpb.Account{ + AccountId: persistenceID, + AccountBalance: 500.00, + } + assert.True(t, proto.Equal(expected, resultingState)) + + // send another command to credit the balance + command = &testpb.CreditAccount{ + AccountId: "different-id", + Balance: 250, + } + reply, err = actors.Ask(ctx, pid, command, time.Second) + require.NoError(t, err) + require.NotNil(t, reply) + require.IsType(t, new(egopb.CommandReply), reply) + + commandReply = reply.(*egopb.CommandReply) + require.IsType(t, new(egopb.CommandReply_ErrorReply), commandReply.GetReply()) + + errorReply := commandReply.GetReply().(*egopb.CommandReply_ErrorReply) + assert.Equal(t, "command sent to the wrong entity", errorReply.ErrorReply.GetMessage()) + + err = actorSystem.Stop(ctx) + require.NoError(t, err) + + err = durableStore.Disconnect(ctx) + require.NoError(t, err) + + lib.Pause(time.Second) + eventStream.Close() + }) + t.Run("with state recovery from state store", func(t *testing.T) { + ctx := context.TODO() + actorSystem, err := actors.NewActorSystem("TestActorSystem", + actors.WithPassivationDisabled(), + actors.WithLogger(log.DiscardLogger), + actors.WithActorInitMaxRetries(3), + ) + require.NoError(t, err) + assert.NotNil(t, actorSystem) + + // start the actor system + err = actorSystem.Start(ctx) + require.NoError(t, err) + + lib.Pause(time.Second) + + var ( + testDatabase = "testdb" + testUser = "testUser" + testDatabasePassword = "testPass" + ) + + testContainer := postgres.NewTestContainer(testDatabase, testUser, testDatabasePassword) + db := testContainer.GetTestDB() + require.NoError(t, db.Connect(ctx)) + schemaUtils := pgstore.NewSchemaUtils(db) + require.NoError(t, schemaUtils.CreateTable(ctx)) + + config := &pgstore.Config{ + DBHost: testContainer.Host(), + DBPort: testContainer.Port(), + DBName: testDatabase, + DBUser: testUser, + DBPassword: testDatabasePassword, + DBSchema: testContainer.Schema(), + } + durableStore := pgstore.NewStateStore(config) + require.NoError(t, durableStore.Connect(ctx)) + + lib.Pause(time.Second) + + persistenceID := uuid.NewString() + behavior := NewAccountDurableStateBehavior(persistenceID) + + err = durableStore.Connect(ctx) + require.NoError(t, err) + + lib.Pause(time.Second) + + eventStream := eventstream.New() + + persistentActor := newDurableStateActor(behavior, durableStore, eventStream) + pid, err := actorSystem.Spawn(ctx, behavior.ID(), persistentActor) + require.NoError(t, err) + require.NotNil(t, pid) + + lib.Pause(time.Second) + + var command proto.Message + + command = &testpb.CreateAccount{AccountBalance: 500.00} + + reply, err := actors.Ask(ctx, pid, command, time.Second) + require.NoError(t, err) + require.NotNil(t, reply) + require.IsType(t, new(egopb.CommandReply), reply) + + commandReply := reply.(*egopb.CommandReply) + require.IsType(t, new(egopb.CommandReply_StateReply), commandReply.GetReply()) + + state := commandReply.GetReply().(*egopb.CommandReply_StateReply) + assert.EqualValues(t, 1, state.StateReply.GetSequenceNumber()) + + // marshal the resulting state + resultingState := new(testpb.Account) + err = state.StateReply.GetState().UnmarshalTo(resultingState) + require.NoError(t, err) + + expected := &testpb.Account{ + AccountId: persistenceID, + AccountBalance: 500.00, + } + assert.True(t, proto.Equal(expected, resultingState)) + + // send another command to credit the balance + command = &testpb.CreditAccount{ + AccountId: persistenceID, + Balance: 250, + } + reply, err = actors.Ask(ctx, pid, command, time.Second) + require.NoError(t, err) + require.NotNil(t, reply) + require.IsType(t, new(egopb.CommandReply), reply) + + commandReply = reply.(*egopb.CommandReply) + require.IsType(t, new(egopb.CommandReply_StateReply), commandReply.GetReply()) + + state = commandReply.GetReply().(*egopb.CommandReply_StateReply) + assert.EqualValues(t, 2, state.StateReply.GetSequenceNumber()) + + // marshal the resulting state + resultingState = new(testpb.Account) + err = state.StateReply.GetState().UnmarshalTo(resultingState) + require.NoError(t, err) + + expected = &testpb.Account{ + AccountId: persistenceID, + AccountBalance: 750.00, + } + + assert.True(t, proto.Equal(expected, resultingState)) + // wait a while + lib.Pause(time.Second) + + // restart the actor + pid, err = actorSystem.ReSpawn(ctx, behavior.ID()) + require.NoError(t, err) + + lib.Pause(time.Second) + + // fetch the current state + command = &egopb.GetStateCommand{} + reply, err = actors.Ask(ctx, pid, command, time.Second) + require.NoError(t, err) + require.NotNil(t, reply) + require.IsType(t, new(egopb.CommandReply), reply) + + commandReply = reply.(*egopb.CommandReply) + require.IsType(t, new(egopb.CommandReply_StateReply), commandReply.GetReply()) + + resultingState = new(testpb.Account) + err = state.StateReply.GetState().UnmarshalTo(resultingState) + require.NoError(t, err) + expected = &testpb.Account{ + AccountId: persistenceID, + AccountBalance: 750.00, + } + assert.True(t, proto.Equal(expected, resultingState)) + + err = actorSystem.Stop(ctx) + assert.NoError(t, err) + + lib.Pause(time.Second) + + // free resources + assert.NoError(t, schemaUtils.DropTable(ctx)) + assert.NoError(t, durableStore.Disconnect(ctx)) + testContainer.Cleanup() + eventStream.Close() + }) +} diff --git a/egopb/ego.pb.go b/egopb/ego.pb.go index bc1bd37..b9ffdac 100644 --- a/egopb/ego.pb.go +++ b/egopb/ego.pb.go @@ -551,6 +551,92 @@ func (x *ProjectionId) GetShardNumber() uint64 { return 0 } +// DurableState defines the durable state behavior +// actor +type DurableState struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Specifies the persistence unique identifier + PersistenceId string `protobuf:"bytes,1,opt,name=persistence_id,json=persistenceId,proto3" json:"persistence_id,omitempty"` + // Specifies the version number + VersionNumber uint64 `protobuf:"varint,2,opt,name=version_number,json=versionNumber,proto3" json:"version_number,omitempty"` + // the state obtained from processing a command + ResultingState *anypb.Any `protobuf:"bytes,3,opt,name=resulting_state,json=resultingState,proto3" json:"resulting_state,omitempty"` + // Specifies the timestamp + Timestamp int64 `protobuf:"varint,5,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + // Specifies the shard number + Shard uint64 `protobuf:"varint,6,opt,name=shard,proto3" json:"shard,omitempty"` +} + +func (x *DurableState) Reset() { + *x = DurableState{} + if protoimpl.UnsafeEnabled { + mi := &file_ego_v3_ego_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DurableState) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DurableState) ProtoMessage() {} + +func (x *DurableState) ProtoReflect() protoreflect.Message { + mi := &file_ego_v3_ego_proto_msgTypes[8] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DurableState.ProtoReflect.Descriptor instead. +func (*DurableState) Descriptor() ([]byte, []int) { + return file_ego_v3_ego_proto_rawDescGZIP(), []int{8} +} + +func (x *DurableState) GetPersistenceId() string { + if x != nil { + return x.PersistenceId + } + return "" +} + +func (x *DurableState) GetVersionNumber() uint64 { + if x != nil { + return x.VersionNumber + } + return 0 +} + +func (x *DurableState) GetResultingState() *anypb.Any { + if x != nil { + return x.ResultingState + } + return nil +} + +func (x *DurableState) GetTimestamp() int64 { + if x != nil { + return x.Timestamp + } + return 0 +} + +func (x *DurableState) GetShard() uint64 { + if x != nil { + return x.Shard + } + return 0 +} + var File_ego_v3_ego_proto protoreflect.FileDescriptor var file_ego_v3_ego_proto_rawDesc = []byte{ @@ -612,15 +698,28 @@ var file_ego_v3_ego_proto_rawDesc = []byte{ 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, - 0x73, 0x68, 0x61, 0x72, 0x64, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x42, 0x73, 0x0a, 0x0a, 0x63, - 0x6f, 0x6d, 0x2e, 0x65, 0x67, 0x6f, 0x2e, 0x76, 0x31, 0x42, 0x08, 0x45, 0x67, 0x6f, 0x50, 0x72, - 0x6f, 0x74, 0x6f, 0x48, 0x02, 0x50, 0x01, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, - 0x63, 0x6f, 0x6d, 0x2f, 0x74, 0x6f, 0x63, 0x68, 0x65, 0x6d, 0x65, 0x79, 0x2f, 0x65, 0x67, 0x6f, - 0x2f, 0x76, 0x33, 0x3b, 0x65, 0x67, 0x6f, 0x70, 0x62, 0xa2, 0x02, 0x03, 0x45, 0x58, 0x58, 0xaa, - 0x02, 0x06, 0x45, 0x67, 0x6f, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x06, 0x45, 0x67, 0x6f, 0x5c, 0x56, - 0x31, 0xe2, 0x02, 0x12, 0x45, 0x67, 0x6f, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x07, 0x45, 0x67, 0x6f, 0x3a, 0x3a, 0x56, 0x31, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x68, 0x61, 0x72, 0x64, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x22, 0xcf, 0x01, 0x0a, 0x0c, + 0x44, 0x75, 0x72, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x25, 0x0a, 0x0e, + 0x70, 0x65, 0x72, 0x73, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x70, 0x65, 0x72, 0x73, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x63, + 0x65, 0x49, 0x64, 0x12, 0x25, 0x0a, 0x0e, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x6e, + 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x76, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x0f, 0x72, 0x65, + 0x73, 0x75, 0x6c, 0x74, 0x69, 0x6e, 0x67, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x0e, 0x72, 0x65, 0x73, 0x75, 0x6c, + 0x74, 0x69, 0x6e, 0x67, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, + 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x68, 0x61, 0x72, 0x64, + 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x73, 0x68, 0x61, 0x72, 0x64, 0x42, 0x73, 0x0a, + 0x0a, 0x63, 0x6f, 0x6d, 0x2e, 0x65, 0x67, 0x6f, 0x2e, 0x76, 0x31, 0x42, 0x08, 0x45, 0x67, 0x6f, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x48, 0x02, 0x50, 0x01, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x74, 0x6f, 0x63, 0x68, 0x65, 0x6d, 0x65, 0x79, 0x2f, 0x65, + 0x67, 0x6f, 0x2f, 0x76, 0x33, 0x3b, 0x65, 0x67, 0x6f, 0x70, 0x62, 0xa2, 0x02, 0x03, 0x45, 0x58, + 0x58, 0xaa, 0x02, 0x06, 0x45, 0x67, 0x6f, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x06, 0x45, 0x67, 0x6f, + 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x12, 0x45, 0x67, 0x6f, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, + 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, 0x07, 0x45, 0x67, 0x6f, 0x3a, 0x3a, + 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -635,7 +734,7 @@ func file_ego_v3_ego_proto_rawDescGZIP() []byte { return file_ego_v3_ego_proto_rawDescData } -var file_ego_v3_ego_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_ego_v3_ego_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_ego_v3_ego_proto_goTypes = []interface{}{ (*Event)(nil), // 0: ego.v1.Event (*CommandReply)(nil), // 1: ego.v1.CommandReply @@ -645,19 +744,21 @@ var file_ego_v3_ego_proto_goTypes = []interface{}{ (*GetStateCommand)(nil), // 5: ego.v1.GetStateCommand (*Offset)(nil), // 6: ego.v1.Offset (*ProjectionId)(nil), // 7: ego.v1.ProjectionId - (*anypb.Any)(nil), // 8: google.protobuf.Any + (*DurableState)(nil), // 8: ego.v1.DurableState + (*anypb.Any)(nil), // 9: google.protobuf.Any } var file_ego_v3_ego_proto_depIdxs = []int32{ - 8, // 0: ego.v1.Event.event:type_name -> google.protobuf.Any - 8, // 1: ego.v1.Event.resulting_state:type_name -> google.protobuf.Any + 9, // 0: ego.v1.Event.event:type_name -> google.protobuf.Any + 9, // 1: ego.v1.Event.resulting_state:type_name -> google.protobuf.Any 2, // 2: ego.v1.CommandReply.state_reply:type_name -> ego.v1.StateReply 3, // 3: ego.v1.CommandReply.error_reply:type_name -> ego.v1.ErrorReply - 8, // 4: ego.v1.StateReply.state:type_name -> google.protobuf.Any - 5, // [5:5] is the sub-list for method output_type - 5, // [5:5] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 9, // 4: ego.v1.StateReply.state:type_name -> google.protobuf.Any + 9, // 5: ego.v1.DurableState.resulting_state:type_name -> google.protobuf.Any + 6, // [6:6] is the sub-list for method output_type + 6, // [6:6] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_ego_v3_ego_proto_init() } @@ -762,6 +863,18 @@ func file_ego_v3_ego_proto_init() { return nil } } + file_ego_v3_ego_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DurableState); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_ego_v3_ego_proto_msgTypes[1].OneofWrappers = []interface{}{ (*CommandReply_StateReply)(nil), @@ -773,7 +886,7 @@ func file_ego_v3_ego_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_ego_v3_ego_proto_rawDesc, NumEnums: 0, - NumMessages: 8, + NumMessages: 9, NumExtensions: 0, NumServices: 0, }, diff --git a/engine.go b/engine.go index 614d6a2..875f3b0 100644 --- a/engine.go +++ b/engine.go @@ -41,9 +41,9 @@ import ( "github.com/tochemey/goakt/v2/log" "github.com/tochemey/ego/v3/egopb" - "github.com/tochemey/ego/v3/eventstore" "github.com/tochemey/ego/v3/eventstream" "github.com/tochemey/ego/v3/offsetstore" + "github.com/tochemey/ego/v3/persistence" "github.com/tochemey/ego/v3/projection" ) @@ -54,17 +54,20 @@ var ( ErrUndefinedEntityID = errors.New("eGo entity id is not defined") // ErrCommandReplyUnmarshalling is returned when unmarshalling command reply failed ErrCommandReplyUnmarshalling = errors.New("failed to parse command reply") + // ErrDurableStateStoreRequired is returned when the eGo engine durable store is not set + ErrDurableStateStoreRequired = errors.New("durable state store is required") ) // Engine represents the engine that empowers the various entities type Engine struct { - name string // name is the application name - eventsStore eventstore.EventsStore // eventsStore is the events store - enableCluster *atomic.Bool // enableCluster enable/disable cluster mode - actorSystem actors.ActorSystem // actorSystem is the underlying actor system - logger log.Logger // logger is the logging engine to use - discoveryProvider discovery.Provider // discoveryProvider is the discovery provider for clustering - partitionsCount uint64 // partitionsCount specifies the number of partitions + name string // name is the application name + eventsStore persistence.EventsStore // eventsStore is the events store + stateStore persistence.StateStore // stateStore is the durable state store + enableCluster *atomic.Bool // enableCluster enable/disable cluster mode + actorSystem actors.ActorSystem // actorSystem is the underlying actor system + logger log.Logger // logger is the logging engine to use + discoveryProvider discovery.Provider // discoveryProvider is the discovery provider for clustering + partitionsCount uint64 // partitionsCount specifies the number of partitions started atomic.Bool hostName string peersPort int @@ -77,7 +80,7 @@ type Engine struct { } // NewEngine creates an instance of Engine -func NewEngine(name string, eventsStore eventstore.EventsStore, opts ...Option) *Engine { +func NewEngine(name string, eventsStore persistence.EventsStore, opts ...Option) *Engine { e := &Engine{ name: name, eventsStore: eventsStore, @@ -122,7 +125,10 @@ func (engine *Engine) Start(ctx context.Context) error { WithMinimumPeersQuorum(uint32(engine.minimumPeersQuorum)). WithReplicaCount(uint32(replicaCount)). WithPartitionCount(engine.partitionsCount). - WithKinds(new(actor)) + WithKinds( + new(eventSourcedActor), + new(durableStateActor), + ) opts = append(opts, actors.WithCluster(clusterConfig), @@ -224,14 +230,24 @@ func (engine *Engine) Subscribe() (eventstream.Subscriber, error) { for i := 0; i < int(engine.partitionsCount); i++ { topic := fmt.Sprintf(eventsTopic, i) engine.eventStream.Subscribe(subscriber, topic) + topic = fmt.Sprintf(statesTopic, i) + engine.eventStream.Subscribe(subscriber, topic) } return subscriber, nil } -// Entity creates an entity. This will return the entity path -// that can be used to send command to the entity -func (engine *Engine) Entity(ctx context.Context, behavior EntityBehavior) error { +// Entity creates an event sourced entity. +// Entity persists its full state into an events store that tracks the history based upon events that occurred. +// An event sourced entity receives a (non-persistent) command which is first validated if it can be applied to the current state. +// Here validation can mean anything, from simple inspection of a command message’s fields up to a conversation with several external services, for instance. +// If validation succeeds, events are generated from the command, representing the outcome of the command. +// These events are then persisted and, after successful persistence, used to change the actor’s state. +// When the event sourced entity needs to be recovered, only the persisted events are replayed of which we know that they can be successfully applied. +// In other words, events cannot fail when being replayed to a persistent actor, in contrast to commands. +// When there are no events to persist the event sourced entity will return the current state of the entity. +// One can use the SendCommand to send a command a durable state entity. +func (engine *Engine) Entity(ctx context.Context, behavior EventSourcedBehavior) error { if !engine.Started() { return ErrEngineNotStarted } @@ -244,7 +260,40 @@ func (engine *Engine) Entity(ctx context.Context, behavior EntityBehavior) error _, err := actorSystem.Spawn(ctx, behavior.ID(), - newActor(behavior, eventsStore, eventStream)) + newEventSourcedActor(behavior, eventsStore, eventStream)) + if err != nil { + return err + } + + return nil +} + +// DurableStateEntity creates a durable state entity. +// A DurableStateEntity persists its full state into a durable store without any history of the state evolution. +// A durable state entity receives a (non-persistent) command which is first validated if it can be applied to the current state. +// Here validation can mean anything, from simple inspection of a command message’s fields up to a conversation with several external services, for instance. +// If validation succeeds, a new state is generated from the command, representing the outcome of the command. +// The new state is persisted and, after successful persistence, used to change the actor’s state. +// During a normal shutdown process, it will persist its current state to the durable store prior to shutting down. +// One can use the SendCommand to send a command a durable state entity. +func (engine *Engine) DurableStateEntity(ctx context.Context, behavior DurableStateBehavior) error { + if !engine.Started() { + return ErrEngineNotStarted + } + + engine.mutex.Lock() + actorSystem := engine.actorSystem + durableStateStore := engine.stateStore + eventStream := engine.eventStream + engine.mutex.Unlock() + + if durableStateStore == nil { + return ErrDurableStateStoreRequired + } + + _, err := actorSystem.Spawn(ctx, + behavior.ID(), + newDurableStateActor(behavior, durableStateStore, eventStream)) if err != nil { return err } @@ -254,7 +303,7 @@ func (engine *Engine) Entity(ctx context.Context, behavior EntityBehavior) error // SendCommand sends command to a given entity ref. // This will return: -// 1. the resulting state after the command has been handled and the emitted event persisted +// 1. the resulting state after the command has been handled and the emitted event/durable state persisted // 2. nil when there is no resulting state or no event persisted // 3. an error in case of error func (engine *Engine) SendCommand(ctx context.Context, entityID string, cmd Command, timeout time.Duration) (resultingState State, revision uint64, err error) { diff --git a/engine_test.go b/engine_test.go index 9ccd1f8..3c0a279 100644 --- a/engine_test.go +++ b/engine_test.go @@ -43,15 +43,17 @@ import ( mockdisco "github.com/tochemey/goakt/v2/mocks/discovery" "github.com/tochemey/ego/v3/egopb" - "github.com/tochemey/ego/v3/eventstore/memory" samplepb "github.com/tochemey/ego/v3/example/pbs/sample/pb/v1" "github.com/tochemey/ego/v3/internal/lib" offsetstore "github.com/tochemey/ego/v3/offsetstore/memory" + "github.com/tochemey/ego/v3/plugins/eventstore/memory" + memstore "github.com/tochemey/ego/v3/plugins/statestore/memory" "github.com/tochemey/ego/v3/projection" + testpb "github.com/tochemey/ego/v3/test/data/pb/v3" ) func TestEgo(t *testing.T) { - t.Run("With single node cluster enabled", func(t *testing.T) { + t.Run("EventSourced entity With single node cluster enabled", func(t *testing.T) { ctx := context.TODO() // create the event store eventStore := memory.NewEventsStore() @@ -112,7 +114,7 @@ func TestEgo(t *testing.T) { // create a persistence id entityID := uuid.NewString() // create an entity behavior with a given id - behavior := NewAccountBehavior(entityID) + behavior := NewEventSourcedEntity(entityID) // create an entity err = engine.Entity(ctx, behavior) require.NoError(t, err) @@ -170,7 +172,7 @@ func TestEgo(t *testing.T) { assert.NoError(t, offsetStore.Disconnect(ctx)) assert.NoError(t, engine.Stop(ctx)) }) - t.Run("With no cluster enabled", func(t *testing.T) { + t.Run("EventSourced entity With no cluster enabled", func(t *testing.T) { ctx := context.TODO() // create the event store eventStore := memory.NewEventsStore() @@ -184,7 +186,7 @@ func TestEgo(t *testing.T) { // create a persistence id entityID := uuid.NewString() // create an entity behavior with a given id - behavior := NewAccountBehavior(entityID) + behavior := NewEventSourcedEntity(entityID) // create an entity err = engine.Entity(ctx, behavior) require.NoError(t, err) @@ -223,7 +225,7 @@ func TestEgo(t *testing.T) { assert.NoError(t, eventStore.Disconnect(ctx)) assert.NoError(t, engine.Stop(ctx)) }) - t.Run("With SendCommand when not started", func(t *testing.T) { + t.Run("EventSourced entity With SendCommand when not started", func(t *testing.T) { ctx := context.TODO() // create the event store eventStore := memory.NewEventsStore() @@ -240,7 +242,7 @@ func TestEgo(t *testing.T) { assert.NoError(t, eventStore.Disconnect(ctx)) }) - t.Run("With SendCommand when entityID is not set", func(t *testing.T) { + t.Run("EventSourced entity With SendCommand when entityID is not set", func(t *testing.T) { ctx := context.TODO() // create the event store eventStore := memory.NewEventsStore() @@ -261,7 +263,7 @@ func TestEgo(t *testing.T) { assert.NoError(t, eventStore.Disconnect(ctx)) assert.NoError(t, engine.Stop(ctx)) }) - t.Run("With SendCommand when entity is not found", func(t *testing.T) { + t.Run("EventSourced entity With SendCommand when entity is not found", func(t *testing.T) { ctx := context.TODO() // create the event store eventStore := memory.NewEventsStore() @@ -282,7 +284,7 @@ func TestEgo(t *testing.T) { assert.NoError(t, eventStore.Disconnect(ctx)) assert.NoError(t, engine.Stop(ctx)) }) - t.Run("With IsProjectionRunning when not started", func(t *testing.T) { + t.Run("EventSourced entity With IsProjectionRunning when not started", func(t *testing.T) { ctx := context.TODO() // create the event store eventStore := memory.NewEventsStore() @@ -298,7 +300,7 @@ func TestEgo(t *testing.T) { assert.NoError(t, eventStore.Disconnect(ctx)) }) - t.Run("With RemoveProjection", func(t *testing.T) { + t.Run("EventSourced entity With RemoveProjection", func(t *testing.T) { ctx := context.TODO() // create the event store eventStore := memory.NewEventsStore() @@ -341,7 +343,7 @@ func TestEgo(t *testing.T) { assert.NoError(t, eventStore.Disconnect(ctx)) assert.NoError(t, engine.Stop(ctx)) }) - t.Run("With RemoveProjection when not started", func(t *testing.T) { + t.Run("EventSourced entity With RemoveProjection when not started", func(t *testing.T) { ctx := context.TODO() // create the event store eventStore := memory.NewEventsStore() @@ -356,33 +358,243 @@ func TestEgo(t *testing.T) { assert.NoError(t, eventStore.Disconnect(ctx)) }) + + t.Run("DurableStore entity With single node cluster enabled", func(t *testing.T) { + ctx := context.TODO() + stateStore := memstore.NewStateStore() + require.NoError(t, stateStore.Connect(ctx)) + + nodePorts := dynaport.Get(3) + gossipPort := nodePorts[0] + clusterPort := nodePorts[1] + remotingPort := nodePorts[2] + + host := "127.0.0.1" + + // define discovered addresses + addrs := []string{ + net.JoinHostPort(host, strconv.Itoa(gossipPort)), + } + + // mock the discovery provider + provider := new(mockdisco.Provider) + + provider.EXPECT().ID().Return("testDisco") + provider.EXPECT().Initialize().Return(nil) + provider.EXPECT().Register().Return(nil) + provider.EXPECT().Deregister().Return(nil) + provider.EXPECT().DiscoverPeers().Return(addrs, nil) + provider.EXPECT().Close().Return(nil) + + // create the ego engine + engine := NewEngine("Sample", nil, + WithLogger(log.DiscardLogger), + WithStateStore(stateStore), + WithCluster(provider, 4, 1, host, remotingPort, gossipPort, clusterPort)) + + err := engine.Start(ctx) + + // wait for the cluster to fully start + lib.Pause(time.Second) + + // subscribe to events + subscriber, err := engine.Subscribe() + require.NoError(t, err) + require.NotNil(t, subscriber) + + require.NoError(t, err) + // create a persistence id + entityID := uuid.NewString() + + behavior := NewAccountDurableStateBehavior(entityID) + // create an entity + err = engine.DurableStateEntity(ctx, behavior) + require.NoError(t, err) + // send some commands to the pid + var command proto.Message + command = &testpb.CreateAccount{ + AccountBalance: 500.00, + } + + // wait for the cluster to fully start + lib.Pause(time.Second) + + // send the command to the actor. Please don't ignore the error in production grid code + resultingState, revision, err := engine.SendCommand(ctx, entityID, command, time.Minute) + require.NoError(t, err) + account, ok := resultingState.(*testpb.Account) + require.True(t, ok) + + assert.EqualValues(t, 500.00, account.GetAccountBalance()) + assert.Equal(t, entityID, account.GetAccountId()) + assert.EqualValues(t, 1, revision) + + command = &testpb.CreditAccount{ + AccountId: entityID, + Balance: 250, + } + + newState, revision, err := engine.SendCommand(ctx, entityID, command, time.Minute) + require.NoError(t, err) + newAccount, ok := newState.(*testpb.Account) + require.True(t, ok) + + assert.EqualValues(t, 750.00, newAccount.GetAccountBalance()) + assert.Equal(t, entityID, newAccount.GetAccountId()) + assert.EqualValues(t, 2, revision) + + for message := range subscriber.Iterator() { + payload := message.Payload() + envelope, ok := payload.(*egopb.DurableState) + require.True(t, ok) + require.NotZero(t, envelope.GetVersionNumber()) + } + + // free resources + require.NoError(t, engine.Stop(ctx)) + lib.Pause(time.Second) + require.NoError(t, stateStore.Disconnect(ctx)) + }) + t.Run("DurableStore entity With no cluster enabled", func(t *testing.T) { + ctx := context.TODO() + stateStore := memstore.NewStateStore() + require.NoError(t, stateStore.Connect(ctx)) + + // create the ego engine + engine := NewEngine("Sample", nil, + WithStateStore(stateStore), + WithLogger(log.DiscardLogger)) + + err := engine.Start(ctx) + require.NoError(t, err) + + entityID := uuid.NewString() + behavior := NewAccountDurableStateBehavior(entityID) + + err = engine.DurableStateEntity(ctx, behavior) + require.NoError(t, err) + var command proto.Message + + command = &testpb.CreateAccount{ + AccountBalance: 500.00, + } + + resultingState, revision, err := engine.SendCommand(ctx, entityID, command, time.Minute) + require.NoError(t, err) + account, ok := resultingState.(*testpb.Account) + require.True(t, ok) + + assert.EqualValues(t, 500.00, account.GetAccountBalance()) + assert.Equal(t, entityID, account.GetAccountId()) + assert.EqualValues(t, 1, revision) + + // send another command to credit the balance + command = &testpb.CreditAccount{ + AccountId: entityID, + Balance: 250, + } + newState, revision, err := engine.SendCommand(ctx, entityID, command, time.Minute) + require.NoError(t, err) + newAccount, ok := newState.(*testpb.Account) + require.True(t, ok) + + assert.EqualValues(t, 750.00, newAccount.GetAccountBalance()) + assert.Equal(t, entityID, newAccount.GetAccountId()) + assert.EqualValues(t, 2, revision) + + assert.NoError(t, engine.Stop(ctx)) + lib.Pause(time.Second) + assert.NoError(t, stateStore.Disconnect(ctx)) + }) + t.Run("DurableStore entity With SendCommand when not started", func(t *testing.T) { + ctx := context.TODO() + + stateStore := memstore.NewStateStore() + require.NoError(t, stateStore.Connect(ctx)) + + // create the ego engine + engine := NewEngine("Sample", nil, + WithStateStore(stateStore), + WithLogger(log.DiscardLogger)) + + entityID := uuid.NewString() + + _, _, err := engine.SendCommand(ctx, entityID, new(testpb.CreateAccount), time.Minute) + require.Error(t, err) + assert.EqualError(t, err, ErrEngineNotStarted.Error()) + + assert.NoError(t, stateStore.Disconnect(ctx)) + }) + t.Run("DurableStore entity With SendCommand when entityID is not set", func(t *testing.T) { + ctx := context.TODO() + stateStore := memstore.NewStateStore() + require.NoError(t, stateStore.Connect(ctx)) + + // create the ego engine + engine := NewEngine("Sample", nil, + WithStateStore(stateStore), + WithLogger(log.DiscardLogger)) + err := engine.Start(ctx) + require.NoError(t, err) + + // create a persistence id + entityID := "" + + _, _, err = engine.SendCommand(ctx, entityID, new(testpb.CreateAccount), time.Minute) + require.Error(t, err) + assert.EqualError(t, err, ErrUndefinedEntityID.Error()) + assert.NoError(t, engine.Stop(ctx)) + assert.NoError(t, stateStore.Disconnect(ctx)) + }) + t.Run("DurableStore entity With SendCommand when entity is not found", func(t *testing.T) { + ctx := context.TODO() + + stateStore := memstore.NewStateStore() + require.NoError(t, stateStore.Connect(ctx)) + + // create the ego engine + engine := NewEngine("Sample", nil, + WithStateStore(stateStore), + WithLogger(log.DiscardLogger)) + err := engine.Start(ctx) + require.NoError(t, err) + + // create a persistence id + entityID := uuid.NewString() + + _, _, err = engine.SendCommand(ctx, entityID, new(testpb.CreateAccount), time.Minute) + require.Error(t, err) + assert.EqualError(t, err, actors.ErrActorNotFound(entityID).Error()) + assert.NoError(t, engine.Stop(ctx)) + assert.NoError(t, stateStore.Disconnect(ctx)) + }) } -// AccountBehavior implements persistence.Behavior -type AccountBehavior struct { +// EventSourcedEntity implements persistence.Behavior +type EventSourcedEntity struct { id string } -// make sure that AccountBehavior is a true persistence behavior -var _ EntityBehavior = &AccountBehavior{} +// make sure that EventSourcedEntity is a true persistence behavior +var _ EventSourcedBehavior = &EventSourcedEntity{} -// NewAccountBehavior creates an instance of AccountBehavior -func NewAccountBehavior(id string) *AccountBehavior { - return &AccountBehavior{id: id} +// NewEventSourcedEntity creates an instance of EventSourcedEntity +func NewEventSourcedEntity(id string) *EventSourcedEntity { + return &EventSourcedEntity{id: id} } // ID returns the id -func (a *AccountBehavior) ID() string { +func (a *EventSourcedEntity) ID() string { return a.id } // InitialState returns the initial state -func (a *AccountBehavior) InitialState() State { +func (a *EventSourcedEntity) InitialState() State { return State(new(samplepb.Account)) } // HandleCommand handles every command that is sent to the persistent behavior -func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ State) (events []Event, err error) { +func (a *EventSourcedEntity) HandleCommand(_ context.Context, command Command, _ State) (events []Event, err error) { switch cmd := command.(type) { case *samplepb.CreateAccount: // TODO in production grid app validate the command using the prior state @@ -408,7 +620,7 @@ func (a *AccountBehavior) HandleCommand(_ context.Context, command Command, _ St } // HandleEvent handles every event emitted -func (a *AccountBehavior) HandleEvent(_ context.Context, event Event, priorState State) (state State, err error) { +func (a *EventSourcedEntity) HandleEvent(_ context.Context, event Event, priorState State) (state State, err error) { switch evt := event.(type) { case *samplepb.AccountCreated: return &samplepb.Account{ diff --git a/actor.go b/event_sourced_actor.go similarity index 81% rename from actor.go rename to event_sourced_actor.go index efabf6f..0949cc8 100644 --- a/actor.go +++ b/event_sourced_actor.go @@ -38,58 +38,51 @@ import ( "github.com/tochemey/goakt/v2/goaktpb" "github.com/tochemey/ego/v3/egopb" - "github.com/tochemey/ego/v3/eventstore" "github.com/tochemey/ego/v3/eventstream" + "github.com/tochemey/ego/v3/persistence" ) var ( eventsTopic = "topic.events.%d" ) -// actor is an event sourced based actor -type actor struct { - EntityBehavior - // specifies the events store - eventsStore eventstore.EventsStore - // specifies the current state - currentState State - +// eventSourcedActor is an event sourced based actor +type eventSourcedActor struct { + EventSourcedBehavior + eventsStore persistence.EventsStore + currentState State eventsCounter uint64 lastCommandTime time.Time - - eventsStream eventstream.Stream + eventsStream eventstream.Stream } -// enforce compilation error -var _ actors.Actor = (*actor)(nil) +// implements the actors.Actor interface +var _ actors.Actor = (*eventSourcedActor)(nil) -// newActor creates an instance of actor provided the eventSourcedHandler and the events store -func newActor(behavior EntityBehavior, eventsStore eventstore.EventsStore, eventsStream eventstream.Stream) *actor { - // create an instance of entity and return it - return &actor{ - eventsStore: eventsStore, - EntityBehavior: behavior, - eventsStream: eventsStream, +// newEventSourcedActor creates an instance of actor provided the eventSourcedHandler and the events store +func newEventSourcedActor(behavior EventSourcedBehavior, eventsStore persistence.EventsStore, eventsStream eventstream.Stream) *eventSourcedActor { + return &eventSourcedActor{ + eventsStore: eventsStore, + EventSourcedBehavior: behavior, + eventsStream: eventsStream, } } // PreStart pre-starts the actor -// At this stage we connect to the various stores -func (entity *actor) PreStart(ctx context.Context) error { +func (entity *eventSourcedActor) PreStart(ctx context.Context) error { if entity.eventsStore == nil { return errors.New("events store is not defined") } if err := entity.eventsStore.Ping(ctx); err != nil { - return fmt.Errorf("failed to connect to the events store: %v", err) + return fmt.Errorf("failed to connect to the events store: %w", err) } return nil } // Receive processes any message dropped into the actor mailbox. -func (entity *actor) Receive(ctx *actors.ReceiveContext) { - // grab the command sent +func (entity *eventSourcedActor) Receive(ctx *actors.ReceiveContext) { switch command := ctx.Message().(type) { case *goaktpb.PostStart: if err := entity.recoverFromSnapshot(ctx.Context()); err != nil { @@ -104,14 +97,14 @@ func (entity *actor) Receive(ctx *actors.ReceiveContext) { // PostStop prepares the actor to gracefully shutdown // nolint -func (entity *actor) PostStop(ctx context.Context) error { +func (entity *eventSourcedActor) PostStop(ctx context.Context) error { entity.eventsCounter = 0 return nil } // recoverFromSnapshot reset the persistent actor to the latest snapshot in case there is one // this is vital when the entity actor is restarting. -func (entity *actor) recoverFromSnapshot(ctx context.Context) error { +func (entity *eventSourcedActor) recoverFromSnapshot(ctx context.Context) error { event, err := entity.eventsStore.GetLatestEvent(ctx, entity.ID()) if err != nil { return fmt.Errorf("failed to recover the latest journal: %w", err) @@ -124,7 +117,6 @@ func (entity *actor) recoverFromSnapshot(ctx context.Context) error { return fmt.Errorf("failed unmarshal the latest state: %w", err) } entity.currentState = currentState - entity.eventsCounter = event.GetSequenceNumber() return nil } @@ -134,7 +126,7 @@ func (entity *actor) recoverFromSnapshot(ctx context.Context) error { } // sendErrorReply sends an error as a reply message -func (entity *actor) sendErrorReply(ctx *actors.ReceiveContext, err error) { +func (entity *eventSourcedActor) sendErrorReply(ctx *actors.ReceiveContext, err error) { reply := &egopb.CommandReply{ Reply: &egopb.CommandReply_ErrorReply{ ErrorReply: &egopb.ErrorReply{ @@ -147,7 +139,7 @@ func (entity *actor) sendErrorReply(ctx *actors.ReceiveContext, err error) { } // getStateAndReply returns the current state of the entity -func (entity *actor) getStateAndReply(ctx *actors.ReceiveContext) { +func (entity *eventSourcedActor) getStateAndReply(ctx *actors.ReceiveContext) { latestEvent, err := entity.eventsStore.GetLatestEvent(ctx.Context(), entity.ID()) if err != nil { entity.sendErrorReply(ctx, err) @@ -170,7 +162,7 @@ func (entity *actor) getStateAndReply(ctx *actors.ReceiveContext) { } // processCommandAndReply processes the incoming command -func (entity *actor) processCommandAndReply(ctx *actors.ReceiveContext, command Command) { +func (entity *eventSourcedActor) processCommandAndReply(ctx *actors.ReceiveContext, command Command) { goCtx := ctx.Context() events, err := entity.HandleCommand(goCtx, command, entity.currentState) if err != nil { diff --git a/actor_test.go b/event_sourced_actor_test.go similarity index 94% rename from actor_test.go rename to event_sourced_actor_test.go index e41ae8f..4b258b2 100644 --- a/actor_test.go +++ b/event_sourced_actor_test.go @@ -39,15 +39,15 @@ import ( "github.com/tochemey/goakt/v2/log" "github.com/tochemey/ego/v3/egopb" - "github.com/tochemey/ego/v3/eventstore/memory" - pgeventstore "github.com/tochemey/ego/v3/eventstore/postgres" "github.com/tochemey/ego/v3/eventstream" "github.com/tochemey/ego/v3/internal/lib" "github.com/tochemey/ego/v3/internal/postgres" + "github.com/tochemey/ego/v3/plugins/eventstore/memory" + pgstore "github.com/tochemey/ego/v3/plugins/eventstore/postgres" testpb "github.com/tochemey/ego/v3/test/data/pb/v3" ) -func TestActor(t *testing.T) { +func TestEventSourcedActor(t *testing.T) { t.Run("with state reply", func(t *testing.T) { ctx := context.TODO() // create an actor system @@ -69,7 +69,7 @@ func TestActor(t *testing.T) { // create a persistence id persistenceID := uuid.NewString() // create the persistence behavior - behavior := NewAccountEntityBehavior(persistenceID) + behavior := NewAccountEventSourcedBehavior(persistenceID) // connect the event store err = eventStore.Connect(ctx) @@ -81,7 +81,7 @@ func TestActor(t *testing.T) { eventStream := eventstream.New() // create the persistence actor using the behavior previously created - actor := newActor(behavior, eventStore, eventStream) + actor := newEventSourcedActor(behavior, eventStore, eventStream) // spawn the actor pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor) require.NotNil(t, pid) @@ -175,7 +175,7 @@ func TestActor(t *testing.T) { // create a persistence id persistenceID := uuid.NewString() // create the persistence behavior - behavior := NewAccountEntityBehavior(persistenceID) + behavior := NewAccountEventSourcedBehavior(persistenceID) // connect the event store err = eventStore.Connect(ctx) @@ -187,7 +187,7 @@ func TestActor(t *testing.T) { eventStream := eventstream.New() // create the persistence actor using the behavior previously created - persistentActor := newActor(behavior, eventStore, eventStream) + persistentActor := newEventSourcedActor(behavior, eventStore, eventStream) // spawn the actor pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor) require.NotNil(t, pid) @@ -269,7 +269,7 @@ func TestActor(t *testing.T) { // create a persistence id persistenceID := uuid.NewString() // create the persistence behavior - behavior := NewAccountEntityBehavior(persistenceID) + behavior := NewAccountEventSourcedBehavior(persistenceID) // connect the event store err = eventStore.Connect(ctx) @@ -279,7 +279,7 @@ func TestActor(t *testing.T) { eventStream := eventstream.New() // create the persistence actor using the behavior previously created - persistentActor := newActor(behavior, eventStore, eventStream) + persistentActor := newEventSourcedActor(behavior, eventStore, eventStream) // spawn the actor pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor) require.NotNil(t, pid) @@ -337,10 +337,10 @@ func TestActor(t *testing.T) { db := testContainer.GetTestDB() // create the event store table require.NoError(t, db.Connect(ctx)) - schemaUtils := pgeventstore.NewSchemaUtils(db) + schemaUtils := pgstore.NewSchemaUtils(db) require.NoError(t, schemaUtils.CreateTable(ctx)) - config := &pgeventstore.Config{ + config := &pgstore.Config{ DBHost: testContainer.Host(), DBPort: testContainer.Port(), DBName: testDatabase, @@ -348,7 +348,7 @@ func TestActor(t *testing.T) { DBPassword: testDatabasePassword, DBSchema: testContainer.Schema(), } - eventStore := pgeventstore.NewEventsStore(config) + eventStore := pgstore.NewEventsStore(config) require.NoError(t, eventStore.Connect(ctx)) lib.Pause(time.Second) @@ -356,7 +356,7 @@ func TestActor(t *testing.T) { // create a persistence id persistenceID := uuid.NewString() // create the persistence behavior - behavior := NewAccountEntityBehavior(persistenceID) + behavior := NewAccountEventSourcedBehavior(persistenceID) // connect the event store err = eventStore.Connect(ctx) @@ -368,7 +368,7 @@ func TestActor(t *testing.T) { eventStream := eventstream.New() // create the persistence actor using the behavior previously created - persistentActor := newActor(behavior, eventStore, eventStream) + persistentActor := newEventSourcedActor(behavior, eventStore, eventStream) // spawn the actor pid, err := actorSystem.Spawn(ctx, behavior.ID(), persistentActor) require.NoError(t, err) @@ -491,7 +491,7 @@ func TestActor(t *testing.T) { // create a persistence id persistenceID := uuid.NewString() // create the persistence behavior - behavior := NewAccountEntityBehavior(persistenceID) + behavior := NewAccountEventSourcedBehavior(persistenceID) // connect the event store err = eventStore.Connect(ctx) @@ -503,7 +503,7 @@ func TestActor(t *testing.T) { eventStream := eventstream.New() // create the persistence actor using the behavior previously created - actor := newActor(behavior, eventStore, eventStream) + actor := newEventSourcedActor(behavior, eventStore, eventStream) // spawn the actor pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor) require.NotNil(t, pid) @@ -618,7 +618,7 @@ func TestActor(t *testing.T) { // create a persistence id persistenceID := uuid.NewString() // create the persistence behavior - behavior := NewAccountEntityBehavior(persistenceID) + behavior := NewAccountEventSourcedBehavior(persistenceID) // connect the event store err = eventStore.Connect(ctx) @@ -630,7 +630,7 @@ func TestActor(t *testing.T) { eventStream := eventstream.New() // create the persistence actor using the behavior previously created - actor := newActor(behavior, eventStore, eventStream) + actor := newEventSourcedActor(behavior, eventStore, eventStream) // spawn the actor pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor) require.NotNil(t, pid) diff --git a/example/main.go b/example/main.go index a500c33..1a510d4 100644 --- a/example/main.go +++ b/example/main.go @@ -37,8 +37,8 @@ import ( "google.golang.org/protobuf/proto" "github.com/tochemey/ego/v3" - "github.com/tochemey/ego/v3/eventstore/memory" samplepb "github.com/tochemey/ego/v3/example/pbs/sample/pb/v1" + "github.com/tochemey/ego/v3/plugins/eventstore/memory" ) func main() { @@ -93,13 +93,13 @@ func main() { os.Exit(0) } -// AccountBehavior implements EntityBehavior +// AccountBehavior implements EventSourcedBehavior type AccountBehavior struct { id string } // make sure that AccountBehavior is a true persistence behavior -var _ ego.EntityBehavior = &AccountBehavior{} +var _ ego.EventSourcedBehavior = &AccountBehavior{} // NewAccountBehavior creates an instance of AccountBehavior func NewAccountBehavior(id string) *AccountBehavior { diff --git a/go.mod b/go.mod index 9847eb8..a6c6e92 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/deckarep/golang-set/v2 v2.7.0 github.com/flowchartsman/retry v1.2.0 github.com/georgysavva/scany/v2 v2.1.3 + github.com/golang/protobuf v1.5.4 github.com/google/uuid v1.6.0 github.com/hashicorp/go-memdb v1.3.4 github.com/jackc/pgx/v5 v5.7.1 @@ -17,6 +18,7 @@ require ( github.com/travisjeffery/go-dynaport v1.0.0 go.uber.org/atomic v1.11.0 go.uber.org/goleak v1.3.0 + go.uber.org/multierr v1.11.0 golang.org/x/sync v0.10.0 google.golang.org/protobuf v1.35.2 ) @@ -49,7 +51,6 @@ require ( github.com/go-openapi/swag v0.23.0 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/protobuf v1.5.4 // indirect github.com/google/btree v1.1.3 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.6.0 // indirect @@ -100,7 +101,6 @@ require ( github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect - go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.31.0 // indirect golang.org/x/mod v0.22.0 // indirect diff --git a/helper_test.go b/helper_test.go index 3bfa692..882119c 100644 --- a/helper_test.go +++ b/helper_test.go @@ -33,30 +33,28 @@ import ( testpb "github.com/tochemey/ego/v3/test/data/pb/v3" ) -// AccountEntityBehavior implement EntityBehavior -type AccountEntityBehavior struct { +// AccountEventSourcedBehavior implements EventSourcedBehavior +type AccountEventSourcedBehavior struct { id string } -// make sure that testAccountBehavior is a true persistence behavior -var _ EntityBehavior = (*AccountEntityBehavior)(nil) +// enforces compilation error +var _ EventSourcedBehavior = (*AccountEventSourcedBehavior)(nil) -// NewAccountEntityBehavior creates an instance of AccountEntityBehavior -func NewAccountEntityBehavior(id string) *AccountEntityBehavior { - return &AccountEntityBehavior{id: id} +func NewAccountEventSourcedBehavior(id string) *AccountEventSourcedBehavior { + return &AccountEventSourcedBehavior{id: id} } -func (t *AccountEntityBehavior) ID() string { +func (t *AccountEventSourcedBehavior) ID() string { return t.id } -func (t *AccountEntityBehavior) InitialState() State { +func (t *AccountEventSourcedBehavior) InitialState() State { return new(testpb.Account) } -func (t *AccountEntityBehavior) HandleCommand(_ context.Context, command Command, _ State) (events []Event, err error) { +func (t *AccountEventSourcedBehavior) HandleCommand(_ context.Context, command Command, _ State) (events []Event, err error) { switch cmd := command.(type) { case *testpb.CreateAccount: - // TODO in production grid app validate the command using the prior state return []Event{ &testpb.AccountCreated{ AccountId: t.id, @@ -87,7 +85,7 @@ func (t *AccountEntityBehavior) HandleCommand(_ context.Context, command Command } } -func (t *AccountEntityBehavior) HandleEvent(_ context.Context, event Event, priorState State) (state State, err error) { +func (t *AccountEventSourcedBehavior) HandleEvent(_ context.Context, event Event, priorState State) (state State, err error) { switch evt := event.(type) { case *testpb.AccountCreated: return &testpb.Account{ @@ -96,7 +94,6 @@ func (t *AccountEntityBehavior) HandleEvent(_ context.Context, event Event, prio }, nil case *testpb.AccountCredited: - // we can safely cast the prior state to Account account := priorState.(*testpb.Account) bal := account.GetAccountBalance() + evt.GetAccountBalance() return &testpb.Account{ @@ -108,3 +105,49 @@ func (t *AccountEntityBehavior) HandleEvent(_ context.Context, event Event, prio return nil, errors.New("unhandled event") } } + +type AccountDurableStateBehavior struct { + id string +} + +// enforces compilation error +var _ DurableStateBehavior = (*AccountDurableStateBehavior)(nil) + +func NewAccountDurableStateBehavior(id string) *AccountDurableStateBehavior { + return &AccountDurableStateBehavior{id: id} +} + +func (x *AccountDurableStateBehavior) ID() string { + return x.id +} + +func (x *AccountDurableStateBehavior) InitialState() State { + return new(testpb.Account) +} + +// nolint +func (x *AccountDurableStateBehavior) HandleCommand(ctx context.Context, command Command, priorVersion uint64, priorState State) (newState State, newVersion uint64, err error) { + switch cmd := command.(type) { + case *testpb.CreateAccount: + return &testpb.Account{ + AccountId: x.id, + AccountBalance: cmd.GetAccountBalance(), + }, priorVersion + 1, nil + + case *testpb.CreditAccount: + if cmd.GetAccountId() == x.id { + account := priorState.(*testpb.Account) + bal := account.GetAccountBalance() + cmd.GetBalance() + + return &testpb.Account{ + AccountId: cmd.GetAccountId(), + AccountBalance: bal, + }, priorVersion + 1, nil + } + + return nil, 0, errors.New("command sent to the wrong entity") + + default: + return nil, 0, errors.New("unhandled command") + } +} diff --git a/internal/errorschain/errorschain.go b/internal/errorschain/errorschain.go new file mode 100644 index 0000000..c41c3ee --- /dev/null +++ b/internal/errorschain/errorschain.go @@ -0,0 +1,88 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Arsene Tochemey Gandote + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package errorschain + +import "go.uber.org/multierr" + +// Chain defines an error chain +type Chain struct { + returnFirst bool + errs []error +} + +// ChainOption configures a validation chain at creation time. +type ChainOption func(*Chain) + +// New creates a new error chain. All errors will be evaluated respectively +// according to their insertion order +func New(opts ...ChainOption) *Chain { + chain := &Chain{ + errs: make([]error, 0), + } + + for _, opt := range opts { + opt(chain) + } + + return chain +} + +// AddError add an error to the chain +func (c *Chain) AddError(err error) *Chain { + c.errs = append(c.errs, err) + return c +} + +// AddErrors add a slice of errors to the chain. Remember the slice order does matter here +func (c *Chain) AddErrors(errs ...error) *Chain { + c.errs = append(c.errs, errs...) + return c +} + +// Error returns the error +func (c *Chain) Error() error { + var err error + for _, v := range c.errs { + if v != nil { + if c.returnFirst { + // just return the error + return v + } + // append error to the violations + err = multierr.Append(err, v) + } + } + return err +} + +// ReturnFirst sets whether a chain should stop validation on first error. +func ReturnFirst() ChainOption { + return func(c *Chain) { c.returnFirst = true } +} + +// ReturnAll sets whether a chain should return all errors. +func ReturnAll() ChainOption { + return func(c *Chain) { c.returnFirst = false } +} diff --git a/internal/errorschain/errorschain_test.go b/internal/errorschain/errorschain_test.go new file mode 100644 index 0000000..ccad554 --- /dev/null +++ b/internal/errorschain/errorschain_test.go @@ -0,0 +1,53 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Arsene Tochemey Gandote + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package errorschain + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestErrorsChain(t *testing.T) { + t.Run("With ReturnFirst", func(t *testing.T) { + e1 := errors.New("err1") + e2 := errors.New("err2") + e3 := errors.New("err3") + + chain := New(ReturnFirst()).AddError(e1).AddError(e2).AddError(e3) + actual := chain.Error() + assert.True(t, errors.Is(actual, e1)) + }) + t.Run("With ReturnAll", func(t *testing.T) { + e1 := errors.New("err1") + e2 := errors.New("err2") + e3 := errors.New("err3") + + chain := New(ReturnAll()).AddError(e1).AddError(e2).AddError(e3) + actual := chain.Error() + assert.EqualError(t, actual, "err1; err2; err3") + }) +} diff --git a/mocks/eventstore/events_store.go b/mocks/persistence/events_store.go similarity index 99% rename from mocks/eventstore/events_store.go rename to mocks/persistence/events_store.go index 2e25688..2c5d6a1 100644 --- a/mocks/eventstore/events_store.go +++ b/mocks/persistence/events_store.go @@ -1,6 +1,6 @@ // Code generated by mockery. DO NOT EDIT. -package eventstore +package persistence import ( context "context" diff --git a/mocks/persistence/state_store.go b/mocks/persistence/state_store.go new file mode 100644 index 0000000..1bf39dd --- /dev/null +++ b/mocks/persistence/state_store.go @@ -0,0 +1,261 @@ +// Code generated by mockery. DO NOT EDIT. + +package persistence + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + egopb "github.com/tochemey/ego/v3/egopb" +) + +// StateStore is an autogenerated mock type for the StateStore type +type StateStore struct { + mock.Mock +} + +type StateStore_Expecter struct { + mock *mock.Mock +} + +func (_m *StateStore) EXPECT() *StateStore_Expecter { + return &StateStore_Expecter{mock: &_m.Mock} +} + +// Connect provides a mock function with given fields: ctx +func (_m *StateStore) Connect(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// StateStore_Connect_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Connect' +type StateStore_Connect_Call struct { + *mock.Call +} + +// Connect is a helper method to define mock.On call +// - ctx context.Context +func (_e *StateStore_Expecter) Connect(ctx interface{}) *StateStore_Connect_Call { + return &StateStore_Connect_Call{Call: _e.mock.On("Connect", ctx)} +} + +func (_c *StateStore_Connect_Call) Run(run func(ctx context.Context)) *StateStore_Connect_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *StateStore_Connect_Call) Return(_a0 error) *StateStore_Connect_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *StateStore_Connect_Call) RunAndReturn(run func(context.Context) error) *StateStore_Connect_Call { + _c.Call.Return(run) + return _c +} + +// Disconnect provides a mock function with given fields: ctx +func (_m *StateStore) Disconnect(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// StateStore_Disconnect_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Disconnect' +type StateStore_Disconnect_Call struct { + *mock.Call +} + +// Disconnect is a helper method to define mock.On call +// - ctx context.Context +func (_e *StateStore_Expecter) Disconnect(ctx interface{}) *StateStore_Disconnect_Call { + return &StateStore_Disconnect_Call{Call: _e.mock.On("Disconnect", ctx)} +} + +func (_c *StateStore_Disconnect_Call) Run(run func(ctx context.Context)) *StateStore_Disconnect_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *StateStore_Disconnect_Call) Return(_a0 error) *StateStore_Disconnect_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *StateStore_Disconnect_Call) RunAndReturn(run func(context.Context) error) *StateStore_Disconnect_Call { + _c.Call.Return(run) + return _c +} + +// GetLatestState provides a mock function with given fields: ctx, persistenceID +func (_m *StateStore) GetLatestState(ctx context.Context, persistenceID string) (*egopb.DurableState, error) { + ret := _m.Called(ctx, persistenceID) + + var r0 *egopb.DurableState + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (*egopb.DurableState, error)); ok { + return rf(ctx, persistenceID) + } + if rf, ok := ret.Get(0).(func(context.Context, string) *egopb.DurableState); ok { + r0 = rf(ctx, persistenceID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*egopb.DurableState) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, persistenceID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// StateStore_GetLatestState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestState' +type StateStore_GetLatestState_Call struct { + *mock.Call +} + +// GetLatestState is a helper method to define mock.On call +// - ctx context.Context +// - persistenceID string +func (_e *StateStore_Expecter) GetLatestState(ctx interface{}, persistenceID interface{}) *StateStore_GetLatestState_Call { + return &StateStore_GetLatestState_Call{Call: _e.mock.On("GetLatestState", ctx, persistenceID)} +} + +func (_c *StateStore_GetLatestState_Call) Run(run func(ctx context.Context, persistenceID string)) *StateStore_GetLatestState_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *StateStore_GetLatestState_Call) Return(_a0 *egopb.DurableState, _a1 error) *StateStore_GetLatestState_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *StateStore_GetLatestState_Call) RunAndReturn(run func(context.Context, string) (*egopb.DurableState, error)) *StateStore_GetLatestState_Call { + _c.Call.Return(run) + return _c +} + +// Ping provides a mock function with given fields: ctx +func (_m *StateStore) Ping(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// StateStore_Ping_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Ping' +type StateStore_Ping_Call struct { + *mock.Call +} + +// Ping is a helper method to define mock.On call +// - ctx context.Context +func (_e *StateStore_Expecter) Ping(ctx interface{}) *StateStore_Ping_Call { + return &StateStore_Ping_Call{Call: _e.mock.On("Ping", ctx)} +} + +func (_c *StateStore_Ping_Call) Run(run func(ctx context.Context)) *StateStore_Ping_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *StateStore_Ping_Call) Return(_a0 error) *StateStore_Ping_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *StateStore_Ping_Call) RunAndReturn(run func(context.Context) error) *StateStore_Ping_Call { + _c.Call.Return(run) + return _c +} + +// WriteState provides a mock function with given fields: ctx, state +func (_m *StateStore) WriteState(ctx context.Context, state *egopb.DurableState) error { + ret := _m.Called(ctx, state) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *egopb.DurableState) error); ok { + r0 = rf(ctx, state) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// StateStore_WriteState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WriteState' +type StateStore_WriteState_Call struct { + *mock.Call +} + +// WriteState is a helper method to define mock.On call +// - ctx context.Context +// - state *egopb.DurableState +func (_e *StateStore_Expecter) WriteState(ctx interface{}, state interface{}) *StateStore_WriteState_Call { + return &StateStore_WriteState_Call{Call: _e.mock.On("WriteState", ctx, state)} +} + +func (_c *StateStore_WriteState_Call) Run(run func(ctx context.Context, state *egopb.DurableState)) *StateStore_WriteState_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*egopb.DurableState)) + }) + return _c +} + +func (_c *StateStore_WriteState_Call) Return(_a0 error) *StateStore_WriteState_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *StateStore_WriteState_Call) RunAndReturn(run func(context.Context, *egopb.DurableState) error) *StateStore_WriteState_Call { + _c.Call.Return(run) + return _c +} + +// NewStateStore creates a new instance of StateStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewStateStore(t interface { + mock.TestingT + Cleanup(func()) +}) *StateStore { + mock := &StateStore{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/offsetstore/iface.go b/offsetstore/offset_store.go similarity index 100% rename from offsetstore/iface.go rename to offsetstore/offset_store.go diff --git a/option.go b/option.go index 6679938..613d105 100644 --- a/option.go +++ b/option.go @@ -29,6 +29,8 @@ import ( "github.com/tochemey/goakt/v2/discovery" "github.com/tochemey/goakt/v2/log" + + "github.com/tochemey/ego/v3/persistence" ) // Option is the interface that applies a configuration option. @@ -67,3 +69,10 @@ func WithLogger(logger log.Logger) Option { e.logger = logger }) } + +// WithStateStore sets the durable store. This is necessary when creating a durable state entity +func WithStateStore(stateStore persistence.StateStore) Option { + return OptionFunc(func(e *Engine) { + e.stateStore = stateStore + }) +} diff --git a/eventstore/iface.go b/persistence/events_store.go similarity index 97% rename from eventstore/iface.go rename to persistence/events_store.go index 004a885..34c509c 100644 --- a/eventstore/iface.go +++ b/persistence/events_store.go @@ -22,7 +22,7 @@ * SOFTWARE. */ -package eventstore +package persistence import ( "context" @@ -36,7 +36,7 @@ type EventsStore interface { Connect(ctx context.Context) error // Disconnect disconnect the journal store Disconnect(ctx context.Context) error - // WriteEvents persist store in batches for a given persistenceID. + // WriteEvents persist event in batches for a given persistenceID. // Note: persistence id and the sequence number make a record in the journal store unique. Failure to ensure that // can lead to some un-wanted behaviors and data inconsistency WriteEvents(ctx context.Context, events []*egopb.Event) error diff --git a/persistence/state_store.go b/persistence/state_store.go new file mode 100644 index 0000000..1593f3a --- /dev/null +++ b/persistence/state_store.go @@ -0,0 +1,45 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package persistence + +import ( + "context" + + "github.com/tochemey/ego/v3/egopb" +) + +// StateStore defines the API to interact with the durable state store +type StateStore interface { + // Connect connects to the journal store + Connect(ctx context.Context) error + // Disconnect disconnect the journal store + Disconnect(ctx context.Context) error + // Ping verifies a connection to the database is still alive, establishing a connection if necessary. + Ping(ctx context.Context) error + // WriteState persist durable state for a given persistenceID. + WriteState(ctx context.Context, state *egopb.DurableState) error + // GetLatestState fetches the latest durable state + GetLatestState(ctx context.Context, persistenceID string) (*egopb.DurableState, error) +} diff --git a/eventstore/memory/memory.go b/plugins/eventstore/memory/memory.go similarity index 99% rename from eventstore/memory/memory.go rename to plugins/eventstore/memory/memory.go index 872554d..148a7b0 100644 --- a/eventstore/memory/memory.go +++ b/plugins/eventstore/memory/memory.go @@ -40,7 +40,7 @@ import ( "google.golang.org/protobuf/types/known/anypb" "github.com/tochemey/ego/v3/egopb" - "github.com/tochemey/ego/v3/eventstore" + "github.com/tochemey/ego/v3/persistence" ) // EventsStore keep in memory every journal @@ -56,7 +56,7 @@ type EventsStore struct { } // enforce interface implementation -var _ eventstore.EventsStore = (*EventsStore)(nil) +var _ persistence.EventsStore = (*EventsStore)(nil) // NewEventsStore creates a new instance of EventsStore func NewEventsStore() *EventsStore { diff --git a/eventstore/memory/memory_test.go b/plugins/eventstore/memory/memory_test.go similarity index 98% rename from eventstore/memory/memory_test.go rename to plugins/eventstore/memory/memory_test.go index a75669a..b1b553d 100644 --- a/eventstore/memory/memory_test.go +++ b/plugins/eventstore/memory/memory_test.go @@ -36,7 +36,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "github.com/tochemey/ego/v3/egopb" - "github.com/tochemey/ego/v3/eventstore" + "github.com/tochemey/ego/v3/persistence" testpb "github.com/tochemey/ego/v3/test/data/pb/v3" ) @@ -45,7 +45,7 @@ func TestEventsStore(t *testing.T) { eventsStore := NewEventsStore() assert.NotNil(t, eventsStore) var p interface{} = eventsStore - _, ok := p.(eventstore.EventsStore) + _, ok := p.(persistence.EventsStore) assert.True(t, ok) }) t.Run("testConnect", func(t *testing.T) { diff --git a/eventstore/memory/schemas.go b/plugins/eventstore/memory/schemas.go similarity index 100% rename from eventstore/memory/schemas.go rename to plugins/eventstore/memory/schemas.go diff --git a/eventstore/postgres/config.go b/plugins/eventstore/postgres/config.go similarity index 100% rename from eventstore/postgres/config.go rename to plugins/eventstore/postgres/config.go diff --git a/eventstore/postgres/helper_test.go b/plugins/eventstore/postgres/helper_test.go similarity index 100% rename from eventstore/postgres/helper_test.go rename to plugins/eventstore/postgres/helper_test.go diff --git a/eventstore/postgres/postgres.go b/plugins/eventstore/postgres/postgres.go similarity index 99% rename from eventstore/postgres/postgres.go rename to plugins/eventstore/postgres/postgres.go index a4c1290..537a5bb 100644 --- a/eventstore/postgres/postgres.go +++ b/plugins/eventstore/postgres/postgres.go @@ -35,8 +35,8 @@ import ( "google.golang.org/protobuf/proto" "github.com/tochemey/ego/v3/egopb" - "github.com/tochemey/ego/v3/eventstore" "github.com/tochemey/ego/v3/internal/postgres" + "github.com/tochemey/ego/v3/persistence" ) var ( @@ -72,7 +72,7 @@ type EventsStore struct { } // enforce interface implementation -var _ eventstore.EventsStore = (*EventsStore)(nil) +var _ persistence.EventsStore = (*EventsStore)(nil) // NewEventsStore creates a new instance of PostgresEventStore func NewEventsStore(config *Config) *EventsStore { diff --git a/eventstore/postgres/postgres_test.go b/plugins/eventstore/postgres/postgres_test.go similarity index 99% rename from eventstore/postgres/postgres_test.go rename to plugins/eventstore/postgres/postgres_test.go index b7b513a..0741b57 100644 --- a/eventstore/postgres/postgres_test.go +++ b/plugins/eventstore/postgres/postgres_test.go @@ -36,7 +36,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "github.com/tochemey/ego/v3/egopb" - "github.com/tochemey/ego/v3/eventstore" + "github.com/tochemey/ego/v3/persistence" testpb "github.com/tochemey/ego/v3/test/data/pb/v3" ) @@ -54,7 +54,7 @@ func TestPostgresEventsStore(t *testing.T) { estore := NewEventsStore(config) assert.NotNil(t, estore) var p interface{} = estore - _, ok := p.(eventstore.EventsStore) + _, ok := p.(persistence.EventsStore) assert.True(t, ok) }) t.Run("testConnect:happy path", func(t *testing.T) { diff --git a/eventstore/postgres/row.go b/plugins/eventstore/postgres/row.go similarity index 100% rename from eventstore/postgres/row.go rename to plugins/eventstore/postgres/row.go diff --git a/eventstore/postgres/schema_utils.go b/plugins/eventstore/postgres/schema_utils.go similarity index 100% rename from eventstore/postgres/schema_utils.go rename to plugins/eventstore/postgres/schema_utils.go diff --git a/eventstore/postgres/schema_utils_test.go b/plugins/eventstore/postgres/schema_utils_test.go similarity index 100% rename from eventstore/postgres/schema_utils_test.go rename to plugins/eventstore/postgres/schema_utils_test.go diff --git a/plugins/statestore/memory/memory.go b/plugins/statestore/memory/memory.go new file mode 100644 index 0000000..22c7801 --- /dev/null +++ b/plugins/statestore/memory/memory.go @@ -0,0 +1,110 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package memory + +import ( + "context" + "errors" + "sync" + + "go.uber.org/atomic" + + "github.com/tochemey/ego/v3/egopb" + "github.com/tochemey/ego/v3/persistence" +) + +// StateStore keep in memory every durable state actor +// NOTE: NOT RECOMMENDED FOR PRODUCTION CODE because all records are in memory and there is no durability. +// This is recommended for tests or PoC +type StateStore struct { + db *sync.Map + connected *atomic.Bool +} + +// enforce compilation error +var _ persistence.StateStore = (*StateStore)(nil) + +// NewStateStore creates an instance StateStore +func NewStateStore() *StateStore { + return &StateStore{ + db: &sync.Map{}, + connected: atomic.NewBool(false), + } +} + +// Connect connects the durable store +// nolint +func (d *StateStore) Connect(ctx context.Context) error { + if d.connected.Load() { + return nil + } + d.connected.Store(true) + return nil +} + +// Disconnect disconnect the durable store +// nolint +func (d *StateStore) Disconnect(ctx context.Context) error { + if !d.connected.Load() { + return nil + } + d.db.Range(func(key interface{}, value interface{}) bool { + d.db.Delete(key) + return true + }) + d.connected.Store(false) + return nil +} + +// Ping verifies a connection to the database is still alive, establishing a connection if necessary. +func (d *StateStore) Ping(ctx context.Context) error { + if !d.connected.Load() { + return d.Connect(ctx) + } + return nil +} + +// WriteState persist durable state for a given persistenceID. +// nolint +func (d *StateStore) WriteState(ctx context.Context, state *egopb.DurableState) error { + if !d.connected.Load() { + return errors.New("durable store is not connected") + } + d.db.Store(state.GetPersistenceId(), state) + return nil +} + +// GetLatestState fetches the latest durable state +// nolint +func (d *StateStore) GetLatestState(ctx context.Context, persistenceID string) (*egopb.DurableState, error) { + if !d.connected.Load() { + return nil, errors.New("durable store is not connected") + } + value, ok := d.db.Load(persistenceID) + if !ok { + return nil, nil + } + return value.(*egopb.DurableState), nil +} diff --git a/plugins/statestore/postgres/config.go b/plugins/statestore/postgres/config.go new file mode 100644 index 0000000..d407d6b --- /dev/null +++ b/plugins/statestore/postgres/config.go @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package postgres + +// Config defines the postgres events store configuration +type Config struct { + DBHost string // DBHost represents the database host + DBPort int // DBPort is the database port + DBName string // DBName is the database name + DBUser string // DBUser is the database user used to connect + DBPassword string // DBPassword is the database password + DBSchema string // DBSchema represents the database schema +} diff --git a/plugins/statestore/postgres/postgres.go b/plugins/statestore/postgres/postgres.go new file mode 100644 index 0000000..ca1b741 --- /dev/null +++ b/plugins/statestore/postgres/postgres.go @@ -0,0 +1,218 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package postgres + +import ( + "context" + "errors" + "fmt" + + sq "github.com/Masterminds/squirrel" + "github.com/golang/protobuf/proto" + "github.com/jackc/pgx/v5" + "go.uber.org/atomic" + + "github.com/tochemey/ego/v3/egopb" + "github.com/tochemey/ego/v3/internal/postgres" + "github.com/tochemey/ego/v3/persistence" +) + +var ( + columns = []string{ + "persistence_id", + "version_number", + "state_payload", + "state_manifest", + "timestamp", + "shard_number", + } + + tableName = "states_store" +) + +// DurableStore implements the DurableStore interface +// and helps persist events in a Postgres database +type DurableStore struct { + db postgres.Postgres + sb sq.StatementBuilderType + // insertBatchSize represents the chunk of data to bulk insert. + // This helps avoid the postgres 65535 parameter limit. + // This is necessary because Postgres uses a 32-bit int for binding input parameters and + // is not able to track anything larger. + // Note: Change this value when you know the size of data to bulk insert at once. Otherwise, you + // might encounter the postgres 65535 parameter limit error. + insertBatchSize int + // hold the connection state to avoid multiple connection of the same instance + connected *atomic.Bool +} + +// enforce interface implementation +var _ persistence.StateStore = (*DurableStore)(nil) + +// NewStateStore creates a new instance of StateStore +func NewStateStore(config *Config) *DurableStore { + // create the underlying db connection + db := postgres.New(postgres.NewConfig(config.DBHost, config.DBPort, config.DBUser, config.DBPassword, config.DBName)) + return &DurableStore{ + db: db, + sb: sq.StatementBuilder.PlaceholderFormat(sq.Dollar), + insertBatchSize: 500, + connected: atomic.NewBool(false), + } +} + +// Connect connects to the underlying postgres database +func (s *DurableStore) Connect(ctx context.Context) error { + // check whether this instance of the journal is connected or not + if s.connected.Load() { + return nil + } + + // connect to the underlying db + if err := s.db.Connect(ctx); err != nil { + return err + } + + // set the connection status + s.connected.Store(true) + + return nil +} + +// Disconnect disconnects from the underlying postgres database +func (s *DurableStore) Disconnect(ctx context.Context) error { + // check whether this instance of the journal is connected or not + if !s.connected.Load() { + return nil + } + + // disconnect the underlying database + if err := s.db.Disconnect(ctx); err != nil { + return err + } + // set the connection status + s.connected.Store(false) + + return nil +} + +// Ping verifies a connection to the database is still alive, establishing a connection if necessary. +func (s *DurableStore) Ping(ctx context.Context) error { + // check whether we are connected or not + if !s.connected.Load() { + return s.Connect(ctx) + } + + return nil +} + +// WriteState writes a durable state into the underlying postgres database +func (s *DurableStore) WriteState(ctx context.Context, state *egopb.DurableState) error { + if !s.connected.Load() { + return errors.New("durable store is not connected") + } + + if state == nil || proto.Equal(state, &egopb.DurableState{}) { + return nil + } + + tx, err := s.db.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.ReadCommitted}) + if err != nil { + return fmt.Errorf("failed to obtain a database transaction: %w", err) + } + + query, args, err := s.sb.Delete(tableName).Where(sq.Eq{"persistence_id": state.GetPersistenceId()}).ToSql() + if err != nil { + return fmt.Errorf("failed to build the delete sql statement: %w", err) + } + + if _, err := s.db.Exec(ctx, query, args...); err != nil { + return fmt.Errorf("failed to delete durable state from the database: %w", err) + } + + bytea, _ := proto.Marshal(state.GetResultingState()) + manifest := string(state.GetResultingState().ProtoReflect().Descriptor().FullName()) + + statement := s.sb. + Insert(tableName). + Columns(columns...). + Values( + state.GetPersistenceId(), + state.GetVersionNumber(), + bytea, + manifest, + state.GetTimestamp(), + state.GetShard(), + ) + + query, args, err = statement.ToSql() + if err != nil { + return fmt.Errorf("unable to build sql insert statement: %w", err) + } + + _, execErr := tx.Exec(ctx, query, args...) + if execErr != nil { + if err = tx.Rollback(ctx); err != nil { + return fmt.Errorf("unable to rollback db transaction: %w", err) + } + return fmt.Errorf("failed to record durable state: %w", execErr) + } + + // commit the transaction + if commitErr := tx.Commit(ctx); commitErr != nil { + return fmt.Errorf("failed to record durable state: %w", commitErr) + } + return nil +} + +// GetLatestState fetches the latest durable state of a persistenceID +func (s *DurableStore) GetLatestState(ctx context.Context, persistenceID string) (*egopb.DurableState, error) { + if !s.connected.Load() { + return nil, errors.New("durable store is not connected") + } + + statement := s.sb. + Select(columns...). + From(tableName). + Where(sq.Eq{"persistence_id": persistenceID}). + Limit(1) + + query, args, err := statement.ToSql() + if err != nil { + return nil, fmt.Errorf("failed to build the select sql statement: %w", err) + } + + row := new(row) + err = s.db.Select(ctx, row, query, args...) + if err != nil { + return nil, fmt.Errorf("failed to fetch the latest event from the database: %w", err) + } + + if row.PersistenceID == "" { + return nil, nil + } + + return row.ToDurableState() +} diff --git a/plugins/statestore/postgres/row.go b/plugins/statestore/postgres/row.go new file mode 100644 index 0000000..660d400 --- /dev/null +++ b/plugins/statestore/postgres/row.go @@ -0,0 +1,82 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package postgres + +import ( + "fmt" + + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protoreflect" + "google.golang.org/protobuf/reflect/protoregistry" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/tochemey/ego/v3/egopb" +) + +// row represents the durable state store row +type row struct { + PersistenceID string + VersionNumber uint64 + StatePayload []byte + StateManifest string + Timestamp int64 + ShardNumber uint64 +} + +// ToDurableState convert row to durable state +func (x row) ToDurableState() (*egopb.DurableState, error) { + // unmarshal the event and the state + state, err := toProto(x.StateManifest, x.StatePayload) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal the durable state: %w", err) + } + + return &egopb.DurableState{ + PersistenceId: x.PersistenceID, + VersionNumber: x.VersionNumber, + ResultingState: state, + Timestamp: x.Timestamp, + Shard: x.ShardNumber, + }, nil +} + +// toProto converts a byte array given its manifest into a valid proto message +func toProto(manifest string, bytea []byte) (*anypb.Any, error) { + mt, err := protoregistry.GlobalTypes.FindMessageByName(protoreflect.FullName(manifest)) + if err != nil { + return nil, err + } + + pm := mt.New().Interface() + err = proto.Unmarshal(bytea, pm) + if err != nil { + return nil, err + } + + if cast, ok := pm.(*anypb.Any); ok { + return cast, nil + } + return nil, fmt.Errorf("failed to unpack message=%s", manifest) +} diff --git a/plugins/statestore/postgres/schema_utils.go b/plugins/statestore/postgres/schema_utils.go new file mode 100644 index 0000000..4cc6c70 --- /dev/null +++ b/plugins/statestore/postgres/schema_utils.go @@ -0,0 +1,67 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package postgres + +import ( + "context" + + "github.com/tochemey/ego/v3/internal/postgres" +) + +// SchemaUtils help create the various test tables in unit/integration tests +type SchemaUtils struct { + db *postgres.TestDB +} + +// NewSchemaUtils creates an instance of SchemaUtils +func NewSchemaUtils(db *postgres.TestDB) *SchemaUtils { + return &SchemaUtils{db: db} +} + +// CreateTable creates the event store table used for unit tests +func (d SchemaUtils) CreateTable(ctx context.Context) error { + schemaDDL := ` + DROP TABLE IF EXISTS states_store; + CREATE TABLE IF NOT EXISTS states_store + ( + persistence_id VARCHAR(255) NOT NULL, + version_number BIGINT NOT NULL, + state_payload BYTEA NOT NULL, + state_manifest VARCHAR(255) NOT NULL, + timestamp BIGINT NOT NULL, + shard_number BIGINT NOT NULL , + + PRIMARY KEY (persistence_id, version_number) + ); + ` + _, err := d.db.Exec(ctx, schemaDDL) + return err +} + +// DropTable drop the table used in unit test +// This is useful for resource cleanup after a unit test +func (d SchemaUtils) DropTable(ctx context.Context) error { + return d.db.DropTable(ctx, tableName) +} diff --git a/projection/projection.go b/projection/projection.go index 736e619..cd22a3a 100644 --- a/projection/projection.go +++ b/projection/projection.go @@ -30,8 +30,8 @@ import ( "github.com/tochemey/goakt/v2/actors" "github.com/tochemey/goakt/v2/goaktpb" - "github.com/tochemey/ego/v3/eventstore" "github.com/tochemey/ego/v3/offsetstore" + "github.com/tochemey/ego/v3/persistence" ) // Projection defines the projection actor @@ -46,7 +46,7 @@ var _ actors.Actor = (*Projection)(nil) // New creates an instance of Projection func New(name string, handler Handler, - eventsStore eventstore.EventsStore, + eventsStore persistence.EventsStore, offsetStore offsetstore.OffsetStore, opts ...Option) *Projection { // create the instance of the runner diff --git a/projection/projection_test.go b/projection/projection_test.go index 914d7ab..fa111bd 100644 --- a/projection/projection_test.go +++ b/projection/projection_test.go @@ -40,9 +40,9 @@ import ( "github.com/tochemey/goakt/v2/log" "github.com/tochemey/ego/v3/egopb" - "github.com/tochemey/ego/v3/eventstore/memory" "github.com/tochemey/ego/v3/internal/lib" memoffsetstore "github.com/tochemey/ego/v3/offsetstore/memory" + "github.com/tochemey/ego/v3/plugins/eventstore/memory" testpb "github.com/tochemey/ego/v3/test/data/pb/v3" ) diff --git a/projection/runner.go b/projection/runner.go index 86acd78..3f7700b 100644 --- a/projection/runner.go +++ b/projection/runner.go @@ -39,9 +39,9 @@ import ( "github.com/tochemey/goakt/v2/log" "github.com/tochemey/ego/v3/egopb" - "github.com/tochemey/ego/v3/eventstore" "github.com/tochemey/ego/v3/internal/ticker" "github.com/tochemey/ego/v3/offsetstore" + "github.com/tochemey/ego/v3/persistence" ) // runner defines the projection runner @@ -53,7 +53,7 @@ type runner struct { // Handler specifies the projection handler handler Handler // JournalStore specifies the journal store for reading events - eventsStore eventstore.EventsStore + eventsStore persistence.EventsStore // OffsetStore specifies the offset store to commit offsets offsetsStore offsetstore.OffsetStore // Specifies the recovery setting @@ -78,7 +78,7 @@ type runner struct { // The name of the projection should be unique func newRunner(name string, handler Handler, - eventsStore eventstore.EventsStore, + eventsStore persistence.EventsStore, offsetStore offsetstore.OffsetStore, opts ...Option) *runner { runner := &runner{ diff --git a/projection/runner_test.go b/projection/runner_test.go index 178cf2c..f630806 100644 --- a/projection/runner_test.go +++ b/projection/runner_test.go @@ -42,11 +42,11 @@ import ( "github.com/tochemey/goakt/v2/log" "github.com/tochemey/ego/v3/egopb" - "github.com/tochemey/ego/v3/eventstore/memory" "github.com/tochemey/ego/v3/internal/lib" - mockseventstore "github.com/tochemey/ego/v3/mocks/eventstore" mocksoffsetstore "github.com/tochemey/ego/v3/mocks/offsetstore" + mockseventstore "github.com/tochemey/ego/v3/mocks/persistence" memoffsetstore "github.com/tochemey/ego/v3/offsetstore/memory" + "github.com/tochemey/ego/v3/plugins/eventstore/memory" testpb "github.com/tochemey/ego/v3/test/data/pb/v3" ) diff --git a/protos/ego/v3/ego.proto b/protos/ego/v3/ego.proto index 9a5671c..a8e569a 100644 --- a/protos/ego/v3/ego.proto +++ b/protos/ego/v3/ego.proto @@ -82,3 +82,18 @@ message ProjectionId { // Specifies the shard number uint64 shard_number = 2; } + +// DurableState defines the durable state behavior +// actor +message DurableState { + // Specifies the persistence unique identifier + string persistence_id = 1; + // Specifies the version number + uint64 version_number = 2; + // the state obtained from processing a command + google.protobuf.Any resulting_state = 3; + // Specifies the timestamp + int64 timestamp = 5; + // Specifies the shard number + uint64 shard = 6; +} diff --git a/readme.md b/readme.md index 0db0b35..06fb5cc 100644 --- a/readme.md +++ b/readme.md @@ -1,4 +1,4 @@ -## eGo +# eGo [![build](https://img.shields.io/github/actions/workflow/status/Tochemey/ego/build.yml?branch=main)](https://github.com/Tochemey/ego/actions/workflows/build.yml) [![Go Reference](https://pkg.go.dev/badge/github.com/tochemey/ego.svg)](https://pkg.go.dev/github.com/tochemey/ego) @@ -9,36 +9,37 @@ eGo is a minimal library that help build event-sourcing and CQRS application through a simple interface, and it allows developers to describe their **_commands_**, **_events_** and **_states_** **_are defined using google protocol buffers_**. Under the hood, ego leverages [Go-Akt](https://github.com/Tochemey/goakt) to scale out and guarantee performant, reliable persistence. -### Features +## Features -#### Domain Entity/Aggregate Root +### Event Sourced Behavior -The aggregate root is crucial for maintaining data consistency, especially in distributed systems. It defines how to handle the various commands (requests to perform actions) that are always directed at the aggregate root. -In eGo commands sent the aggregate root are processed in order. When a command is processed, it may result in the generation of events, which are then stored in an event store. Every event persisted has a revision number -and timestamp that can help track it. The aggregate root in eGo is responsible for defining how to handle events that are the result of command handlers. The end result of events handling is to build the new state of the aggregate root. -When running in cluster mode, aggregate root are sharded. +The [`EventSourcedBehavior`](./behavior.go) is crucial for maintaining data consistency, especially in distributed systems. It defines how to handle the various commands (requests to perform actions) that are always directed at the event sourced entity. +In eGo commands sent to the [`EventSourcedBehavior`](./behavior.go) are processed in order. When a command is processed, it may result in the generation of events, which are then stored in an event store. Every event persisted has a revision number +and timestamp that can help track it. The [`EventSourcedBehavior`](./behavior.go) in eGo is responsible for defining how to handle events that are the result of command handlers. +The end result of events handling is to build the new state of the event sourced entity. When running in cluster mode, aggregate root are sharded. - `Commands handler`: The command handlers define how to handle each incoming command, which validations must be applied, and finally, which events will be persisted if any. When there is no event to be persisted a nil can be returned as a no-op. Command handlers are the meat of the event sourced actor. - They encode the business rules of your event sourced actor and act as a guardian of the Aggregate consistency. + They encode the business rules of your event sourced actor and act as a guardian of the event sourced entity consistency. The command handler must first validate that the incoming command can be applied to the current model state. Any decision should be solely based on the data passed in the commands and the state of the Behavior. In case of successful validation, one or more events expressing the mutations are persisted. Once the events are persisted, they are applied to the state producing a new valid state. -- `Events handler`: The event handlers are used to mutate the state of the Aggregate by applying the events to it. - Event handlers must be pure functions as they will be used when instantiating the Aggregate and replaying the event store. +- `Events handler`: The event handlers are used to mutate the state of the event sourced entity by applying the events to it. + Event handlers must be pure functions as they will be used when instantiating the event sourced entity and replaying the event store. #### Howto -To define an Aggregate Root, one needs to: -1. the state of the aggregate root using google protocol buffers message -2. the various commands that will be handled by the aggregate root -3. the various events that are result of the command handlers and that will be handled by the aggregate root to return the new state of the aggregate root -2. implements the [`EntityBehavior`](./behavior.go) interface. +To define an event sourced entity, one needs to: +1. define the state of the event sourced entity using google protocol buffers message +2. define the various commands that will be handled by the event sourced entity +3. define the various events that are result of the command handlers and that will be handled by the event sourced entity to return the new state of the event sourced entity +4. implement the [`EventSourcedBehavior`](./behavior.go) interface. +5. call the `Entity` method of eGo [engine](./engine.go) #### Events Stream -Every event handled by Aggregate Root are pushed to an events stream. That enables real-time processing of events without having to interact with the events store +Every event handled by event sourced entity are pushed to an events stream. That enables real-time processing of events without having to interact with the events store #### Projection @@ -49,9 +50,9 @@ persisted by the write model. The offset used in eGo is a timestamp-based offset #### Events Store -One can implement a custom events store. See [EventsStore](./eventstore/iface.go). eGo comes packaged with two events store: -- [Postgres](./eventstore/postgres/postgres.go): Schema can be found [here](./resources/eventstore_postgres.sql) -- [Memory](./eventstore/memory/memory.go) (for testing purpose only) +One can implement a custom events store. See [EventsStore](persistence/events_store.go). eGo comes packaged with two events store: +- [Postgres](plugins/eventstore/postgres/postgres.go): Schema can be found [here](./resources/eventstore_postgres.sql) +- [Memory](plugins/eventstore/memory/memory.go) (for testing purpose only) #### Offsets Store @@ -59,11 +60,41 @@ One can implement a custom offsets store. See [OffsetStore](./offsetstore/iface. - [Postgres](./offsetstore/postgres/postgres.go): Schema can be found [here](./resources/offsetstore_postgres.sql) - [Memory](./offsetstore/memory/memory.go) (for testing purpose only) -#### Cluster +### Durable State Behavior + +The [`DurableStateBehavior`](./behavior.go) represents a type of Actor that persists its full state after processing each command instead of using event sourcing. +This type of Actor keeps its current state in memory during command handling and based upon the command response persists its full state into a durable store. The store can be a SQL or NoSQL database. +The whole concept is given the current state of the actor and a command produce a new state with a higher version as shown in this diagram: (State, Command) => State +[`DurableStateBehavior`](./behavior.go) reacts to commands which result in a new version of the actor state. Only the latest version of the actor state is persisted to the durable store. +There is no concept of history regarding the actor state since this is not an event sourced actor. +However, one can rely on the _version number_ of the actor state and exactly know how the actor state has evolved overtime. +[`DurableStateBehavior`](./behavior.go) version number are numerically incremented by the command handler which means it is imperative that the newer version of the state is greater than the current version by one. +[`DurableStateBehavior`](./behavior.go) will attempt to recover its state whenever available from the durable state. +During a normal shutdown process, it will persist its current state to the durable store prior to shutting down. This behavior help maintain some consistency across the actor state evolution. + +#### State Store + +One can implement a custom state store. See [StateStore](persistence/state_store.go). eGo comes packaged with two state stores: +- [Postgres](plugins/statestore/postgres/postgres.go): Schema can be found [here](./resources/durablestore_postgres.sql) +- [Memory](plugins/statestore/memory/memory.go) (for testing purpose only) + +#### Howto + +To define a durable state entity, one needs to: +1. define the state of the entity using google protocol buffers message +2. define the various commands that will be handled by the entity +3. implements the [`DurableStateBehavior`](./behavior.go) interface. +4. call the `DurableStateEntity` method of eGo [engine](./engine.go) + +#### Events Stream + +[`DurableStateBehavior`](./behavior.go) full state is pushed to an events stream. That enables real-time processing of events without having to interact with the events store + +### Cluster The cluster mode heavily relies on [Go-Akt](https://github.com/Tochemey/goakt#clustering) clustering. -#### Mocks +### Mocks eGo ships in some [mocks](./mocks) @@ -83,77 +114,77 @@ go get github.com/tochemey/ego package main import ( - "context" - "errors" - "log" - "os" - "os/signal" - "syscall" - "time" - - "github.com/google/uuid" - "google.golang.org/protobuf/proto" - - "github.com/tochemey/ego/v3" - "github.com/tochemey/ego/v3/eventstore/memory" - samplepb "github.com/tochemey/ego/v3/example/pbs/sample/pb/v1" + "context" + "errors" + "log" + "os" + "os/signal" + "syscall" + "time" + + "github.com/google/uuid" + "google.golang.org/protobuf/proto" + + "github.com/tochemey/ego/v3" + "github.com/tochemey/ego/v3/plugins/eventstore/memory" + samplepb "github.com/tochemey/ego/v3/example/pbs/sample/pb/v1" ) func main() { - // create the go context - ctx := context.Background() - // create the event store - eventStore := memory.NewEventsStore() - // connect the event store - _ = eventStore.Connect(ctx) - // create the ego engine - engine := ego.NewEngine("Sample", eventStore) - // start ego engine - _ = engine.Start(ctx) - // create a persistence id - entityID := uuid.NewString() - // create an entity behavior with a given id - behavior := NewAccountBehavior(entityID) - // create an entity - _ = engine.Entity(ctx, behavior) - - // send some commands to the pid - var command proto.Message - // create an account - command = &samplepb.CreateAccount{ - AccountId: entityID, - AccountBalance: 500.00, - } - // send the command to the actor. Please don't ignore the error in production grid code - reply, _, _ := engine.SendCommand(ctx, entityID, command, time.Minute) - account := reply.(*samplepb.Account) - log.Printf("current balance: %v", account.GetAccountBalance()) - - // send another command to credit the balance - command = &samplepb.CreditAccount{ - AccountId: entityID, - Balance: 250, - } - - reply, _, _ = engine.SendCommand(ctx, entityID, command, time.Minute) - account = reply.(*samplepb.Account) - log.Printf("current balance: %v", account.GetAccountBalance()) - - // capture ctrl+c - interruptSignal := make(chan os.Signal, 1) - signal.Notify(interruptSignal, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - <-interruptSignal - - // disconnect the event store - _ = eventStore.Disconnect(ctx) - // stop the actor system - _ = engine.Stop(ctx) - os.Exit(0) + // create the go context + ctx := context.Background() + // create the event store + eventStore := memory.NewEventsStore() + // connect the event store + _ = eventStore.Connect(ctx) + // create the ego engine + engine := ego.NewEngine("Sample", eventStore) + // start ego engine + _ = engine.Start(ctx) + // create a persistence id + entityID := uuid.NewString() + // create an entity behavior with a given id + behavior := NewAccountBehavior(entityID) + // create an entity + _ = engine.Entity(ctx, behavior) + + // send some commands to the pid + var command proto.Message + // create an account + command = &samplepb.CreateAccount{ + AccountId: entityID, + AccountBalance: 500.00, + } + // send the command to the actor. Please don't ignore the error in production grid code + reply, _, _ := engine.SendCommand(ctx, entityID, command, time.Minute) + account := reply.(*samplepb.Account) + log.Printf("current balance: %v", account.GetAccountBalance()) + + // send another command to credit the balance + command = &samplepb.CreditAccount{ + AccountId: entityID, + Balance: 250, + } + + reply, _, _ = engine.SendCommand(ctx, entityID, command, time.Minute) + account = reply.(*samplepb.Account) + log.Printf("current balance: %v", account.GetAccountBalance()) + + // capture ctrl+c + interruptSignal := make(chan os.Signal, 1) + signal.Notify(interruptSignal, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + <-interruptSignal + + // disconnect the event store + _ = eventStore.Disconnect(ctx) + // stop the actor system + _ = engine.Stop(ctx) + os.Exit(0) } // AccountBehavior implements EntityBehavior type AccountBehavior struct { - id string + id string } // make sure that AccountBehavior is a true persistence behavior @@ -161,65 +192,65 @@ var _ ego.EntityBehavior = (*AccountBehavior)(nil) // NewAccountBehavior creates an instance of AccountBehavior func NewAccountBehavior(id string) *AccountBehavior { - return &AccountBehavior{id: id} + return &AccountBehavior{id: id} } // ID returns the id func (a *AccountBehavior) ID() string { - return a.id + return a.id } // InitialState returns the initial state func (a *AccountBehavior) InitialState() ego.State { - return ego.State(new(samplepb.Account)) + return ego.State(new(samplepb.Account)) } // HandleCommand handles every command that is sent to the persistent behavior func (a *AccountBehavior) HandleCommand(_ context.Context, command ego.Command, _ ego.State) (events []ego.Event, err error) { - switch cmd := command.(type) { - case *samplepb.CreateAccount: - // TODO in production grid app validate the command using the prior state - return []ego.Event{ - &samplepb.AccountCreated{ - AccountId: cmd.GetAccountId(), - AccountBalance: cmd.GetAccountBalance(), - }, - }, nil - - case *samplepb.CreditAccount: - // TODO in production grid app validate the command using the prior state - return []ego.Event{ - &samplepb.AccountCredited{ - AccountId: cmd.GetAccountId(), - AccountBalance: cmd.GetBalance(), - }, - }, nil - - default: - return nil, errors.New("unhandled command") - } + switch cmd := command.(type) { + case *samplepb.CreateAccount: + // TODO in production grid app validate the command using the prior state + return []ego.Event{ + &samplepb.AccountCreated{ + AccountId: cmd.GetAccountId(), + AccountBalance: cmd.GetAccountBalance(), + }, + }, nil + + case *samplepb.CreditAccount: + // TODO in production grid app validate the command using the prior state + return []ego.Event{ + &samplepb.AccountCredited{ + AccountId: cmd.GetAccountId(), + AccountBalance: cmd.GetBalance(), + }, + }, nil + + default: + return nil, errors.New("unhandled command") + } } // HandleEvent handles every event emitted func (a *AccountBehavior) HandleEvent(_ context.Context, event ego.Event, priorState ego.State) (state ego.State, err error) { - switch evt := event.(type) { - case *samplepb.AccountCreated: - return &samplepb.Account{ - AccountId: evt.GetAccountId(), - AccountBalance: evt.GetAccountBalance(), - }, nil - - case *samplepb.AccountCredited: - account := priorState.(*samplepb.Account) - bal := account.GetAccountBalance() + evt.GetAccountBalance() - return &samplepb.Account{ - AccountId: evt.GetAccountId(), - AccountBalance: bal, - }, nil - - default: - return nil, errors.New("unhandled event") - } + switch evt := event.(type) { + case *samplepb.AccountCreated: + return &samplepb.Account{ + AccountId: evt.GetAccountId(), + AccountBalance: evt.GetAccountBalance(), + }, nil + + case *samplepb.AccountCredited: + account := priorState.(*samplepb.Account) + bal := account.GetAccountBalance() + evt.GetAccountBalance() + return &samplepb.Account{ + AccountId: evt.GetAccountId(), + AccountBalance: bal, + }, nil + + default: + return nil, errors.New("unhandled event") + } } ``` diff --git a/resources/durablestore_postgres.sql b/resources/durablestore_postgres.sql new file mode 100644 index 0000000..95f7a0a --- /dev/null +++ b/resources/durablestore_postgres.sql @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +CREATE TABLE IF NOT EXISTS states_store +( + persistence_id VARCHAR(255) NOT NULL, + version_number BIGINT NOT NULL, + state_payload BYTEA NOT NULL, + state_manifest VARCHAR(255) NOT NULL, + timestamp BIGINT NOT NULL, + shard_number BIGINT NOT NULL, + + PRIMARY KEY (persistence_id, version_number) +);