From 75743355186de9fdcb3039b5f2a1a781bb76906f Mon Sep 17 00:00:00 2001 From: Mohamad Fadhil Date: Tue, 17 Dec 2024 15:46:17 +0800 Subject: [PATCH 1/2] remove insertbatch in statestore plugin (#103) --- plugins/statestore/postgres/postgres.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/plugins/statestore/postgres/postgres.go b/plugins/statestore/postgres/postgres.go index ca1b741..c4f2058 100644 --- a/plugins/statestore/postgres/postgres.go +++ b/plugins/statestore/postgres/postgres.go @@ -57,13 +57,6 @@ var ( type DurableStore struct { db postgres.Postgres sb sq.StatementBuilderType - // insertBatchSize represents the chunk of data to bulk insert. - // This helps avoid the postgres 65535 parameter limit. - // This is necessary because Postgres uses a 32-bit int for binding input parameters and - // is not able to track anything larger. - // Note: Change this value when you know the size of data to bulk insert at once. Otherwise, you - // might encounter the postgres 65535 parameter limit error. - insertBatchSize int // hold the connection state to avoid multiple connection of the same instance connected *atomic.Bool } @@ -78,7 +71,6 @@ func NewStateStore(config *Config) *DurableStore { return &DurableStore{ db: db, sb: sq.StatementBuilder.PlaceholderFormat(sq.Dollar), - insertBatchSize: 500, connected: atomic.NewBool(false), } } From c7cf58d4550aa273df76249441220dc0876122ff Mon Sep 17 00:00:00 2001 From: Mohamad Fadhil Date: Tue, 17 Dec 2024 15:48:39 +0800 Subject: [PATCH 2/2] refactor: use upsert statement when inserting to postgres state store (#105) Signed-off-by: Mohamad Fadhil --- plugins/statestore/postgres/postgres.go | 19 ++++++++----------- resources/durablestore_postgres.sql | 2 ++ 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/plugins/statestore/postgres/postgres.go b/plugins/statestore/postgres/postgres.go index c4f2058..1c8a51b 100644 --- a/plugins/statestore/postgres/postgres.go +++ b/plugins/statestore/postgres/postgres.go @@ -135,15 +135,6 @@ func (s *DurableStore) WriteState(ctx context.Context, state *egopb.DurableState return fmt.Errorf("failed to obtain a database transaction: %w", err) } - query, args, err := s.sb.Delete(tableName).Where(sq.Eq{"persistence_id": state.GetPersistenceId()}).ToSql() - if err != nil { - return fmt.Errorf("failed to build the delete sql statement: %w", err) - } - - if _, err := s.db.Exec(ctx, query, args...); err != nil { - return fmt.Errorf("failed to delete durable state from the database: %w", err) - } - bytea, _ := proto.Marshal(state.GetResultingState()) manifest := string(state.GetResultingState().ProtoReflect().Descriptor().FullName()) @@ -157,9 +148,15 @@ func (s *DurableStore) WriteState(ctx context.Context, state *egopb.DurableState manifest, state.GetTimestamp(), state.GetShard(), - ) + ).Suffix("ON CONFLICT (persistence_id) " + + "DO UPDATE SET " + + "version_number = excluded.version_number," + + "state_payload = excluded.state_payload, " + + "state_manifest = excluded.state_manifest," + + "timestamp = excluded.timestamp", + ) - query, args, err = statement.ToSql() + query, args, err := statement.ToSql() if err != nil { return fmt.Errorf("unable to build sql insert statement: %w", err) } diff --git a/resources/durablestore_postgres.sql b/resources/durablestore_postgres.sql index 95f7a0a..83eefa9 100644 --- a/resources/durablestore_postgres.sql +++ b/resources/durablestore_postgres.sql @@ -32,4 +32,6 @@ CREATE TABLE IF NOT EXISTS states_store shard_number BIGINT NOT NULL, PRIMARY KEY (persistence_id, version_number) + CREATE INDEX IF NOT EXISTS idx_states_store_persistence_id ON events_store(persistence_id); + CREATE INDEX IF NOT EXISTS idx_states_store_version_number ON events_store(version_number); );