From 0d98428f726db8313459a7e8204236293816fb93 Mon Sep 17 00:00:00 2001 From: Mohamad Fadhil Date: Tue, 17 Dec 2024 15:29:33 +0800 Subject: [PATCH 1/3] use upsert when inserting to postgres state store --- plugins/statestore/postgres/postgres.go | 19 ++++++++----------- resources/durablestore_postgres.sql | 1 + 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/plugins/statestore/postgres/postgres.go b/plugins/statestore/postgres/postgres.go index ca1b741..f4c7748 100644 --- a/plugins/statestore/postgres/postgres.go +++ b/plugins/statestore/postgres/postgres.go @@ -143,15 +143,6 @@ func (s *DurableStore) WriteState(ctx context.Context, state *egopb.DurableState return fmt.Errorf("failed to obtain a database transaction: %w", err) } - query, args, err := s.sb.Delete(tableName).Where(sq.Eq{"persistence_id": state.GetPersistenceId()}).ToSql() - if err != nil { - return fmt.Errorf("failed to build the delete sql statement: %w", err) - } - - if _, err := s.db.Exec(ctx, query, args...); err != nil { - return fmt.Errorf("failed to delete durable state from the database: %w", err) - } - bytea, _ := proto.Marshal(state.GetResultingState()) manifest := string(state.GetResultingState().ProtoReflect().Descriptor().FullName()) @@ -165,9 +156,15 @@ func (s *DurableStore) WriteState(ctx context.Context, state *egopb.DurableState manifest, state.GetTimestamp(), state.GetShard(), - ) + ).Suffix("ON CONFLICT (persistence_id) " + + "DO UPDATE SET " + + "version_number = excluded.version_number," + + "state_payload = excluded.state_payload, " + + "state_manifest = excluded.state_manifest," + + "timestamp = excluded.timestamp", + ) - query, args, err = statement.ToSql() + query, args, err := statement.ToSql() if err != nil { return fmt.Errorf("unable to build sql insert statement: %w", err) } diff --git a/resources/durablestore_postgres.sql b/resources/durablestore_postgres.sql index 95f7a0a..13bc73c 100644 --- a/resources/durablestore_postgres.sql +++ b/resources/durablestore_postgres.sql @@ -32,4 +32,5 @@ CREATE TABLE IF NOT EXISTS states_store shard_number BIGINT NOT NULL, PRIMARY KEY (persistence_id, version_number) + CONSTRAINT unique_persistence_id UNIQUE (persistence_id) ); From 1b2001af16220b516736137710a7715653cb4fe7 Mon Sep 17 00:00:00 2001 From: Mohamad Fadhil Date: Tue, 17 Dec 2024 15:45:01 +0800 Subject: [PATCH 2/3] Update resources/durablestore_postgres.sql Co-authored-by: Arsene Signed-off-by: Mohamad Fadhil --- resources/durablestore_postgres.sql | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/resources/durablestore_postgres.sql b/resources/durablestore_postgres.sql index 13bc73c..83eefa9 100644 --- a/resources/durablestore_postgres.sql +++ b/resources/durablestore_postgres.sql @@ -32,5 +32,6 @@ CREATE TABLE IF NOT EXISTS states_store shard_number BIGINT NOT NULL, PRIMARY KEY (persistence_id, version_number) - CONSTRAINT unique_persistence_id UNIQUE (persistence_id) + CREATE INDEX IF NOT EXISTS idx_states_store_persistence_id ON events_store(persistence_id); + CREATE INDEX IF NOT EXISTS idx_states_store_version_number ON events_store(version_number); ); From 2df52573480ca6ac716e65b9ccdc7a5784404a86 Mon Sep 17 00:00:00 2001 From: Mohamad Fadhil Date: Tue, 17 Dec 2024 15:48:55 +0800 Subject: [PATCH 3/3] update sql migration script --- resources/durablestore_postgres.sql | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/resources/durablestore_postgres.sql b/resources/durablestore_postgres.sql index 83eefa9..d7da67b 100644 --- a/resources/durablestore_postgres.sql +++ b/resources/durablestore_postgres.sql @@ -32,6 +32,7 @@ CREATE TABLE IF NOT EXISTS states_store shard_number BIGINT NOT NULL, PRIMARY KEY (persistence_id, version_number) - CREATE INDEX IF NOT EXISTS idx_states_store_persistence_id ON events_store(persistence_id); - CREATE INDEX IF NOT EXISTS idx_states_store_version_number ON events_store(version_number); ); + +CREATE INDEX IF NOT EXISTS idx_states_store_persistence_id ON states_store(persistence_id); +CREATE INDEX IF NOT EXISTS idx_states_store_version_number ON states_store(version_number);