From 0d98428f726db8313459a7e8204236293816fb93 Mon Sep 17 00:00:00 2001 From: Mohamad Fadhil Date: Tue, 17 Dec 2024 15:29:33 +0800 Subject: [PATCH] use upsert when inserting to postgres state store --- plugins/statestore/postgres/postgres.go | 19 ++++++++----------- resources/durablestore_postgres.sql | 1 + 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/plugins/statestore/postgres/postgres.go b/plugins/statestore/postgres/postgres.go index ca1b741..f4c7748 100644 --- a/plugins/statestore/postgres/postgres.go +++ b/plugins/statestore/postgres/postgres.go @@ -143,15 +143,6 @@ func (s *DurableStore) WriteState(ctx context.Context, state *egopb.DurableState return fmt.Errorf("failed to obtain a database transaction: %w", err) } - query, args, err := s.sb.Delete(tableName).Where(sq.Eq{"persistence_id": state.GetPersistenceId()}).ToSql() - if err != nil { - return fmt.Errorf("failed to build the delete sql statement: %w", err) - } - - if _, err := s.db.Exec(ctx, query, args...); err != nil { - return fmt.Errorf("failed to delete durable state from the database: %w", err) - } - bytea, _ := proto.Marshal(state.GetResultingState()) manifest := string(state.GetResultingState().ProtoReflect().Descriptor().FullName()) @@ -165,9 +156,15 @@ func (s *DurableStore) WriteState(ctx context.Context, state *egopb.DurableState manifest, state.GetTimestamp(), state.GetShard(), - ) + ).Suffix("ON CONFLICT (persistence_id) " + + "DO UPDATE SET " + + "version_number = excluded.version_number," + + "state_payload = excluded.state_payload, " + + "state_manifest = excluded.state_manifest," + + "timestamp = excluded.timestamp", + ) - query, args, err = statement.ToSql() + query, args, err := statement.ToSql() if err != nil { return fmt.Errorf("unable to build sql insert statement: %w", err) } diff --git a/resources/durablestore_postgres.sql b/resources/durablestore_postgres.sql index 95f7a0a..13bc73c 100644 --- a/resources/durablestore_postgres.sql +++ b/resources/durablestore_postgres.sql @@ -32,4 +32,5 @@ CREATE TABLE IF NOT EXISTS states_store shard_number BIGINT NOT NULL, PRIMARY KEY (persistence_id, version_number) + CONSTRAINT unique_persistence_id UNIQUE (persistence_id) );