diff --git a/.github/workflows/pr-validation.yml b/.github/workflows/pr-validation.yml index 91286f5..c927193 100644 --- a/.github/workflows/pr-validation.yml +++ b/.github/workflows/pr-validation.yml @@ -27,6 +27,26 @@ jobs: strategy: matrix: go-version: [ '1.22.x', '1.21.x', '1.20.x' ] + + # Boot up a local, clean postgres instance for the postgres tests + services: + # Label used to access the service container + postgres: + # Docker Hub image + image: postgres:13.18 + env: + POSTGRES_PASSWORD: postgres + POSTGRES_USER: postgres + POSTGRES_DB: postgres + # Set health checks to wait until postgres has started + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + # Maps tcp port 5432 on service container to the host + - 5432:5432 steps: # Checks-out your repository under $GITHUB_WORKSPACE @@ -57,4 +77,6 @@ jobs: run: protoc --go_out=. --go-grpc_out=. -I ./submodules/durabletask-protobuf/protos orchestrator_service.proto - name: Run integration tests + env: + POSTGRES_ENABLED: "true" run: go test ./tests/... -coverpkg ./api,./task,./client,./backend/...,./internal/helpers diff --git a/.gitignore b/.gitignore index fd3480c..e3219ac 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,6 @@ # sqlite databases generated by testing *.sqlite3 -__debug_bin \ No newline at end of file +__debug_bin + +.idea/ \ No newline at end of file diff --git a/backend/postgres/REAEDME.md b/backend/postgres/REAEDME.md new file mode 100644 index 0000000..7028284 --- /dev/null +++ b/backend/postgres/REAEDME.md @@ -0,0 +1,3 @@ +# Postgres Backend +### Testing +By default, the postgres tests are skipped. To run the tests, set the environment variable `POSTGRES_ENABLED` to `true` before running the tests and have a postgres server running on `localhost:5432` with a database named `postgres` and a user `postgres` with password `postgres`. diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go new file mode 100644 index 0000000..cf119de --- /dev/null +++ b/backend/postgres/postgres.go @@ -0,0 +1,1004 @@ +package postgres + +import ( + "context" + _ "embed" + "errors" + "fmt" + "os" + "strings" + "time" + + "github.com/google/uuid" + "github.com/microsoft/durabletask-go/api" + "github.com/microsoft/durabletask-go/backend" + "github.com/microsoft/durabletask-go/internal/helpers" + "github.com/microsoft/durabletask-go/internal/protos" + "google.golang.org/protobuf/proto" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" +) + +//go:embed schema.sql +var schema string + +var emptyString string = "" + +type PostgresOptions struct { + PgOptions *pgxpool.Config + OrchestrationLockTimeout time.Duration + ActivityLockTimeout time.Duration +} + +type postgresBackend struct { + db *pgxpool.Pool + workerName string + logger backend.Logger + options *PostgresOptions +} + +// NewPostgresOptions creates a new options object for the postgres backend provider. +func NewPostgresOptions(host string, port uint16, database string, user string, password string) *PostgresOptions { + conf, err := pgxpool.ParseConfig(fmt.Sprintf("postgresql://%s:%s@%s:%d/%s", user, password, host, port, database)) + if err != nil { + panic(fmt.Errorf("failed to parse the postgres connection string: %w", err)) + } + conf.ConnConfig.Config.ConnectTimeout = 2 * time.Minute + conf.MaxConnLifetime = 2 * time.Minute + conf.MaxConnIdleTime = 2 * time.Minute + conf.MaxConns = 1 + + return &PostgresOptions{ + PgOptions: conf, + OrchestrationLockTimeout: 2 * time.Minute, + ActivityLockTimeout: 2 * time.Minute, + } +} + +// NewPostgresBackend creates a new postgres-based Backend object. +func NewPostgresBackend(opts *PostgresOptions, logger backend.Logger) backend.Backend { + hostname, err := os.Hostname() + if err != nil { + hostname = "unknown" + } + + pid := os.Getpid() + uuidStr := uuid.NewString() + + if opts == nil { + opts = NewPostgresOptions("localhost", 5432, "postgres", "postgres", "postgres") + } + + return &postgresBackend{ + db: nil, + workerName: fmt.Sprintf("%s,%d,%s", hostname, pid, uuidStr), + options: opts, + logger: logger, + } +} + +// CreateTaskHub creates the postgres database and applies the schema +func (be *postgresBackend) CreateTaskHub(context.Context) error { + pool, err := pgxpool.NewWithConfig(context.Background(), be.options.PgOptions) + if err != nil { + be.logger.Error("CreateTaskHub", "failed to create a new postgres pool", err) + return err + } + be.db = pool + // Initialize database + if _, err := be.db.Exec(context.Background(), schema); err != nil { + panic(fmt.Errorf("failed to initialize the database: %w", err)) + } + + return nil +} + +func (be *postgresBackend) DeleteTaskHub(ctx context.Context) error { + if be.db == nil { + return nil + } + + _, err := be.db.Exec(ctx, "DROP TABLE IF EXISTS Instances CASCADE") + if err != nil { + be.logger.Error("DeleteTaskHub", "failed to drop Instances table", err) + return fmt.Errorf("failed to drop Instances table: %w", err) + } + _, err = be.db.Exec(ctx, "DROP TABLE IF EXISTS History CASCADE") + if err != nil { + be.logger.Error("DeleteTaskHub", "failed to drop History table", err) + return fmt.Errorf("failed to drop History table: %w", err) + } + _, err = be.db.Exec(ctx, "DROP TABLE IF EXISTS NewEvents CASCADE") + if err != nil { + be.logger.Error("DeleteTaskHub", "failed to drop NewEvents table", err) + return fmt.Errorf("failed to drop NewEvents table: %w", err) + } + _, err = be.db.Exec(ctx, "DROP TABLE IF EXISTS NewTasks CASCADE") + if err != nil { + be.logger.Error("DeleteTaskHub", "failed to drop NewTasks table", err) + return fmt.Errorf("failed to drop NewTasks table: %w", err) + } + + be.db.Close() + be.db = nil + + return nil +} + +// AbandonOrchestrationWorkItem implements backend.Backend +func (be *postgresBackend) AbandonOrchestrationWorkItem(ctx context.Context, wi *backend.OrchestrationWorkItem) error { + if err := be.ensureDB(); err != nil { + return err + } + + tx, err := be.db.BeginTx(ctx, pgx.TxOptions{}) + if err != nil { + return err + } + defer tx.Rollback(ctx) + + var visibleTime *time.Time = nil + if delay := wi.GetAbandonDelay(); delay > 0 { + t := time.Now().UTC().Add(delay) + visibleTime = &t + } + + dbResult, err := tx.Exec( + ctx, + "UPDATE NewEvents SET LockedBy = NULL, VisibleTime = $1 WHERE InstanceID = $2 AND LockedBy = $3", + visibleTime, + string(wi.InstanceID), + wi.LockedBy, + ) + if err != nil { + return fmt.Errorf("failed to update NewEvents table: %w", err) + } + + rowsAffected := dbResult.RowsAffected() + if err != nil { + return fmt.Errorf("failed get rows affected by UPDATE NewEvents statement: %w", err) + } else if rowsAffected == 0 { + return backend.ErrWorkItemLockLost + } + + dbResult, err = tx.Exec( + ctx, + "UPDATE Instances SET LockedBy = NULL, LockExpiration = NULL WHERE InstanceID = $1 AND LockedBy = $2", + string(wi.InstanceID), + wi.LockedBy, + ) + + if err != nil { + return fmt.Errorf("failed to update Instances table: %w", err) + } + + rowsAffected = dbResult.RowsAffected() + if err != nil { + return fmt.Errorf("failed get rows affected by UPDATE Instances statement: %w", err) + } else if rowsAffected == 0 { + return backend.ErrWorkItemLockLost + } + + if err = tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + return nil +} + +// CompleteOrchestrationWorkItem implements backend.Backend +func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi *backend.OrchestrationWorkItem) error { + if err := be.ensureDB(); err != nil { + return err + } + + tx, err := be.db.BeginTx(ctx, pgx.TxOptions{}) + if err != nil { + return err + } + defer tx.Rollback(ctx) + + now := time.Now().UTC() + + // Dynamically generate the UPDATE statement for the Instances table + var sqlSB strings.Builder + sqlSB.WriteString("UPDATE Instances SET ") + + sqlUpdateArgs := make([]interface{}, 0, 10) + isCreated := false + isCompleted := false + + currIndex := 1 + for _, e := range wi.State.NewEvents() { + if es := e.GetExecutionStarted(); es != nil { + if isCreated { + // TODO: Log warning about duplicate start event + continue + } + isCreated = true + sqlSB.WriteString(fmt.Sprintf("CreatedTime = $%d, Input = $%d, ", currIndex, currIndex+1)) + currIndex += 2 + sqlUpdateArgs = append(sqlUpdateArgs, e.Timestamp.AsTime()) + sqlUpdateArgs = append(sqlUpdateArgs, es.Input.GetValue()) + } else if ec := e.GetExecutionCompleted(); ec != nil { + if isCompleted { + // TODO: Log warning about duplicate completion event + continue + } + isCompleted = true + sqlSB.WriteString(fmt.Sprintf("CompletedTime = $%d, Output = $%d, FailureDetails = $%d, ", currIndex, currIndex+1, currIndex+2)) + currIndex += 3 + sqlUpdateArgs = append(sqlUpdateArgs, now) + sqlUpdateArgs = append(sqlUpdateArgs, ec.Result.GetValue()) + if ec.FailureDetails != nil { + bytes, err := proto.Marshal(ec.FailureDetails) + if err != nil { + return fmt.Errorf("failed to marshal FailureDetails: %w", err) + } + sqlUpdateArgs = append(sqlUpdateArgs, &bytes) + } else { + sqlUpdateArgs = append(sqlUpdateArgs, nil) + } + } + // TODO: Execution suspended & resumed + } + + if wi.State.CustomStatus != nil { + sqlSB.WriteString(fmt.Sprintf("CustomStatus = $%d, ", currIndex)) + currIndex++ + sqlUpdateArgs = append(sqlUpdateArgs, wi.State.CustomStatus.Value) + } + + // TODO: Support for stickiness, which would extend the LockExpiration + sqlSB.WriteString(fmt.Sprintf("RuntimeStatus = $%d, LastUpdatedTime = $%d, LockExpiration = NULL WHERE InstanceID = $%d AND LockedBy = $%d", currIndex, currIndex+1, currIndex+2, currIndex+3)) + currIndex += 4 + sqlUpdateArgs = append(sqlUpdateArgs, helpers.ToRuntimeStatusString(wi.State.RuntimeStatus()), now, string(wi.InstanceID), wi.LockedBy) + + result, err := tx.Exec(ctx, sqlSB.String(), sqlUpdateArgs...) + if err != nil { + return fmt.Errorf("failed to update Instances table: %w", err) + } + + count := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get the number of rows affected by the Instance table update: %w", err) + } else if count == 0 { + return fmt.Errorf("instance '%s' no longer exists or was locked by a different worker", string(wi.InstanceID)) + } + + // If continue-as-new, delete all existing history + if wi.State.ContinuedAsNew() { + if _, err := tx.Exec(ctx, "DELETE FROM History WHERE InstanceID = $1", string(wi.InstanceID)); err != nil { + return fmt.Errorf("failed to delete from History table: %w", err) + } + } + + // Save new history events + newHistoryCount := len(wi.State.NewEvents()) + if newHistoryCount > 0 { + builder := strings.Builder{} + builder.WriteString("INSERT INTO History (InstanceID, SequenceNumber, EventPayload) VALUES ") + for i := 0; i < newHistoryCount; i++ { + builder.WriteString(fmt.Sprintf("($%d, $%d, $%d)", 3*i+1, 3*i+2, 3*i+3)) + if i < newHistoryCount-1 { + builder.WriteString(", ") + } + } + query := builder.String() + + args := make([]interface{}, 0, newHistoryCount*3) + nextSequenceNumber := len(wi.State.OldEvents()) + for _, e := range wi.State.NewEvents() { + eventPayload, err := backend.MarshalHistoryEvent(e) + if err != nil { + return err + } + + args = append(args, string(wi.InstanceID), nextSequenceNumber, eventPayload) + nextSequenceNumber++ + } + + _, err = tx.Exec(ctx, query, args...) + if err != nil { + return fmt.Errorf("failed to insert into the History table: %w", err) + } + } + + // Save outbound activity tasks + newActivityCount := len(wi.State.PendingTasks()) + if newActivityCount > 0 { + builder := strings.Builder{} + builder.WriteString("INSERT INTO NewTasks (InstanceID, EventPayload) VALUES ") + for i := 0; i < newActivityCount; i++ { + builder.WriteString(fmt.Sprintf("($%d, $%d)", 3*i+1, 3*i+2)) + if i < newActivityCount-1 { + builder.WriteString(", ") + } + } + insertSql := builder.String() + + sqlInsertArgs := make([]interface{}, 0, newActivityCount*2) + for _, e := range wi.State.PendingTasks() { + eventPayload, err := backend.MarshalHistoryEvent(e) + if err != nil { + return err + } + + sqlInsertArgs = append(sqlInsertArgs, string(wi.InstanceID), eventPayload) + } + + _, err = tx.Exec(ctx, insertSql, sqlInsertArgs...) + if err != nil { + return fmt.Errorf("failed to insert into the NewTasks table: %w", err) + } + } + + // Save outbound orchestrator events + newEventCount := len(wi.State.PendingTimers()) + len(wi.State.PendingMessages()) + if newEventCount > 0 { + builder := strings.Builder{} + builder.WriteString("INSERT INTO NewEvents (InstanceID, EventPayload, VisibleTime) VALUES ") + for i := 0; i < newEventCount; i++ { + builder.WriteString(fmt.Sprintf("($%d, $%d, $%d)", 3*i+1, 3*i+2, 3*i+3)) + if i < newEventCount-1 { + builder.WriteString(", ") + } + } + insertSql := builder.String() + + sqlInsertArgs := make([]interface{}, 0, newEventCount*3) + for _, e := range wi.State.PendingTimers() { + eventPayload, err := backend.MarshalHistoryEvent(e) + if err != nil { + return err + } + + visibileTime := e.GetTimerFired().GetFireAt().AsTime() + sqlInsertArgs = append(sqlInsertArgs, string(wi.InstanceID), eventPayload, visibileTime) + } + + for _, msg := range wi.State.PendingMessages() { + if es := msg.HistoryEvent.GetExecutionStarted(); es != nil { + // Need to insert a new row into the DB + if _, err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx); err != nil { + if errors.Is(err, backend.ErrDuplicateEvent) { + be.logger.Warnf( + "%v: dropping sub-orchestration creation event because an instance with the target ID (%v) already exists.", + wi.InstanceID, + es.OrchestrationInstance.InstanceId) + } else { + return err + } + } + } + + eventPayload, err := backend.MarshalHistoryEvent(msg.HistoryEvent) + if err != nil { + return err + } + + sqlInsertArgs = append(sqlInsertArgs, msg.TargetInstanceID, eventPayload, nil) + } + + _, err = tx.Exec(ctx, insertSql, sqlInsertArgs...) + if err != nil { + return fmt.Errorf("failed to insert into the NewEvents table: %w", err) + } + } + + // Delete inbound events + dbResult, err := tx.Exec( + ctx, + "DELETE FROM NewEvents WHERE InstanceID = $1 AND LockedBy = $2", + string(wi.InstanceID), + wi.LockedBy, + ) + if err != nil { + return fmt.Errorf("failed to delete from NewEvents table: %w", err) + } + + rowsAffected := dbResult.RowsAffected() + if err != nil { + return fmt.Errorf("failed get rows affected by delete statement: %w", err) + } else if rowsAffected == 0 { + return backend.ErrWorkItemLockLost + } + + if err != nil { + return fmt.Errorf("failed to delete from the NewEvents table: %w", err) + } + + if err = tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + return nil +} + +// CreateOrchestrationInstance implements backend.Backend +func (be *postgresBackend) CreateOrchestrationInstance(ctx context.Context, e *backend.HistoryEvent, opts ...backend.OrchestrationIdReusePolicyOptions) error { + if err := be.ensureDB(); err != nil { + return err + } + + tx, err := be.db.BeginTx(ctx, pgx.TxOptions{}) + if err != nil { + return fmt.Errorf("failed to start transaction: %w", err) + } + defer tx.Rollback(ctx) + + var instanceID string + if instanceID, err = be.createOrchestrationInstanceInternal(ctx, e, tx, opts...); errors.Is(err, api.ErrIgnoreInstance) { + // choose to ignore, do nothing + return nil + } else if err != nil { + return err + } + + eventPayload, err := backend.MarshalHistoryEvent(e) + if err != nil { + return err + } + + _, err = tx.Exec( + ctx, + `INSERT INTO NewEvents (InstanceID, EventPayload) VALUES ($1, $2)`, + instanceID, + eventPayload, + ) + + if err != nil { + return fmt.Errorf("failed to insert row into NewEvents table: %w", err) + } + + if err = tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to create orchestration: %w", err) + } + + return nil +} + +func (be *postgresBackend) createOrchestrationInstanceInternal(ctx context.Context, e *backend.HistoryEvent, tx pgx.Tx, opts ...backend.OrchestrationIdReusePolicyOptions) (string, error) { + if e == nil { + return "", errors.New("HistoryEvent must be non-nil") + } else if e.Timestamp == nil { + return "", errors.New("HistoryEvent must have a non-nil timestamp") + } + + startEvent := e.GetExecutionStarted() + if startEvent == nil { + return "", errors.New("HistoryEvent must be an ExecutionStartedEvent") + } + instanceID := startEvent.OrchestrationInstance.InstanceId + + policy := &protos.OrchestrationIdReusePolicy{} + + for _, opt := range opts { + opt(policy) + } + + rows, err := insertOrIgnoreInstanceTableInternal(ctx, tx, e, startEvent) + if err != nil { + return "", err + } + + // instance with same ID already exists + if rows <= 0 { + return instanceID, be.handleInstanceExists(ctx, tx, startEvent, policy, e) + } + return instanceID, nil +} + +func insertOrIgnoreInstanceTableInternal(ctx context.Context, tx pgx.Tx, e *backend.HistoryEvent, startEvent *protos.ExecutionStartedEvent) (int64, error) { + res, err := tx.Exec( + ctx, + `INSERT INTO Instances ( + Name, + Version, + InstanceID, + ExecutionID, + Input, + RuntimeStatus, + CreatedTime + ) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT DO NOTHING`, + startEvent.Name, + startEvent.Version.GetValue(), + startEvent.OrchestrationInstance.InstanceId, + startEvent.OrchestrationInstance.ExecutionId.GetValue(), + startEvent.Input.GetValue(), + "PENDING", + e.Timestamp.AsTime(), + ) + if err != nil { + return -1, fmt.Errorf("failed to insert into Instances table: %w", err) + } + + rows := res.RowsAffected() + if err != nil { + return -1, fmt.Errorf("failed to count the rows affected: %w", err) + } + return rows, nil +} + +func (be *postgresBackend) handleInstanceExists(ctx context.Context, tx pgx.Tx, startEvent *protos.ExecutionStartedEvent, policy *protos.OrchestrationIdReusePolicy, e *backend.HistoryEvent) error { + // query RuntimeStatus for the existing instance + queryRow := tx.QueryRow( + ctx, + `SELECT RuntimeStatus FROM Instances WHERE InstanceID = $1`, + startEvent.OrchestrationInstance.InstanceId, + ) + var runtimeStatus *string + err := queryRow.Scan(&runtimeStatus) + if errors.Is(err, pgx.ErrNoRows) { + return api.ErrInstanceNotFound + } else if err != nil { + return fmt.Errorf("failed to scan the Instances table result: %w", err) + } + + // status not match, return instance duplicate error + if !isStatusMatch(policy.OperationStatus, helpers.FromRuntimeStatusString(*runtimeStatus)) { + return api.ErrDuplicateInstance + } + + // status match + switch policy.Action { + case protos.CreateOrchestrationAction_IGNORE: + // Log an warning message and ignore creating new instance + be.logger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", startEvent.OrchestrationInstance.InstanceId) + return api.ErrIgnoreInstance + case protos.CreateOrchestrationAction_TERMINATE: + // terminate existing instance + if err := be.cleanupOrchestrationStateInternal(ctx, tx, api.InstanceID(startEvent.OrchestrationInstance.InstanceId), false); err != nil { + return fmt.Errorf("failed to cleanup orchestration status: %w", err) + } + // create a new instance + var rows int64 + if rows, err = insertOrIgnoreInstanceTableInternal(ctx, tx, e, startEvent); err != nil { + return err + } + + // should never happen, because we clean up instance before create new one + if rows <= 0 { + return fmt.Errorf("failed to insert into Instances table because entry already exists") + } + return nil + } + // default behavior + return api.ErrDuplicateInstance +} + +func isStatusMatch(statuses []protos.OrchestrationStatus, runtimeStatus protos.OrchestrationStatus) bool { + for _, status := range statuses { + if status == runtimeStatus { + return true + } + } + return false +} + +func (be *postgresBackend) cleanupOrchestrationStateInternal(ctx context.Context, tx pgx.Tx, id api.InstanceID, requireCompleted bool) error { + row := tx.QueryRow(ctx, "SELECT 1 FROM Instances WHERE InstanceID = $1", string(id)) + var unused int + if err := row.Scan(&unused); errors.Is(err, pgx.ErrNoRows) { + return api.ErrInstanceNotFound + } else if err != nil { + return fmt.Errorf("failed to scan instance existence: %w", err) + } + + if requireCompleted { + // purge orchestration in ['COMPLETED', 'FAILED', 'TERMINATED'] + dbResult, err := tx.Exec(ctx, "DELETE FROM Instances WHERE InstanceID = $1 AND RuntimeStatus IN ('COMPLETED', 'FAILED', 'TERMINATED')", string(id)) + if err != nil { + return fmt.Errorf("failed to delete from the Instances table: %w", err) + } + + rowsAffected := dbResult.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected in Instances delete operation: %w", err) + } + if rowsAffected == 0 { + return api.ErrNotCompleted + } + } else { + // clean up orchestration in all RuntimeStatus + _, err := tx.Exec(ctx, "DELETE FROM Instances WHERE InstanceID = $1", string(id)) + if err != nil { + return fmt.Errorf("failed to delete from the Instances table: %w", err) + } + } + + _, err := tx.Exec(ctx, "DELETE FROM History WHERE InstanceID = $1", string(id)) + if err != nil { + return fmt.Errorf("failed to delete from History table: %w", err) + } + + _, err = tx.Exec(ctx, "DELETE FROM NewEvents WHERE InstanceID = $1", string(id)) + if err != nil { + return fmt.Errorf("failed to delete from NewEvents table: %w", err) + } + + _, err = tx.Exec(ctx, "DELETE FROM NewTasks WHERE InstanceID = $1", string(id)) + if err != nil { + return fmt.Errorf("failed to delete from NewTasks table: %w", err) + } + return nil +} + +func (be *postgresBackend) AddNewOrchestrationEvent(ctx context.Context, iid api.InstanceID, e *backend.HistoryEvent) error { + if e == nil { + return errors.New("HistoryEvent must be non-nil") + } else if e.Timestamp == nil { + return errors.New("HistoryEvent must have a non-nil timestamp") + } + + eventPayload, err := backend.MarshalHistoryEvent(e) + if err != nil { + return err + } + + _, err = be.db.Exec( + ctx, + `INSERT INTO NewEvents (InstanceID, EventPayload) VALUES ($1, $2)`, + string(iid), + eventPayload, + ) + + if err != nil { + return fmt.Errorf("failed to insert row into NewEvents table: %w", err) + } + + return nil +} + +// GetOrchestrationMetadata implements backend.Backend +func (be *postgresBackend) GetOrchestrationMetadata(ctx context.Context, iid api.InstanceID) (*api.OrchestrationMetadata, error) { + if err := be.ensureDB(); err != nil { + return nil, err + } + + row := be.db.QueryRow( + ctx, + `SELECT InstanceID, Name, RuntimeStatus, CreatedTime, LastUpdatedTime, Input, Output, CustomStatus, FailureDetails + FROM Instances WHERE InstanceID = $1`, + string(iid), + ) + + var instanceID *string + var name *string + var runtimeStatus *string + var createdAt *time.Time + var lastUpdatedAt *time.Time + var input *string + var output *string + var customStatus *string + var failureDetails *protos.TaskFailureDetails + + var failureDetailsPayload []byte + err := row.Scan(&instanceID, &name, &runtimeStatus, &createdAt, &lastUpdatedAt, &input, &output, &customStatus, &failureDetailsPayload) + if errors.Is(err, pgx.ErrNoRows) { + return nil, api.ErrInstanceNotFound + } else if err != nil { + return nil, fmt.Errorf("failed to scan the Instances table result: %w", err) + } + + if input == nil { + input = &emptyString + } + + if output == nil { + output = &emptyString + } + + if customStatus == nil { + customStatus = &emptyString + } + + if len(failureDetailsPayload) > 0 { + failureDetails = new(protos.TaskFailureDetails) + if err := proto.Unmarshal(failureDetailsPayload, failureDetails); err != nil { + return nil, fmt.Errorf("failed to unmarshal failure details: %w", err) + } + } + + metadata := api.NewOrchestrationMetadata( + iid, + *name, + helpers.FromRuntimeStatusString(*runtimeStatus), + *createdAt, + *lastUpdatedAt, + *input, + *output, + *customStatus, + failureDetails, + ) + return metadata, nil +} + +// GetOrchestrationRuntimeState implements backend.Backend +func (be *postgresBackend) GetOrchestrationRuntimeState(ctx context.Context, wi *backend.OrchestrationWorkItem) (*backend.OrchestrationRuntimeState, error) { + if err := be.ensureDB(); err != nil { + return nil, err + } + + rows, err := be.db.Query( + ctx, + "SELECT EventPayload FROM History WHERE InstanceID = $1 ORDER BY SequenceNumber ASC", + string(wi.InstanceID), + ) + if err != nil { + return nil, err + } + defer rows.Close() + + existingEvents := make([]*protos.HistoryEvent, 0, 50) + for rows.Next() { + var eventPayload []byte + if err := rows.Scan(&eventPayload); err != nil { + return nil, fmt.Errorf("failed to read history event: %w", err) + } + + e, err := backend.UnmarshalHistoryEvent(eventPayload) + if err != nil { + return nil, err + } + + existingEvents = append(existingEvents, e) + } + + state := backend.NewOrchestrationRuntimeState(wi.InstanceID, existingEvents) + return state, nil +} + +// GetOrchestrationWorkItem implements backend.Backend +func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backend.OrchestrationWorkItem, error) { + if err := be.ensureDB(); err != nil { + return nil, err + } + + tx, err := be.db.BeginTx(ctx, pgx.TxOptions{}) + if err != nil { + return nil, err + } + defer tx.Rollback(ctx) + + now := time.Now().UTC() + newLockExpiration := now.Add(be.options.OrchestrationLockTimeout) + + // Place a lock on an orchestration instance that has new events that are ready to be executed. + row := tx.QueryRow( + ctx, + `UPDATE Instances SET LockedBy = $1, LockExpiration = $2 + WHERE SequenceNumber = ( + SELECT SequenceNumber FROM Instances I + WHERE (I.LockExpiration IS NULL OR I.LockExpiration < $3) AND EXISTS ( + SELECT 1 FROM NewEvents E + WHERE E.InstanceID = I.InstanceID AND (E.VisibleTime IS NULL OR E.VisibleTime < $4) + ) + LIMIT 1 + ) RETURNING InstanceID`, + be.workerName, // LockedBy for Instances table + newLockExpiration, // Updated LockExpiration for Instances table + now, // LockExpiration for Instances table + now, // VisibleTime for NewEvents table + ) + + var instanceID string + if err := row.Scan(&instanceID); err != nil { + if errors.Is(err, pgx.ErrNoRows) { + // No new events to process + return nil, backend.ErrNoWorkItems + } + + return nil, fmt.Errorf("failed to scan the orchestration work-item: %w", err) + } + + // TODO: Get all the unprocessed events associated with the locked instance + events, err := tx.Query( + ctx, + `UPDATE NewEvents SET DequeueCount = DequeueCount + 1, LockedBy = $1 WHERE SequenceNumber IN ( + SELECT SequenceNumber FROM NewEvents + WHERE InstanceID = $2 AND (VisibleTime IS NULL OR VisibleTime <= $3) + LIMIT 1000 + ) + RETURNING EventPayload, DequeueCount`, + be.workerName, + instanceID, + now, + ) + if err != nil { + return nil, fmt.Errorf("failed to query for orchestration work-items: %w", err) + } + defer events.Close() + + maxDequeueCount := int32(0) + + newEvents := make([]*protos.HistoryEvent, 0, 10) + for events.Next() { + var eventPayload []byte + var dequeueCount int32 + if err := events.Scan(&eventPayload, &dequeueCount); err != nil { + return nil, fmt.Errorf("failed to read history event: %w", err) + } + + if dequeueCount > maxDequeueCount { + maxDequeueCount = dequeueCount + } + + e, err := backend.UnmarshalHistoryEvent(eventPayload) + if err != nil { + return nil, err + } + + newEvents = append(newEvents, e) + } + + if err = tx.Commit(ctx); err != nil { + return nil, fmt.Errorf("failed to update orchestration work-item: %w", err) + } + + wi := &backend.OrchestrationWorkItem{ + InstanceID: api.InstanceID(instanceID), + NewEvents: newEvents, + LockedBy: be.workerName, + RetryCount: maxDequeueCount - 1, + } + + return wi, nil +} + +func (be *postgresBackend) GetActivityWorkItem(ctx context.Context) (*backend.ActivityWorkItem, error) { + if err := be.ensureDB(); err != nil { + return nil, err + } + + now := time.Now().UTC() + newLockExpiration := now.Add(be.options.OrchestrationLockTimeout) + + row := be.db.QueryRow( + ctx, + `UPDATE NewTasks SET LockedBy = $1, LockExpiration = $2, DequeueCount = DequeueCount + 1 + WHERE SequenceNumber = ( + SELECT SequenceNumber FROM NewTasks T + WHERE T.LockExpiration IS NULL OR T.LockExpiration < $3 + LIMIT 1 + ) RETURNING SequenceNumber, InstanceID, EventPayload`, + be.workerName, + newLockExpiration, + now, + ) + + var sequenceNumber int64 + var instanceID string + var eventPayload []byte + + if err := row.Scan(&sequenceNumber, &instanceID, &eventPayload); err != nil { + if errors.Is(err, pgx.ErrNoRows) { + // No new activity tasks to process + return nil, backend.ErrNoWorkItems + } + + return nil, fmt.Errorf("failed to scan the activity work-item: %w", err) + } + + e, err := backend.UnmarshalHistoryEvent(eventPayload) + if err != nil { + return nil, err + } + + wi := &backend.ActivityWorkItem{ + SequenceNumber: sequenceNumber, + InstanceID: api.InstanceID(instanceID), + NewEvent: e, + LockedBy: be.workerName, + } + return wi, nil +} + +func (be *postgresBackend) CompleteActivityWorkItem(ctx context.Context, wi *backend.ActivityWorkItem) error { + if err := be.ensureDB(); err != nil { + return err + } + + tx, err := be.db.BeginTx(ctx, pgx.TxOptions{}) + if err != nil { + return err + } + defer tx.Rollback(ctx) + + bytes, err := backend.MarshalHistoryEvent(wi.Result) + if err != nil { + return err + } + + _, err = tx.Exec(ctx, "INSERT INTO NewEvents (InstanceID, EventPayload) VALUES ($1, $2)", string(wi.InstanceID), bytes) + if err != nil { + return fmt.Errorf("failed to insert into NewEvents table: %w", err) + } + + dbResult, err := tx.Exec(ctx, "DELETE FROM NewTasks WHERE SequenceNumber = $1 AND LockedBy = $2", wi.SequenceNumber, wi.LockedBy) + if err != nil { + return fmt.Errorf("failed to delete from NewTasks table: %w", err) + } + + rowsAffected := dbResult.RowsAffected() + if err != nil { + return fmt.Errorf("failed get rows affected by delete statement: %w", err) + } else if rowsAffected == 0 { + return backend.ErrWorkItemLockLost + } + + if err = tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + return nil +} + +func (be *postgresBackend) AbandonActivityWorkItem(ctx context.Context, wi *backend.ActivityWorkItem) error { + if err := be.ensureDB(); err != nil { + return err + } + + dbResult, err := be.db.Exec( + ctx, + "UPDATE NewTasks SET LockedBy = NULL, LockExpiration = NULL WHERE SequenceNumber = $1 AND LockedBy = $2", + wi.SequenceNumber, + wi.LockedBy, + ) + if err != nil { + return fmt.Errorf("failed to update the NewTasks table for abandon: %w", err) + } + + rowsAffected := dbResult.RowsAffected() + if err != nil { + return fmt.Errorf("failed get rows affected by update statement for abandon: %w", err) + } else if rowsAffected == 0 { + return backend.ErrWorkItemLockLost + } + + return nil +} + +func (be *postgresBackend) PurgeOrchestrationState(ctx context.Context, id api.InstanceID) error { + if err := be.ensureDB(); err != nil { + return err + } + + tx, err := be.db.BeginTx(ctx, pgx.TxOptions{}) + if err != nil { + return err + } + defer tx.Rollback(ctx) + + if err := be.cleanupOrchestrationStateInternal(ctx, tx, id, true); err != nil { + return err + } + + if err = tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + return nil +} + +// Start implements backend.Backend +func (*postgresBackend) Start(context.Context) error { + return nil +} + +// Stop implements backend.Backend +func (*postgresBackend) Stop(context.Context) error { + return nil +} + +func (be *postgresBackend) ensureDB() error { + if be.db == nil { + return backend.ErrNotInitialized + } + return nil +} + +func (be *postgresBackend) String() string { + connectionURI := fmt.Sprintf("postgresql://%s:%s@%s:%d/%s", be.options.PgOptions.ConnConfig.User, be.options.PgOptions.ConnConfig.Password, be.options.PgOptions.ConnConfig.Host, be.options.PgOptions.ConnConfig.Port, be.options.PgOptions.ConnConfig.Database) + return connectionURI +} diff --git a/backend/postgres/schema.sql b/backend/postgres/schema.sql new file mode 100644 index 0000000..153a5ff --- /dev/null +++ b/backend/postgres/schema.sql @@ -0,0 +1,57 @@ +CREATE TABLE IF NOT EXISTS Instances ( + SequenceNumber SERIAL, + + InstanceID TEXT PRIMARY KEY NOT NULL, + ExecutionID TEXT NOT NULL, + Name TEXT NOT NULL, -- the type name of the orchestration or entity + Version TEXT NULL, -- the version of the orchestration (optional) + RuntimeStatus TEXT NOT NULL, + CreatedTime TIMESTAMP NOT NULL DEFAULT NOW(), + LastUpdatedTime TIMESTAMP NOT NULL DEFAULT NOW(), + CompletedTime TIMESTAMP NULL, + LockedBy TEXT NULL, + LockExpiration TIMESTAMP NULL, + Input TEXT NULL, + Output TEXT NULL, + CustomStatus TEXT NULL, + FailureDetails BYTEA NULL, + ParentInstanceID TEXT NULL +); + +-- This index is used by LockNext and Purge logic +CREATE INDEX IF NOT EXISTS IX_Instances_RuntimeStatus ON Instances(RuntimeStatus); + +-- This index is intended to help the performance of multi-instance query +CREATE INDEX IF NOT EXISTS IX_Instances_CreatedTime ON Instances(CreatedTime); + +CREATE TABLE IF NOT EXISTS History ( + InstanceID TEXT NOT NULL, + SequenceNumber SERIAL NOT NULL, + EventPayload BYTEA NOT NULL, + + PRIMARY KEY (InstanceID, SequenceNumber) +); + +CREATE TABLE IF NOT EXISTS NewEvents ( + SequenceNumber SERIAL PRIMARY KEY, -- order is important for FIFO + InstanceID TEXT NOT NULL, + ExecutionID TEXT NULL, + Timestamp TIMESTAMP NOT NULL DEFAULT NOW(), + VisibleTime TIMESTAMP NULL, -- for scheduled or abandoned messages + DequeueCount INTEGER NOT NULL DEFAULT 0, + LockedBy TEXT NULL, + EventPayload BYTEA NOT NULL, + + UNIQUE (InstanceID, SequenceNumber) +); + +CREATE TABLE IF NOT EXISTS NewTasks ( + SequenceNumber SERIAL PRIMARY KEY, -- order is important for FIFO + InstanceID TEXT NOT NULL, + ExecutionID TEXT NULL, + Timestamp TIMESTAMP NOT NULL DEFAULT NOW(), + DequeueCount INTEGER NOT NULL DEFAULT 0, + LockedBy TEXT NULL, + LockExpiration TIMESTAMP NULL, + EventPayload BYTEA NOT NULL +); \ No newline at end of file diff --git a/go.mod b/go.mod index ddbf901..20a091f 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,12 @@ module github.com/microsoft/durabletask-go -go 1.19 +go 1.21 require ( github.com/cenkalti/backoff/v4 v4.1.3 github.com/golang/protobuf v1.5.3 github.com/google/uuid v1.3.0 + github.com/jackc/pgx/v5 v5.7.1 github.com/marusama/semaphore/v2 v2.5.0 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0 @@ -24,21 +25,24 @@ require ( github.com/felixge/httpsnoop v1.0.3 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/mattn/go-isatty v0.0.16 // indirect - github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/openzipkin/zipkin-go v0.4.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/stretchr/objx v0.5.0 // indirect go.opentelemetry.io/otel/metric v1.18.0 // indirect - golang.org/x/mod v0.8.0 // indirect - golang.org/x/net v0.23.0 // indirect - golang.org/x/sys v0.18.0 // indirect - golang.org/x/text v0.14.0 // indirect - golang.org/x/tools v0.6.0 // indirect + golang.org/x/crypto v0.27.0 // indirect + golang.org/x/mod v0.17.0 // indirect + golang.org/x/net v0.25.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect + golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect - gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/uint128 v1.2.0 // indirect modernc.org/cc/v3 v3.40.0 // indirect diff --git a/go.sum b/go.sum index 88b2464..e709105 100644 --- a/go.sum +++ b/go.sum @@ -16,22 +16,32 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ= +github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs= +github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM= github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ= github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= -github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= -github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/openzipkin/zipkin-go v0.4.1 h1:kNd/ST2yLLWhaWrkgchya40TJabe8Hioj9udfPcEO5A= github.com/openzipkin/zipkin-go v0.4.1/go.mod h1:qY0VqDSN1pOBN94dBc6w2GJlWLiovAyg7Qt6/I9HecM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -39,10 +49,14 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= @@ -59,18 +73,21 @@ go.opentelemetry.io/otel/sdk v1.11.1 h1:F7KmQgoHljhUuJyA+9BiU+EkJfyX5nVVF4wyzWZp go.opentelemetry.io/otel/sdk v1.11.1/go.mod h1:/l3FE4SupHJ12TduVjUkZtlfFqDCQJlOlithYrdktys= go.opentelemetry.io/otel/trace v1.18.0 h1:NY+czwbHbmndxojTEKiSMHkG2ClNH2PwmcHrdo0JY10= go.opentelemetry.io/otel/trace v1.18.0/go.mod h1:T2+SGJGuYZY3bjj5rgh/hN7KIrlpWC5nS8Mjvzckz+0= -golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= -golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= +golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= @@ -81,8 +98,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= -gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= @@ -93,7 +110,9 @@ modernc.org/cc/v3 v3.40.0/go.mod h1:/bTg4dnWkSXowUO6ssQKnOV0yMVxDYNIsIrzqTFDGH0= modernc.org/ccgo/v3 v3.16.13 h1:Mkgdzl46i5F/CNR/Kj80Ri59hC8TKAhZrYSaqvkwzUw= modernc.org/ccgo/v3 v3.16.13/go.mod h1:2Quk+5YgpImhPjv2Qsob1DnZ/4som1lJTodubIcoUkY= modernc.org/ccorpus v1.11.6 h1:J16RXiiqiCgua6+ZvQot4yUuUy8zxgqbqEEUuGPlISk= +modernc.org/ccorpus v1.11.6/go.mod h1:2gEUTrWqdpH2pXsmTM1ZkjeSrUWDpjMu2T6m29L/ErQ= modernc.org/httpfs v1.0.6 h1:AAgIpFZRXuYnkjftxTAZwMIiwEqAfk8aVB2/oA6nAeM= +modernc.org/httpfs v1.0.6/go.mod h1:7dosgurJGp0sPaRanU53W4xZYKh14wfzX420oZADeHM= modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE= modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY= modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ= @@ -107,6 +126,8 @@ modernc.org/sqlite v1.22.1/go.mod h1:OrDj17Mggn6MhE+iPbBNf7RGKODDE9NFT0f3EwDzJqk modernc.org/strutil v1.1.3 h1:fNMm+oJklMGYfU9Ylcywl0CO5O6nTfaowNsh2wpPjzY= modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw= modernc.org/tcl v1.15.2 h1:C4ybAYCGJw968e+Me18oW55kD/FexcHbqH2xak1ROSY= +modernc.org/tcl v1.15.2/go.mod h1:3+k/ZaEbKrC8ePv8zJWPtBSW0V7Gg9g8rkmhI1Kfs3c= modernc.org/token v1.0.1 h1:A3qvTqOwexpfZZeyI0FeGPDlSWX5pjZu9hF4lU+EKWg= modernc.org/token v1.0.1/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= modernc.org/z v1.7.3 h1:zDJf6iHjrnB+WRD88stbXokugjyc0/pB91ri1gO6LZY= +modernc.org/z v1.7.3/go.mod h1:Ipv4tsdxZRbQyLq9Q1M6gdbkxYzdlrciF2Hi/lS7nWE= diff --git a/tests/backend_test.go b/tests/backend_test.go index 2b3f414..ff9c22b 100644 --- a/tests/backend_test.go +++ b/tests/backend_test.go @@ -3,6 +3,8 @@ package tests import ( "context" "fmt" + "github.com/microsoft/durabletask-go/backend/postgres" + "os" "reflect" "runtime" "testing" @@ -25,11 +27,21 @@ var ( sqliteFileOptions = sqlite.NewSqliteOptions("test.sqlite3") ) -var backends = []backend.Backend{ - sqlite.NewSqliteBackend(sqliteFileOptions, logger), - sqlite.NewSqliteBackend(sqliteInMemoryOptions, logger), +func getRunnableBackends() []backend.Backend { + var runnableBackends []backend.Backend + + runnableBackends = append(runnableBackends, sqlite.NewSqliteBackend(sqliteFileOptions, logger)) + runnableBackends = append(runnableBackends, sqlite.NewSqliteBackend(sqliteInMemoryOptions, logger)) + + if os.Getenv("POSTGRES_ENABLED") == "true" { + runnableBackends = append(runnableBackends, postgres.NewPostgresBackend(nil, logger)) + } + + return runnableBackends } +var backends = getRunnableBackends() + var completionStatusValues = []protos.OrchestrationStatus{ protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, protos.OrchestrationStatus_ORCHESTRATION_STATUS_TERMINATED, @@ -458,12 +470,11 @@ func workItemProcessingTestLogic( // State should be initialized with only "old" events assert.Empty(t, state.NewEvents()) assert.NotEmpty(t, state.OldEvents()) - // Validate orchestration metadata if metadata, ok := getOrchestrationMetadata(t, be, state.InstanceID()); ok { assert.Equal(t, defaultName, metadata.Name) assert.Equal(t, defaultInput, metadata.SerializedInput) - assert.Equal(t, createdTime, metadata.CreatedAt) + assert.Less(t, createdTime.Sub(metadata.CreatedAt).Abs(), time.Microsecond) // Some database backends (like postgres) don't support sub-microsecond precision assert.Equal(t, state.RuntimeStatus(), metadata.RuntimeStatus) validateMetadata(metadata)