diff --git a/plugins/statestore/postgres/postgres.go b/plugins/statestore/postgres/postgres.go index c4f2058..1c8a51b 100644 --- a/plugins/statestore/postgres/postgres.go +++ b/plugins/statestore/postgres/postgres.go @@ -135,15 +135,6 @@ func (s *DurableStore) WriteState(ctx context.Context, state *egopb.DurableState return fmt.Errorf("failed to obtain a database transaction: %w", err) } - query, args, err := s.sb.Delete(tableName).Where(sq.Eq{"persistence_id": state.GetPersistenceId()}).ToSql() - if err != nil { - return fmt.Errorf("failed to build the delete sql statement: %w", err) - } - - if _, err := s.db.Exec(ctx, query, args...); err != nil { - return fmt.Errorf("failed to delete durable state from the database: %w", err) - } - bytea, _ := proto.Marshal(state.GetResultingState()) manifest := string(state.GetResultingState().ProtoReflect().Descriptor().FullName()) @@ -157,9 +148,15 @@ func (s *DurableStore) WriteState(ctx context.Context, state *egopb.DurableState manifest, state.GetTimestamp(), state.GetShard(), - ) + ).Suffix("ON CONFLICT (persistence_id) " + + "DO UPDATE SET " + + "version_number = excluded.version_number," + + "state_payload = excluded.state_payload, " + + "state_manifest = excluded.state_manifest," + + "timestamp = excluded.timestamp", + ) - query, args, err = statement.ToSql() + query, args, err := statement.ToSql() if err != nil { return fmt.Errorf("unable to build sql insert statement: %w", err) } diff --git a/resources/durablestore_postgres.sql b/resources/durablestore_postgres.sql index 95f7a0a..83eefa9 100644 --- a/resources/durablestore_postgres.sql +++ b/resources/durablestore_postgres.sql @@ -32,4 +32,6 @@ CREATE TABLE IF NOT EXISTS states_store shard_number BIGINT NOT NULL, PRIMARY KEY (persistence_id, version_number) + CREATE INDEX IF NOT EXISTS idx_states_store_persistence_id ON events_store(persistence_id); + CREATE INDEX IF NOT EXISTS idx_states_store_version_number ON events_store(version_number); );