From 106e66f891fd771696f7316695d9c3f328b31fe6 Mon Sep 17 00:00:00 2001 From: Gabriel Guerra Date: Tue, 28 May 2024 00:27:07 -0400 Subject: [PATCH 01/10] feat(db-migrations): first commit --- .../app/subsystems/aio/store/migrations.go | 86 +++++++++++ .../postgres/migrations/migration-0001.sql | 13 ++ .../store/postgres/migrations/schema-0000.sql | 128 +++++++++++++++ .../subsystems/aio/store/postgres/postgres.go | 146 +++++------------- .../sqlite/migrations/migration-0001.sql | 13 ++ .../store/sqlite/migrations/schema-0000.sql | 123 +++++++++++++++ .../app/subsystems/aio/store/sqlite/sqlite.go | 141 +++++------------ 7 files changed, 441 insertions(+), 209 deletions(-) create mode 100644 internal/app/subsystems/aio/store/migrations.go create mode 100644 internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql create mode 100644 internal/app/subsystems/aio/store/postgres/migrations/schema-0000.sql create mode 100644 internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql create mode 100644 internal/app/subsystems/aio/store/sqlite/migrations/schema-0000.sql diff --git a/internal/app/subsystems/aio/store/migrations.go b/internal/app/subsystems/aio/store/migrations.go new file mode 100644 index 00000000..1392a378 --- /dev/null +++ b/internal/app/subsystems/aio/store/migrations.go @@ -0,0 +1,86 @@ +package store + +import ( + "database/sql" + "embed" + "fmt" + "path/filepath" + "regexp" + "sort" + "strconv" + "strings" +) + +type migration struct { + Version int + Content []byte +} + +func ReadVersion(db *sql.DB) (int, error) { + var version int + err := db.QueryRow("SELECT id FROM migrations").Scan(&version) + if err != nil { + if strings.Contains(err.Error(), "no such table") { + return -1, nil + } + return 0, err + } + + return version, nil +} + +func ReadMigrations(migrationsFS embed.FS, currentVersion int) ([]migration, error) { + migrations := []migration{} + + entries, err := migrationsFS.ReadDir("migrations") + if err != nil { + return nil, err + } + + for _, entry := range entries { + filename := entry.Name() + content, err := migrationsFS.ReadFile("migrations/" + filename) + if err != nil { + return nil, err + } + + version, err := filenameVersion(filename) + if err != nil { + return nil, err + } + + // Skip migrations that are at or below the current version. + if version <= currentVersion { + continue + } + + migrations = append(migrations, migration{ + Version: version, + Content: content, + }) + } + + sort.Slice(migrations, func(i, j int) bool { + return migrations[i].Version < migrations[j].Version + }) + + return migrations, nil +} + +func filenameVersion(filename string) (int, error) { + // Use a regular expression to extract the version number + // from the filename by matching the first sequence of digits. + re := regexp.MustCompile(`\d+`) + versionStr := re.FindString(filepath.Base(filename)) + + if versionStr == "" { + return 0, fmt.Errorf("could not extract version from filename: %s", filename) + } + + version, err := strconv.Atoi(versionStr) + if err != nil { + return 0, fmt.Errorf("could not convert version string to int: %v", err) + } + + return version, nil +} diff --git a/internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql b/internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql new file mode 100644 index 00000000..a7ea0adb --- /dev/null +++ b/internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql @@ -0,0 +1,13 @@ +-- Insert initial migration record +INSERT INTO migrations (id) VALUES (1) ON CONFLICT(id) DO NOTHING; + +-- -- DropInvocationColumn +-- ALTER TABLE promises DROP COLUMN invocation; + +-- -- DropOldInvocationIndex +-- DROP INDEX x; +-- DROP INDEX x; + +-- -- AddNewInvocationIndex +-- CREATE INDEX +-- CREATE INDEX \ No newline at end of file diff --git a/internal/app/subsystems/aio/store/postgres/migrations/schema-0000.sql b/internal/app/subsystems/aio/store/postgres/migrations/schema-0000.sql new file mode 100644 index 00000000..1a4e9148 --- /dev/null +++ b/internal/app/subsystems/aio/store/postgres/migrations/schema-0000.sql @@ -0,0 +1,128 @@ +-- CreateTable +CREATE TABLE IF NOT EXISTS promises ( + id TEXT, + sort_id SERIAL, + state INTEGER DEFAULT 1, + param_headers JSONB, + param_data BYTEA, + value_headers JSONB, + value_data BYTEA, + timeout BIGINT, + idempotency_key_for_create TEXT, + idempotency_key_for_complete TEXT, + tags JSONB, + created_on BIGINT, + completed_on BIGINT, + PRIMARY KEY(id) +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_promises_sort_id ON promises(sort_id); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_promises_invocation ON promises((tags->>'resonate:invocation')); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_promises_timeout ON promises((tags->>'resonate:timeout')); + +-- CreateTable +CREATE TABLE IF NOT EXISTS schedules ( + id TEXT, + sort_id SERIAL, + description TEXT, + cron TEXT, + tags JSONB, + promise_id TEXT, + promise_timeout BIGINT, + promise_param_headers JSONB, + promise_param_data BYTEA, + promise_tags JSONB, + last_run_time BIGINT, + next_run_time BIGINT, + idempotency_key TEXT, + created_on BIGINT, + PRIMARY KEY(id) +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_schedules_sort_id ON schedules(sort_id); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_schedules_next_run_time ON schedules(next_run_time); + +-- CreateTable +CREATE TABLE IF NOT EXISTS tasks ( + id TEXT, + counter INTEGER, + promise_id TEXT, + claim_timeout BIGINT, + complete_timeout BIGINT, + promise_timeout BIGINT, + created_on BIGINT, + completed_on BIGINT, + is_completed BOOLEAN, + PRIMARY KEY(id) +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_tasks_id ON tasks(id); + +-- CreateTable +CREATE TABLE IF NOT EXISTS locks ( + resource_id TEXT, + process_id TEXT, + execution_id TEXT, + expiry_in_milliseconds BIGINT, + timeout BIGINT, + PRIMARY KEY(resource_id) +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_locks_acquire_id ON locks(resource_id, execution_id); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_locks_heartbeat_id ON locks(process_id); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_locks_timeout ON locks(timeout); + +-- CreateTable +CREATE TABLE IF NOT EXISTS timeouts ( + id TEXT, + time BIGINT, + PRIMARY KEY(id) +); + +-- CreateTable +CREATE TABLE IF NOT EXISTS subscriptions ( + id TEXT, + sort_id SERIAL, + promise_id TEXT, + url TEXT, + retry_policy BYTEA, + created_on BIGINT, + PRIMARY KEY(id, promise_id) +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_subscriptions_sort_id ON subscriptions(sort_id); + +-- CreateTable +CREATE TABLE IF NOT EXISTS notifications ( + id TEXT, + promise_id TEXT, + url TEXT, + retry_policy BYTEA, + time BIGINT, + attempt INTEGER, + PRIMARY KEY(id, promise_id) +); + +-- CreateTable +CREATE TABLE IF NOT EXISTS migrations ( + id INTEGER, + PRIMARY KEY(id) +); + +--- CreateSchemaVersion +INSERT INTO migrations (id) VALUES (0) ON CONFLICT(id) DO NOTHING; \ No newline at end of file diff --git a/internal/app/subsystems/aio/store/postgres/postgres.go b/internal/app/subsystems/aio/store/postgres/postgres.go index 0ebe9622..695d2a35 100644 --- a/internal/app/subsystems/aio/store/postgres/postgres.go +++ b/internal/app/subsystems/aio/store/postgres/postgres.go @@ -3,6 +3,8 @@ package postgres import ( "context" "database/sql" + "embed" + _ "embed" "encoding/json" "fmt" "net/url" @@ -26,113 +28,10 @@ import ( _ "github.com/lib/pq" ) -const ( - CREATE_TABLE_STATEMENT = ` - CREATE TABLE IF NOT EXISTS promises ( - id TEXT, - sort_id SERIAL, - state INTEGER DEFAULT 1, - param_headers JSONB, - param_data BYTEA, - value_headers JSONB, - value_data BYTEA, - timeout BIGINT, - idempotency_key_for_create TEXT, - idempotency_key_for_complete TEXT, - tags JSONB, - created_on BIGINT, - completed_on BIGINT, - PRIMARY KEY(id) - ); - - CREATE INDEX IF NOT EXISTS idx_promises_sort_id ON promises(sort_id); - CREATE INDEX IF NOT EXISTS idx_promises_invocation ON promises((tags->>'resonate:invocation')); - CREATE INDEX IF NOT EXISTS idx_promises_timeout ON promises((tags->>'resonate:timeout')); - - CREATE TABLE IF NOT EXISTS schedules ( - id TEXT, - sort_id SERIAL, - description TEXT, - cron TEXT, - tags JSONB, - promise_id TEXT, - promise_timeout BIGINT, - promise_param_headers JSONB, - promise_param_data BYTEA, - promise_tags JSONB, - last_run_time BIGINT, - next_run_time BIGINT, - idempotency_key TEXT, - created_on BIGINT, - PRIMARY KEY(id) - ); - - CREATE INDEX IF NOT EXISTS idx_schedules_sort_id ON schedules(sort_id); - CREATE INDEX IF NOT EXISTS idx_schedules_next_run_time ON schedules(next_run_time); - - CREATE TABLE IF NOT EXISTS tasks ( - id TEXT, - counter INTEGER, - promise_id TEXT, - claim_timeout BIGINT, - complete_timeout BIGINT, - promise_timeout BIGINT, - created_on BIGINT, - completed_on BIGINT, - is_completed BOOLEAN, - PRIMARY KEY(id) - ); - - CREATE INDEX IF NOT EXISTS idx_tasks_id ON tasks(id); - - CREATE TABLE IF NOT EXISTS locks ( - resource_id TEXT, - process_id TEXT, - execution_id TEXT, - expiry_in_milliseconds BIGINT, - timeout BIGINT, - PRIMARY KEY(resource_id) - ); - - CREATE INDEX IF NOT EXISTS idx_locks_acquire_id ON locks(resource_id, execution_id); - CREATE INDEX IF NOT EXISTS idx_locks_heartbeat_id ON locks(process_id); - CREATE INDEX IF NOT EXISTS idx_locks_timeout ON locks(timeout); - - CREATE TABLE IF NOT EXISTS timeouts ( - id TEXT, - time BIGINT, - PRIMARY KEY(id) - ); - - CREATE TABLE IF NOT EXISTS subscriptions ( - id TEXT, - sort_id SERIAL, - promise_id TEXT, - url TEXT, - retry_policy BYTEA, - created_on BIGINT, - PRIMARY KEY(id, promise_id) - ); - - CREATE INDEX IF NOT EXISTS idx_subscriptions_sort_id ON subscriptions(sort_id); - - CREATE TABLE IF NOT EXISTS notifications ( - id TEXT, - promise_id TEXT, - url TEXT, - retry_policy BYTEA, - time BIGINT, - attempt INTEGER, - PRIMARY KEY(id, promise_id) - ); - - CREATE TABLE IF NOT EXISTS migrations ( - id INTEGER, - PRIMARY KEY(id) - ); - - INSERT INTO migrations (id) VALUES (1) ON CONFLICT(id) DO NOTHING;` +//go:embed migrations/* +var migrationsFS embed.FS +const ( DROP_TABLE_STATEMENT = ` DROP TABLE notifications; DROP TABLE subscriptions; @@ -470,7 +369,40 @@ func (s *PostgresStore) String() string { } func (s *PostgresStore) Start() error { - if _, err := s.db.Exec(CREATE_TABLE_STATEMENT); err != nil { + currentVersion, err := store.ReadVersion(s.db) + if err != nil { + return err + } + + migrationsToApply, err := store.ReadMigrations(migrationsFS, currentVersion) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), s.config.TxTimeout) + defer cancel() + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return err + } + + defer func() { + if err != nil { + if rbErr := tx.Rollback(); rbErr != nil { + err = fmt.Errorf("tx failed: %v, unable to rollback: %v", err, rbErr) + } + } + }() + + for _, m := range migrationsToApply { + _, err = tx.Exec(string(m.Content)) + if err != nil { + return fmt.Errorf("failed to execute migration version %d: %w", m.Version, err) + } + } + + if err = tx.Commit(); err != nil { return err } diff --git a/internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql b/internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql new file mode 100644 index 00000000..a7ea0adb --- /dev/null +++ b/internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql @@ -0,0 +1,13 @@ +-- Insert initial migration record +INSERT INTO migrations (id) VALUES (1) ON CONFLICT(id) DO NOTHING; + +-- -- DropInvocationColumn +-- ALTER TABLE promises DROP COLUMN invocation; + +-- -- DropOldInvocationIndex +-- DROP INDEX x; +-- DROP INDEX x; + +-- -- AddNewInvocationIndex +-- CREATE INDEX +-- CREATE INDEX \ No newline at end of file diff --git a/internal/app/subsystems/aio/store/sqlite/migrations/schema-0000.sql b/internal/app/subsystems/aio/store/sqlite/migrations/schema-0000.sql new file mode 100644 index 00000000..89c6ca8d --- /dev/null +++ b/internal/app/subsystems/aio/store/sqlite/migrations/schema-0000.sql @@ -0,0 +1,123 @@ +-- CreateTable +CREATE TABLE IF NOT EXISTS promises ( + id TEXT UNIQUE, + sort_id INTEGER PRIMARY KEY AUTOINCREMENT, + state INTEGER DEFAULT 1, + param_headers BLOB, + param_data BLOB, + value_headers BLOB, + value_data BLOB, + timeout INTEGER, + idempotency_key_for_create TEXT, + idempotency_key_for_complete TEXT, + tags BLOB, + created_on INTEGER, + completed_on INTEGER +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_promises_id ON promises(id); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_promises_invocation ON promises(json_extract(tags, '$.resonate:invocation')); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_promises_timeout ON promises(json_extract(tags, '$.resonate:timeout')); + +-- CreateTable +CREATE TABLE IF NOT EXISTS schedules ( + id TEXT UNIQUE, + sort_id INTEGER PRIMARY KEY AUTOINCREMENT, + description TEXT, + cron TEXT, + tags BLOB, + promise_id TEXT, + promise_timeout INTEGER, + promise_param_headers BLOB, + promise_param_data BLOB, + promise_tags BLOB, + last_run_time INTEGER, + next_run_time INTEGER, + idempotency_key TEXT, + created_on INTEGER +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_schedules_id ON schedules(id); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_schedules_next_run_time ON schedules(next_run_time); + +-- CreateTable +CREATE TABLE IF NOT EXISTS tasks ( + id TEXT UNIQUE, + counter INTEGER, + promise_id TEXT, + claim_timeout INTEGER, + complete_timeout INTEGER, + promise_timeout INTEGER, + created_on INTEGER, + completed_on INTEGER, + is_completed BOOLEAN +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_tasks_id ON tasks(id); + +-- CreateTable +CREATE TABLE IF NOT EXISTS locks ( + resource_id TEXT UNIQUE, + process_id TEXT, + execution_id TEXT, + expiry_in_milliseconds INTEGER, + timeout INTEGER +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_locks_acquire_id ON locks(resource_id, execution_id); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_locks_heartbeat_id ON locks(process_id); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_locks_timeout ON locks(timeout); + +-- CreateTable +CREATE TABLE IF NOT EXISTS timeouts ( + id TEXT, + time INTEGER, + PRIMARY KEY(id) +); + +-- CreateTable +CREATE TABLE IF NOT EXISTS subscriptions ( + id TEXT, + promise_id TEXT, + sort_id INTEGER PRIMARY KEY AUTOINCREMENT, + url TEXT, + retry_policy BLOB, + created_on INTEGER, + UNIQUE(id, promise_id) +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_subscriptions_id ON subscriptions(id); + +-- CreateTable +CREATE TABLE IF NOT EXISTS notifications ( + id TEXT, + promise_id TEXT, + url TEXT, + retry_policy BLOB, + time INTEGER, + attempt INTEGER, + PRIMARY KEY(id, promise_id) +); + +-- CreateTable +CREATE TABLE IF NOT EXISTS migrations ( + id INTEGER PRIMARY KEY +); + +--- CreateSchemaVersion +INSERT INTO migrations (id) VALUES (0) ON CONFLICT(id) DO NOTHING; \ No newline at end of file diff --git a/internal/app/subsystems/aio/store/sqlite/sqlite.go b/internal/app/subsystems/aio/store/sqlite/sqlite.go index 83539ce6..a57e67a8 100644 --- a/internal/app/subsystems/aio/store/sqlite/sqlite.go +++ b/internal/app/subsystems/aio/store/sqlite/sqlite.go @@ -3,6 +3,8 @@ package sqlite import ( "context" "database/sql" + "embed" + _ "embed" "encoding/json" "fmt" "os" @@ -26,108 +28,10 @@ import ( _ "github.com/mattn/go-sqlite3" ) -const ( - CREATE_TABLE_STATEMENT = ` - CREATE TABLE IF NOT EXISTS promises ( - id TEXT UNIQUE, - sort_id INTEGER PRIMARY KEY AUTOINCREMENT, - state INTEGER DEFAULT 1, - param_headers BLOB, - param_data BLOB, - value_headers BLOB, - value_data BLOB, - timeout INTEGER, - idempotency_key_for_create TEXT, - idempotency_key_for_complete TEXT, - tags BLOB, - created_on INTEGER, - completed_on INTEGER - ); - - CREATE INDEX IF NOT EXISTS idx_promises_id ON promises(id); - CREATE INDEX IF NOT EXISTS idx_promises_invocation ON promises(json_extract(tags, '$.resonate:invocation')); - CREATE INDEX IF NOT EXISTS idx_promises_timeout ON promises(json_extract(tags, '$.resonate:timeout')); - - CREATE TABLE IF NOT EXISTS schedules ( - id TEXT UNIQUE, - sort_id INTEGER PRIMARY KEY AUTOINCREMENT, - description TEXT, - cron TEXT, - tags BLOB, - promise_id TEXT, - promise_timeout INTEGER, - promise_param_headers BLOB, - promise_param_data BLOB, - promise_tags BLOB, - last_run_time INTEGER, - next_run_time INTEGER, - idempotency_key TEXT, - created_on INTEGER - ); - - CREATE INDEX IF NOT EXISTS idx_schedules_id ON schedules(id); - CREATE INDEX IF NOT EXISTS idx_schedules_next_run_time ON schedules(next_run_time); - - CREATE TABLE IF NOT EXISTS tasks ( - id TEXT UNIQUE, - counter INTEGER, - promise_id TEXT, - claim_timeout INTEGER, - complete_timeout INTEGER, - promise_timeout INTEGER, - created_on INTEGER, - completed_on INTEGER, - is_completed BOOLEAN - ); - - CREATE INDEX IF NOT EXISTS idx_tasks_id ON tasks(id); - - CREATE TABLE IF NOT EXISTS locks ( - resource_id TEXT UNIQUE, - process_id TEXT, - execution_id TEXT, - expiry_in_milliseconds INTEGER, - timeout INTEGER - ); - - CREATE INDEX IF NOT EXISTS idx_locks_acquire_id ON locks(resource_id, execution_id); - CREATE INDEX IF NOT EXISTS idx_locks_heartbeat_id ON locks(process_id); - CREATE INDEX IF NOT EXISTS idx_locks_timeout ON locks(timeout); - - CREATE TABLE IF NOT EXISTS timeouts ( - id TEXT, - time INTEGER, - PRIMARY KEY(id) - ); - - CREATE TABLE IF NOT EXISTS subscriptions ( - id TEXT, - promise_id TEXT, - sort_id INTEGER PRIMARY KEY AUTOINCREMENT, - url TEXT, - retry_policy BLOB, - created_on INTEGER, - UNIQUE(id, promise_id) - ); - - CREATE INDEX IF NOT EXISTS idx_subscriptions_id ON subscriptions(id); - - CREATE TABLE IF NOT EXISTS notifications ( - id TEXT, - promise_id TEXT, - url TEXT, - retry_policy BLOB, - time INTEGER, - attempt INTEGER, - PRIMARY KEY(id, promise_id) - ); - - CREATE TABLE IF NOT EXISTS migrations ( - id INTEGER PRIMARY KEY - ); - - INSERT INTO migrations (id) VALUES (1) ON CONFLICT(id) DO NOTHING;` +//go:embed migrations/* +var migrationsFS embed.FS +const ( PROMISE_SELECT_STATEMENT = ` SELECT id, state, param_headers, param_data, value_headers, value_data, timeout, idempotency_key_for_create, idempotency_key_for_complete, tags, created_on, completed_on @@ -435,7 +339,40 @@ func (s *SqliteStore) String() string { } func (s *SqliteStore) Start() error { - if _, err := s.db.Exec(CREATE_TABLE_STATEMENT); err != nil { + currentVersion, err := store.ReadVersion(s.db) + if err != nil { + return err + } + + migrationsToApply, err := store.ReadMigrations(migrationsFS, currentVersion) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), s.config.TxTimeout) + defer cancel() + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return err + } + + defer func() { + if err != nil { + if rbErr := tx.Rollback(); rbErr != nil { + err = fmt.Errorf("tx failed: %v, unable to rollback: %v", err, rbErr) + } + } + }() + + for _, m := range migrationsToApply { + _, err = tx.Exec(string(m.Content)) + if err != nil { + return fmt.Errorf("failed to execute migration version %d: %w", m.Version, err) + } + } + + if err = tx.Commit(); err != nil { return err } From adf92538a7c9a4c2a9385a7a5aacf771c7503970 Mon Sep 17 00:00:00 2001 From: Gabriel Guerra Date: Tue, 28 May 2024 00:31:24 -0400 Subject: [PATCH 02/10] feat(db-migrations): for the linting gods --- internal/app/subsystems/api/grpc/grpc_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/app/subsystems/api/grpc/grpc_test.go b/internal/app/subsystems/api/grpc/grpc_test.go index e51da4a3..267552cc 100644 --- a/internal/app/subsystems/api/grpc/grpc_test.go +++ b/internal/app/subsystems/api/grpc/grpc_test.go @@ -45,7 +45,7 @@ func setup() (*grpcTest, error) { go subsystem.Start(errors) time.Sleep(100 * time.Millisecond) - conn, err := grpc.Dial("127.0.0.1:5555", grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.NewClient("127.0.0.1:5555", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } From 00dcc713be7b6f8c99ab7e9d7b77dc25557869fe Mon Sep 17 00:00:00 2001 From: Gabriel Guerra Date: Tue, 28 May 2024 09:39:18 -0400 Subject: [PATCH 03/10] fix(postgres): include expected error for no migrations table --- .../app/subsystems/aio/store/migrations.go | 60 +++++++++++++++++-- .../subsystems/aio/store/postgres/postgres.go | 39 +----------- .../app/subsystems/aio/store/sqlite/sqlite.go | 39 +----------- 3 files changed, 58 insertions(+), 80 deletions(-) diff --git a/internal/app/subsystems/aio/store/migrations.go b/internal/app/subsystems/aio/store/migrations.go index 1392a378..409edbdd 100644 --- a/internal/app/subsystems/aio/store/migrations.go +++ b/internal/app/subsystems/aio/store/migrations.go @@ -1,6 +1,7 @@ package store import ( + "context" "database/sql" "embed" "fmt" @@ -9,6 +10,7 @@ import ( "sort" "strconv" "strings" + "time" ) type migration struct { @@ -16,20 +18,70 @@ type migration struct { Content []byte } -func ReadVersion(db *sql.DB) (int, error) { +func Start(db *sql.DB, txTimeout time.Duration, migrationsFS embed.FS) error { + currentVersion, err := readVersion(db) + if err != nil { + return err + } + + migrationsToApply, err := readMigrations(migrationsFS, currentVersion) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), txTimeout) + defer cancel() + + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + + defer func() { + if err != nil { + if rbErr := tx.Rollback(); rbErr != nil { + err = fmt.Errorf("tx failed: %v, unable to rollback: %v", err, rbErr) + } + } + }() + + for _, m := range migrationsToApply { + _, err = tx.Exec(string(m.Content)) + if err != nil { + return fmt.Errorf("failed to execute migration version %d: %w", m.Version, err) + } + } + + if err = tx.Commit(); err != nil { + return err + } + + return nil +} + +func readVersion(db *sql.DB) (int, error) { var version int err := db.QueryRow("SELECT id FROM migrations").Scan(&version) if err != nil { - if strings.Contains(err.Error(), "no such table") { + if isTableNotFoundError(err) { return -1, nil } return 0, err } - return version, nil } -func ReadMigrations(migrationsFS embed.FS, currentVersion int) ([]migration, error) { +// db.QueryRow does not return a specific error type when the table does +// not exist so we need to check the error message. +func isTableNotFoundError(err error) bool { + errStr := err.Error() + if strings.Contains(errStr, "no such table") || strings.Contains(errStr, "does not exist") { + return true + } + return false +} + +func readMigrations(migrationsFS embed.FS, currentVersion int) ([]migration, error) { migrations := []migration{} entries, err := migrationsFS.ReadDir("migrations") diff --git a/internal/app/subsystems/aio/store/postgres/postgres.go b/internal/app/subsystems/aio/store/postgres/postgres.go index 695d2a35..f9d87233 100644 --- a/internal/app/subsystems/aio/store/postgres/postgres.go +++ b/internal/app/subsystems/aio/store/postgres/postgres.go @@ -369,44 +369,7 @@ func (s *PostgresStore) String() string { } func (s *PostgresStore) Start() error { - currentVersion, err := store.ReadVersion(s.db) - if err != nil { - return err - } - - migrationsToApply, err := store.ReadMigrations(migrationsFS, currentVersion) - if err != nil { - return err - } - - ctx, cancel := context.WithTimeout(context.Background(), s.config.TxTimeout) - defer cancel() - - tx, err := s.db.BeginTx(ctx, nil) - if err != nil { - return err - } - - defer func() { - if err != nil { - if rbErr := tx.Rollback(); rbErr != nil { - err = fmt.Errorf("tx failed: %v, unable to rollback: %v", err, rbErr) - } - } - }() - - for _, m := range migrationsToApply { - _, err = tx.Exec(string(m.Content)) - if err != nil { - return fmt.Errorf("failed to execute migration version %d: %w", m.Version, err) - } - } - - if err = tx.Commit(); err != nil { - return err - } - - return nil + return store.Start(s.db, s.config.TxTimeout, migrationsFS) } func (s *PostgresStore) Stop() error { diff --git a/internal/app/subsystems/aio/store/sqlite/sqlite.go b/internal/app/subsystems/aio/store/sqlite/sqlite.go index a57e67a8..f7ea564b 100644 --- a/internal/app/subsystems/aio/store/sqlite/sqlite.go +++ b/internal/app/subsystems/aio/store/sqlite/sqlite.go @@ -339,44 +339,7 @@ func (s *SqliteStore) String() string { } func (s *SqliteStore) Start() error { - currentVersion, err := store.ReadVersion(s.db) - if err != nil { - return err - } - - migrationsToApply, err := store.ReadMigrations(migrationsFS, currentVersion) - if err != nil { - return err - } - - ctx, cancel := context.WithTimeout(context.Background(), s.config.TxTimeout) - defer cancel() - - tx, err := s.db.BeginTx(ctx, nil) - if err != nil { - return err - } - - defer func() { - if err != nil { - if rbErr := tx.Rollback(); rbErr != nil { - err = fmt.Errorf("tx failed: %v, unable to rollback: %v", err, rbErr) - } - } - }() - - for _, m := range migrationsToApply { - _, err = tx.Exec(string(m.Content)) - if err != nil { - return fmt.Errorf("failed to execute migration version %d: %w", m.Version, err) - } - } - - if err = tx.Commit(); err != nil { - return err - } - - return nil + return store.Start(s.db, s.config.TxTimeout, migrationsFS) } func (s *SqliteStore) Stop() error { From d4a66fabd150eaf5cb3b025fbc5de808465a8e2a Mon Sep 17 00:00:00 2001 From: Gabriel Guerra Date: Tue, 28 May 2024 09:57:29 -0400 Subject: [PATCH 04/10] feat(error): better rollback error --- internal/app/subsystems/aio/store/migrations.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/app/subsystems/aio/store/migrations.go b/internal/app/subsystems/aio/store/migrations.go index 409edbdd..a3fc46ef 100644 --- a/internal/app/subsystems/aio/store/migrations.go +++ b/internal/app/subsystems/aio/store/migrations.go @@ -43,6 +43,8 @@ func Start(db *sql.DB, txTimeout time.Duration, migrationsFS embed.FS) error { err = fmt.Errorf("tx failed: %v, unable to rollback: %v", err, rbErr) } } + + err = fmt.Errorf("tx failed, performed a rollback: %v", err) }() for _, m := range migrationsToApply { From fe3551cba55cd6cd67b95ae32a67a2c32f2dfd55 Mon Sep 17 00:00:00 2001 From: Gabriel Guerra Date: Thu, 30 May 2024 10:33:32 -0400 Subject: [PATCH 05/10] feat(migrations): added command --- cmd/migrations/migrations.go | 89 +++++++++++ cmd/root.go | 2 + cmd/util/config.go | 2 + .../app/subsystems/aio/store/migrations.go | 150 ++++++++++++------ .../postgres/migrations/migration-0001.sql | 11 -- .../subsystems/aio/store/postgres/postgres.go | 3 +- .../aio/store/postgres/postgres_test.go | 1 + .../subsystems/aio/store/postgres/version.go | 4 + .../sqlite/migrations/migration-0001.sql | 11 -- .../app/subsystems/aio/store/sqlite/sqlite.go | 3 +- .../subsystems/aio/store/sqlite/version.go | 4 + 11 files changed, 204 insertions(+), 76 deletions(-) create mode 100644 cmd/migrations/migrations.go create mode 100644 internal/app/subsystems/aio/store/postgres/version.go create mode 100644 internal/app/subsystems/aio/store/sqlite/version.go diff --git a/cmd/migrations/migrations.go b/cmd/migrations/migrations.go new file mode 100644 index 00000000..ab42e0d0 --- /dev/null +++ b/cmd/migrations/migrations.go @@ -0,0 +1,89 @@ +package migrations + +import ( + "time" + + "github.com/resonatehq/resonate/cmd/util" + "github.com/resonatehq/resonate/internal/aio" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/store" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/postgres" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/sqlite" + "github.com/spf13/cobra" +) + +func NewCmd() *cobra.Command { + var ( + aioStore string + aioStoreSqlitePath string + aioStoreSqliteTxTimeout time.Duration + aioStorePostgresHost string + aioStorePostgresPort string + aioStorePostgresUsername string + aioStorePostgresPassword string + aioStorePostgresDatabase string + aioStorePostgresQuery map[string]string + aioStorePostgresTxTimeout time.Duration + plan bool + ) + + var cmd = &cobra.Command{ + Use: "migrate", + Short: "Synchronizes the database state with the current set of models and migrations", + RunE: func(cmd *cobra.Command, args []string) error { + cfg := &util.AIOSubsystemConfig[util.StoreConfig]{ + Subsystem: &aio.SubsystemConfig{ + Size: 1, + BatchSize: 1, + Workers: 1, // Only one connection will be opened to run migrations. + }, + Config: &util.StoreConfig{ + Kind: util.StoreKind(aioStore), + Plan: store.Apply, + Sqlite: &sqlite.Config{ + Path: aioStoreSqlitePath, + TxTimeout: aioStoreSqliteTxTimeout, + }, + Postgres: &postgres.Config{ + Host: aioStorePostgresHost, + Port: aioStorePostgresPort, + Username: aioStorePostgresUsername, + Password: aioStorePostgresPassword, + Database: aioStorePostgresDatabase, + Query: aioStorePostgresQuery, + TxTimeout: aioStorePostgresTxTimeout, + }, + }, + } + + if plan { + cfg.Config.Plan = store.DryRun + } + + store, err := util.NewStore(cfg) + if err != nil { + return err + } + + // Runs migrations. + if err := store.Start(); err != nil { + return err + } + + return store.Stop() + }, + } + + cmd.Flags().StringVar(&aioStore, "aio-store", "sqlite", "promise store type") + cmd.Flags().StringVar(&aioStoreSqlitePath, "aio-store-sqlite-path", "resonate.db", "sqlite database path") + cmd.Flags().DurationVar(&aioStoreSqliteTxTimeout, "aio-store-sqlite-tx-timeout", 10_000*time.Millisecond, "sqlite transaction timeout") + cmd.Flags().StringVar(&aioStorePostgresHost, "aio-store-postgres-host", "localhost", "postgres host") + cmd.Flags().StringVar(&aioStorePostgresPort, "aio-store-postgres-port", "5432", "postgres port") + cmd.Flags().StringVar(&aioStorePostgresUsername, "aio-store-postgres-username", "", "postgres username") + cmd.Flags().StringVar(&aioStorePostgresPassword, "aio-store-postgres-password", "", "postgres password") + cmd.Flags().StringVar(&aioStorePostgresDatabase, "aio-store-postgres-database", "resonate", "postgres database name") + cmd.Flags().StringToStringVar(&aioStorePostgresQuery, "aio-store-postgres-query", make(map[string]string, 0), "postgres query options") + cmd.Flags().DurationVar(&aioStorePostgresTxTimeout, "aio-store-postgres-tx-timeout", 10_000*time.Millisecond, "postgres transaction timeout") + cmd.Flags().BoolVarP(&plan, "plan", "p", true, "Dry run to check which migrations will be applied") + + return cmd +} diff --git a/cmd/root.go b/cmd/root.go index 238d52d4..5c22af1d 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/resonatehq/resonate/cmd/dst" + "github.com/resonatehq/resonate/cmd/migrations" "github.com/resonatehq/resonate/cmd/promises" "github.com/resonatehq/resonate/cmd/quickstart" "github.com/resonatehq/resonate/cmd/schedules" @@ -43,6 +44,7 @@ func init() { rootCmd.AddCommand(dst.NewCmd()) rootCmd.AddCommand(serve.ServeCmd()) rootCmd.AddCommand(quickstart.NewCmd()) + rootCmd.AddCommand(migrations.NewCmd()) // Set default output rootCmd.SetOut(os.Stdout) diff --git a/cmd/util/config.go b/cmd/util/config.go index 122697df..829ed7d0 100644 --- a/cmd/util/config.go +++ b/cmd/util/config.go @@ -8,6 +8,7 @@ import ( "github.com/resonatehq/resonate/internal/aio" "github.com/resonatehq/resonate/internal/app/subsystems/aio/network" "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/store" "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/postgres" "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/sqlite" "github.com/resonatehq/resonate/internal/app/subsystems/api/grpc" @@ -133,6 +134,7 @@ const ( type StoreConfig struct { Kind StoreKind + Plan store.Plan Sqlite *sqlite.Config Postgres *postgres.Config } diff --git a/internal/app/subsystems/aio/store/migrations.go b/internal/app/subsystems/aio/store/migrations.go index a3fc46ef..5981f32b 100644 --- a/internal/app/subsystems/aio/store/migrations.go +++ b/internal/app/subsystems/aio/store/migrations.go @@ -13,54 +13,83 @@ import ( "time" ) -type migration struct { +type Plan int + +const ( + Default Plan = iota // 0 is the default value + DryRun + Apply +) + +type Migration struct { Version int Content []byte } -func Start(db *sql.DB, txTimeout time.Duration, migrationsFS embed.FS) error { - currentVersion, err := readVersion(db) - if err != nil { - return err +type MigrationPlan []Migration + +func (m MigrationPlan) String() string { + var sb strings.Builder + sb.WriteString("Operations to perform:\n") + sb.WriteString("Apply all migrations:") + for _, migration := range m { + sb.WriteString(fmt.Sprintf(" %d", migration.Version)) } + return sb.String() +} - migrationsToApply, err := readMigrations(migrationsFS, currentVersion) +func Start(currVersion int, db *sql.DB, txTimeout time.Duration, migrationsFS embed.FS, plan Plan) error { + dbVersion, err := readVersion(db) if err != nil { return err } - ctx, cancel := context.WithTimeout(context.Background(), txTimeout) - defer cancel() + if currVersion < dbVersion { + return fmt.Errorf("current version %d is less than database version %d please updated to latest resonate release", currVersion, dbVersion) - tx, err := db.BeginTx(ctx, nil) - if err != nil { - return err + } + if currVersion == dbVersion { + return nil } - defer func() { + // If the database version is -1, it means the migrations table does not exist. + if dbVersion == -1 { // versioning with version.go + plan = Apply + } + + switch plan { + case Default: + return fmt.Errorf("database version %d does not match current version %d please run `resonate migrate --plan` to see migrations needed", dbVersion, currVersion) + case DryRun: + plan, err := generateMigrationPlan(migrationsFS, dbVersion) if err != nil { - if rbErr := tx.Rollback(); rbErr != nil { - err = fmt.Errorf("tx failed: %v, unable to rollback: %v", err, rbErr) - } + return err } - - err = fmt.Errorf("tx failed, performed a rollback: %v", err) - }() - - for _, m := range migrationsToApply { - _, err = tx.Exec(string(m.Content)) + fmt.Println("Migrations to apply:") + fmt.Printf("Migrations to apply: %v", plan) + case Apply: + plan, err := generateMigrationPlan(migrationsFS, dbVersion) if err != nil { - return fmt.Errorf("failed to execute migration version %d: %w", m.Version, err) + return err } - } - - if err = tx.Commit(); err != nil { - return err + return applyMigrationPlan(db, plan, txTimeout) + default: + return fmt.Errorf("invalid plan: %v", plan) } return nil } +// db.QueryRow does not return a specific error type when the table does not exist so we need to check the error message. +func isTableNotFoundError(err error) bool { + errStr := err.Error() + if strings.Contains(errStr, "no such table") || strings.Contains(errStr, "does not exist") { + return true + } + return false +} + +// readVersion reads the current schema version from the database. func readVersion(db *sql.DB) (int, error) { var version int err := db.QueryRow("SELECT id FROM migrations").Scan(&version) @@ -73,19 +102,23 @@ func readVersion(db *sql.DB) (int, error) { return version, nil } -// db.QueryRow does not return a specific error type when the table does -// not exist so we need to check the error message. -func isTableNotFoundError(err error) bool { - errStr := err.Error() - if strings.Contains(errStr, "no such table") || strings.Contains(errStr, "does not exist") { - return true +// filenameVersion extracts the version number from a migration filename. +func migrationVersion(filename string) (int, error) { + re := regexp.MustCompile(`\d+`) + versionStr := re.FindString(filepath.Base(filename)) + if versionStr == "" { + return 0, fmt.Errorf("could not extract version from filename: %s", filename) } - return false + version, err := strconv.Atoi(versionStr) + if err != nil { + return 0, fmt.Errorf("could not convert version string to int: %v", err) + } + return version, nil } -func readMigrations(migrationsFS embed.FS, currentVersion int) ([]migration, error) { - migrations := []migration{} - +// generateMigrationPlan reads the migrations from the filesystem and returns a plan of migrations to apply. +func generateMigrationPlan(migrationsFS embed.FS, currentVersion int) (MigrationPlan, error) { + migrations := []Migration{} entries, err := migrationsFS.ReadDir("migrations") if err != nil { return nil, err @@ -97,18 +130,15 @@ func readMigrations(migrationsFS embed.FS, currentVersion int) ([]migration, err if err != nil { return nil, err } - - version, err := filenameVersion(filename) + version, err := migrationVersion(filename) if err != nil { return nil, err } - // Skip migrations that are at or below the current version. if version <= currentVersion { continue } - - migrations = append(migrations, migration{ + migrations = append(migrations, Migration{ Version: version, Content: content, }) @@ -121,20 +151,36 @@ func readMigrations(migrationsFS embed.FS, currentVersion int) ([]migration, err return migrations, nil } -func filenameVersion(filename string) (int, error) { - // Use a regular expression to extract the version number - // from the filename by matching the first sequence of digits. - re := regexp.MustCompile(`\d+`) - versionStr := re.FindString(filepath.Base(filename)) +// applyMigrationPlan applies the migrations to the database as a single transaction. +func applyMigrationPlan(db *sql.DB, plan MigrationPlan, txTimeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), txTimeout) + defer cancel() - if versionStr == "" { - return 0, fmt.Errorf("could not extract version from filename: %s", filename) + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err } - version, err := strconv.Atoi(versionStr) - if err != nil { - return 0, fmt.Errorf("could not convert version string to int: %v", err) + defer func() { + if err != nil { + if rbErr := tx.Rollback(); rbErr != nil { + err = fmt.Errorf("tx failed: %v, unable to rollback: %v", err, rbErr) + } + } + + err = fmt.Errorf("tx failed, performed a rollback: %v", err) + }() + + for _, m := range plan { + _, err = tx.Exec(string(m.Content)) + if err != nil { + return fmt.Errorf("failed to execute migration version %d: %w", m.Version, err) + } } - return version, nil + if err = tx.Commit(); err != nil { + return err + } + + return nil } diff --git a/internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql b/internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql index a7ea0adb..bea3de80 100644 --- a/internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql +++ b/internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql @@ -1,13 +1,2 @@ -- Insert initial migration record INSERT INTO migrations (id) VALUES (1) ON CONFLICT(id) DO NOTHING; - --- -- DropInvocationColumn --- ALTER TABLE promises DROP COLUMN invocation; - --- -- DropOldInvocationIndex --- DROP INDEX x; --- DROP INDEX x; - --- -- AddNewInvocationIndex --- CREATE INDEX --- CREATE INDEX \ No newline at end of file diff --git a/internal/app/subsystems/aio/store/postgres/postgres.go b/internal/app/subsystems/aio/store/postgres/postgres.go index f9d87233..4a24ed64 100644 --- a/internal/app/subsystems/aio/store/postgres/postgres.go +++ b/internal/app/subsystems/aio/store/postgres/postgres.go @@ -323,6 +323,7 @@ type Config struct { Query map[string]string TxTimeout time.Duration Reset bool + Plan store.Plan } type PostgresStore struct { @@ -369,7 +370,7 @@ func (s *PostgresStore) String() string { } func (s *PostgresStore) Start() error { - return store.Start(s.db, s.config.TxTimeout, migrationsFS) + return store.Start(Version, s.db, 10*time.Second, migrationsFS, s.config.Plan) } func (s *PostgresStore) Stop() error { diff --git a/internal/app/subsystems/aio/store/postgres/postgres_test.go b/internal/app/subsystems/aio/store/postgres/postgres_test.go index 2912ef01..78eb4123 100644 --- a/internal/app/subsystems/aio/store/postgres/postgres_test.go +++ b/internal/app/subsystems/aio/store/postgres/postgres_test.go @@ -28,6 +28,7 @@ func TestPostgresStore(t *testing.T) { Database: database, Query: map[string]string{"sslmode": "disable"}, TxTimeout: 250 * time.Millisecond, + //... here + the 2x dst run.go and dst_test.go }, 1) if err != nil { t.Fatal(err) diff --git a/internal/app/subsystems/aio/store/postgres/version.go b/internal/app/subsystems/aio/store/postgres/version.go new file mode 100644 index 00000000..ddd54c22 --- /dev/null +++ b/internal/app/subsystems/aio/store/postgres/version.go @@ -0,0 +1,4 @@ +package postgres + +// Version is the Postgres database release version. +const Version = 1 diff --git a/internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql b/internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql index a7ea0adb..bea3de80 100644 --- a/internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql +++ b/internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql @@ -1,13 +1,2 @@ -- Insert initial migration record INSERT INTO migrations (id) VALUES (1) ON CONFLICT(id) DO NOTHING; - --- -- DropInvocationColumn --- ALTER TABLE promises DROP COLUMN invocation; - --- -- DropOldInvocationIndex --- DROP INDEX x; --- DROP INDEX x; - --- -- AddNewInvocationIndex --- CREATE INDEX --- CREATE INDEX \ No newline at end of file diff --git a/internal/app/subsystems/aio/store/sqlite/sqlite.go b/internal/app/subsystems/aio/store/sqlite/sqlite.go index f7ea564b..9b2bfbe5 100644 --- a/internal/app/subsystems/aio/store/sqlite/sqlite.go +++ b/internal/app/subsystems/aio/store/sqlite/sqlite.go @@ -311,6 +311,7 @@ type Config struct { Path string TxTimeout time.Duration Reset bool + Plan store.Plan } type SqliteStore struct { @@ -339,7 +340,7 @@ func (s *SqliteStore) String() string { } func (s *SqliteStore) Start() error { - return store.Start(s.db, s.config.TxTimeout, migrationsFS) + return store.Start(Version, s.db, 10*time.Second, migrationsFS, s.config.Plan) } func (s *SqliteStore) Stop() error { diff --git a/internal/app/subsystems/aio/store/sqlite/version.go b/internal/app/subsystems/aio/store/sqlite/version.go new file mode 100644 index 00000000..30ad1974 --- /dev/null +++ b/internal/app/subsystems/aio/store/sqlite/version.go @@ -0,0 +1,4 @@ +package sqlite + +// Version is the SQLite database release version. +const Version = 1 From f5a39a3186dbaf3a6be7251146bd4660da21f163 Mon Sep 17 00:00:00 2001 From: Gabriel Guerra Date: Fri, 31 May 2024 11:21:58 -0400 Subject: [PATCH 06/10] feat(db-migrations): clean up --- cmd/migrations/migrations.go | 6 +++--- cmd/util/config.go | 4 ++-- .../aio/store/{ => migrations}/migrations.go | 4 ++-- .../aio/store/postgres/migrations/migration-0001.sql | 10 ++++++++-- .../aio/store/postgres/migrations/schema-0000.sql | 9 --------- .../app/subsystems/aio/store/postgres/postgres.go | 12 ++++++++---- .../aio/store/sqlite/migrations/migration-0001.sql | 9 +++++++-- .../aio/store/sqlite/migrations/schema-0000.sql | 8 -------- internal/app/subsystems/aio/store/sqlite/sqlite.go | 11 +++++++---- 9 files changed, 37 insertions(+), 36 deletions(-) rename internal/app/subsystems/aio/store/{ => migrations}/migrations.go (97%) diff --git a/cmd/migrations/migrations.go b/cmd/migrations/migrations.go index ab42e0d0..6adc7496 100644 --- a/cmd/migrations/migrations.go +++ b/cmd/migrations/migrations.go @@ -5,7 +5,7 @@ import ( "github.com/resonatehq/resonate/cmd/util" "github.com/resonatehq/resonate/internal/aio" - "github.com/resonatehq/resonate/internal/app/subsystems/aio/store" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/migrations" "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/postgres" "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/sqlite" "github.com/spf13/cobra" @@ -38,7 +38,7 @@ func NewCmd() *cobra.Command { }, Config: &util.StoreConfig{ Kind: util.StoreKind(aioStore), - Plan: store.Apply, + Plan: migrations.Apply, Sqlite: &sqlite.Config{ Path: aioStoreSqlitePath, TxTimeout: aioStoreSqliteTxTimeout, @@ -56,7 +56,7 @@ func NewCmd() *cobra.Command { } if plan { - cfg.Config.Plan = store.DryRun + cfg.Config.Plan = migrations.DryRun } store, err := util.NewStore(cfg) diff --git a/cmd/util/config.go b/cmd/util/config.go index 829ed7d0..7bc9d99f 100644 --- a/cmd/util/config.go +++ b/cmd/util/config.go @@ -8,7 +8,7 @@ import ( "github.com/resonatehq/resonate/internal/aio" "github.com/resonatehq/resonate/internal/app/subsystems/aio/network" "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing" - "github.com/resonatehq/resonate/internal/app/subsystems/aio/store" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/migrations" "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/postgres" "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/sqlite" "github.com/resonatehq/resonate/internal/app/subsystems/api/grpc" @@ -134,7 +134,7 @@ const ( type StoreConfig struct { Kind StoreKind - Plan store.Plan + Plan migrations.Plan Sqlite *sqlite.Config Postgres *postgres.Config } diff --git a/internal/app/subsystems/aio/store/migrations.go b/internal/app/subsystems/aio/store/migrations/migrations.go similarity index 97% rename from internal/app/subsystems/aio/store/migrations.go rename to internal/app/subsystems/aio/store/migrations/migrations.go index 5981f32b..a4987ba9 100644 --- a/internal/app/subsystems/aio/store/migrations.go +++ b/internal/app/subsystems/aio/store/migrations/migrations.go @@ -1,4 +1,4 @@ -package store +package migrations import ( "context" @@ -38,7 +38,7 @@ func (m MigrationPlan) String() string { return sb.String() } -func Start(currVersion int, db *sql.DB, txTimeout time.Duration, migrationsFS embed.FS, plan Plan) error { +func Run(currVersion int, db *sql.DB, txTimeout time.Duration, migrationsFS embed.FS, plan Plan) error { dbVersion, err := readVersion(db) if err != nil { return err diff --git a/internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql b/internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql index bea3de80..cc53f6d5 100644 --- a/internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql +++ b/internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql @@ -1,2 +1,8 @@ --- Insert initial migration record -INSERT INTO migrations (id) VALUES (1) ON CONFLICT(id) DO NOTHING; +-- CreateTable +CREATE TABLE IF NOT EXISTS migrations ( + id INTEGER, + PRIMARY KEY(id) +); + +--- CreateSchemaVersion +INSERT INTO migrations (id) VALUES (1) ON CONFLICT(id) DO NOTHING; \ No newline at end of file diff --git a/internal/app/subsystems/aio/store/postgres/migrations/schema-0000.sql b/internal/app/subsystems/aio/store/postgres/migrations/schema-0000.sql index 1a4e9148..befdc364 100644 --- a/internal/app/subsystems/aio/store/postgres/migrations/schema-0000.sql +++ b/internal/app/subsystems/aio/store/postgres/migrations/schema-0000.sql @@ -117,12 +117,3 @@ CREATE TABLE IF NOT EXISTS notifications ( attempt INTEGER, PRIMARY KEY(id, promise_id) ); - --- CreateTable -CREATE TABLE IF NOT EXISTS migrations ( - id INTEGER, - PRIMARY KEY(id) -); - ---- CreateSchemaVersion -INSERT INTO migrations (id) VALUES (0) ON CONFLICT(id) DO NOTHING; \ No newline at end of file diff --git a/internal/app/subsystems/aio/store/postgres/postgres.go b/internal/app/subsystems/aio/store/postgres/postgres.go index 4a24ed64..f0420844 100644 --- a/internal/app/subsystems/aio/store/postgres/postgres.go +++ b/internal/app/subsystems/aio/store/postgres/postgres.go @@ -13,6 +13,7 @@ import ( "github.com/resonatehq/resonate/internal/aio" "github.com/resonatehq/resonate/internal/app/subsystems/aio/store" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/migrations" "github.com/resonatehq/resonate/internal/kernel/bus" "github.com/resonatehq/resonate/internal/kernel/t_aio" @@ -28,8 +29,10 @@ import ( _ "github.com/lib/pq" ) -//go:embed migrations/* -var migrationsFS embed.FS +var ( + //go:embed migrations/* + migrationsFS embed.FS +) const ( DROP_TABLE_STATEMENT = ` @@ -323,7 +326,7 @@ type Config struct { Query map[string]string TxTimeout time.Duration Reset bool - Plan store.Plan + Plan migrations.Plan } type PostgresStore struct { @@ -370,7 +373,8 @@ func (s *PostgresStore) String() string { } func (s *PostgresStore) Start() error { - return store.Start(Version, s.db, 10*time.Second, migrationsFS, s.config.Plan) + // If needed, apply migrations + return migrations.Run(Version, s.db, 10*time.Second, migrationsFS, s.config.Plan) } func (s *PostgresStore) Stop() error { diff --git a/internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql b/internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql index bea3de80..bb27917d 100644 --- a/internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql +++ b/internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql @@ -1,2 +1,7 @@ --- Insert initial migration record -INSERT INTO migrations (id) VALUES (1) ON CONFLICT(id) DO NOTHING; +-- CreateTable +CREATE TABLE IF NOT EXISTS migrations ( + id INTEGER PRIMARY KEY +); + +--- CreateSchemaVersion +INSERT INTO migrations (id) VALUES (1) ON CONFLICT(id) DO NOTHING; \ No newline at end of file diff --git a/internal/app/subsystems/aio/store/sqlite/migrations/schema-0000.sql b/internal/app/subsystems/aio/store/sqlite/migrations/schema-0000.sql index 89c6ca8d..4496960e 100644 --- a/internal/app/subsystems/aio/store/sqlite/migrations/schema-0000.sql +++ b/internal/app/subsystems/aio/store/sqlite/migrations/schema-0000.sql @@ -113,11 +113,3 @@ CREATE TABLE IF NOT EXISTS notifications ( attempt INTEGER, PRIMARY KEY(id, promise_id) ); - --- CreateTable -CREATE TABLE IF NOT EXISTS migrations ( - id INTEGER PRIMARY KEY -); - ---- CreateSchemaVersion -INSERT INTO migrations (id) VALUES (0) ON CONFLICT(id) DO NOTHING; \ No newline at end of file diff --git a/internal/app/subsystems/aio/store/sqlite/sqlite.go b/internal/app/subsystems/aio/store/sqlite/sqlite.go index 9b2bfbe5..91cb9923 100644 --- a/internal/app/subsystems/aio/store/sqlite/sqlite.go +++ b/internal/app/subsystems/aio/store/sqlite/sqlite.go @@ -13,6 +13,7 @@ import ( "github.com/resonatehq/resonate/internal/aio" "github.com/resonatehq/resonate/internal/app/subsystems/aio/store" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/migrations" "github.com/resonatehq/resonate/internal/kernel/bus" "github.com/resonatehq/resonate/internal/kernel/t_aio" @@ -28,8 +29,10 @@ import ( _ "github.com/mattn/go-sqlite3" ) -//go:embed migrations/* -var migrationsFS embed.FS +var ( + //go:embed migrations/* + migrationsFS embed.FS +) const ( PROMISE_SELECT_STATEMENT = ` @@ -311,7 +314,7 @@ type Config struct { Path string TxTimeout time.Duration Reset bool - Plan store.Plan + Plan migrations.Plan } type SqliteStore struct { @@ -340,7 +343,7 @@ func (s *SqliteStore) String() string { } func (s *SqliteStore) Start() error { - return store.Start(Version, s.db, 10*time.Second, migrationsFS, s.config.Plan) + return migrations.Run(Version, s.db, 10*time.Second, migrationsFS, s.config.Plan) } func (s *SqliteStore) Stop() error { From 4b40d2b45bdcb4f0b5a32ce78e13ff9ec8cbe27d Mon Sep 17 00:00:00 2001 From: Gabriel Guerra Date: Fri, 31 May 2024 11:53:44 -0400 Subject: [PATCH 07/10] feat(back-channel): clean up x2 --- .../aio/store/migrations/migrations.go | 120 +++++------------- .../subsystems/aio/store/postgres/postgres.go | 2 +- .../subsystems/aio/store/postgres/version.go | 69 ++++++++++ .../app/subsystems/aio/store/sqlite/sqlite.go | 2 +- .../subsystems/aio/store/sqlite/version.go | 63 +++++++++ 5 files changed, 165 insertions(+), 91 deletions(-) diff --git a/internal/app/subsystems/aio/store/migrations/migrations.go b/internal/app/subsystems/aio/store/migrations/migrations.go index a4987ba9..2826a96f 100644 --- a/internal/app/subsystems/aio/store/migrations/migrations.go +++ b/internal/app/subsystems/aio/store/migrations/migrations.go @@ -1,7 +1,7 @@ +// Shared package for handling database migrations. package migrations import ( - "context" "database/sql" "embed" "fmt" @@ -38,59 +38,8 @@ func (m MigrationPlan) String() string { return sb.String() } -func Run(currVersion int, db *sql.DB, txTimeout time.Duration, migrationsFS embed.FS, plan Plan) error { - dbVersion, err := readVersion(db) - if err != nil { - return err - } - - if currVersion < dbVersion { - return fmt.Errorf("current version %d is less than database version %d please updated to latest resonate release", currVersion, dbVersion) - - } - if currVersion == dbVersion { - return nil - } - - // If the database version is -1, it means the migrations table does not exist. - if dbVersion == -1 { // versioning with version.go - plan = Apply - } - - switch plan { - case Default: - return fmt.Errorf("database version %d does not match current version %d please run `resonate migrate --plan` to see migrations needed", dbVersion, currVersion) - case DryRun: - plan, err := generateMigrationPlan(migrationsFS, dbVersion) - if err != nil { - return err - } - fmt.Println("Migrations to apply:") - fmt.Printf("Migrations to apply: %v", plan) - case Apply: - plan, err := generateMigrationPlan(migrationsFS, dbVersion) - if err != nil { - return err - } - return applyMigrationPlan(db, plan, txTimeout) - default: - return fmt.Errorf("invalid plan: %v", plan) - } - - return nil -} - -// db.QueryRow does not return a specific error type when the table does not exist so we need to check the error message. -func isTableNotFoundError(err error) bool { - errStr := err.Error() - if strings.Contains(errStr, "no such table") || strings.Contains(errStr, "does not exist") { - return true - } - return false -} - // readVersion reads the current schema version from the database. -func readVersion(db *sql.DB) (int, error) { +func ReadVersion(db *sql.DB) (int, error) { var version int err := db.QueryRow("SELECT id FROM migrations").Scan(&version) if err != nil { @@ -102,22 +51,8 @@ func readVersion(db *sql.DB) (int, error) { return version, nil } -// filenameVersion extracts the version number from a migration filename. -func migrationVersion(filename string) (int, error) { - re := regexp.MustCompile(`\d+`) - versionStr := re.FindString(filepath.Base(filename)) - if versionStr == "" { - return 0, fmt.Errorf("could not extract version from filename: %s", filename) - } - version, err := strconv.Atoi(versionStr) - if err != nil { - return 0, fmt.Errorf("could not convert version string to int: %v", err) - } - return version, nil -} - // generateMigrationPlan reads the migrations from the filesystem and returns a plan of migrations to apply. -func generateMigrationPlan(migrationsFS embed.FS, currentVersion int) (MigrationPlan, error) { +func GenerateMigrationPlan(migrationsFS embed.FS, currentVersion int) (MigrationPlan, error) { migrations := []Migration{} entries, err := migrationsFS.ReadDir("migrations") if err != nil { @@ -152,35 +87,42 @@ func generateMigrationPlan(migrationsFS embed.FS, currentVersion int) (Migration } // applyMigrationPlan applies the migrations to the database as a single transaction. -func applyMigrationPlan(db *sql.DB, plan MigrationPlan, txTimeout time.Duration) error { - ctx, cancel := context.WithTimeout(context.Background(), txTimeout) - defer cancel() - - tx, err := db.BeginTx(ctx, nil) - if err != nil { - return err - } - - defer func() { - if err != nil { - if rbErr := tx.Rollback(); rbErr != nil { - err = fmt.Errorf("tx failed: %v, unable to rollback: %v", err, rbErr) - } - } - - err = fmt.Errorf("tx failed, performed a rollback: %v", err) - }() - +func ApplyMigrationPlan(tx *sql.Tx, plan MigrationPlan, txTimeout time.Duration) error { for _, m := range plan { - _, err = tx.Exec(string(m.Content)) + _, err := tx.Exec(string(m.Content)) if err != nil { return fmt.Errorf("failed to execute migration version %d: %w", m.Version, err) } } - if err = tx.Commit(); err != nil { + if err := tx.Commit(); err != nil { return err } return nil } + +// private + +// db.QueryRow does not return a specific error type when the table does not exist so we need to check the error message. +func isTableNotFoundError(err error) bool { + errStr := err.Error() + if strings.Contains(errStr, "no such table") || strings.Contains(errStr, "does not exist") { + return true + } + return false +} + +// filenameVersion extracts the version number from a migration filename. +func migrationVersion(filename string) (int, error) { + re := regexp.MustCompile(`\d+`) + versionStr := re.FindString(filepath.Base(filename)) + if versionStr == "" { + return 0, fmt.Errorf("could not extract version from filename: %s", filename) + } + version, err := strconv.Atoi(versionStr) + if err != nil { + return 0, fmt.Errorf("could not convert version string to int: %v", err) + } + return version, nil +} diff --git a/internal/app/subsystems/aio/store/postgres/postgres.go b/internal/app/subsystems/aio/store/postgres/postgres.go index f0420844..6d5d7208 100644 --- a/internal/app/subsystems/aio/store/postgres/postgres.go +++ b/internal/app/subsystems/aio/store/postgres/postgres.go @@ -374,7 +374,7 @@ func (s *PostgresStore) String() string { func (s *PostgresStore) Start() error { // If needed, apply migrations - return migrations.Run(Version, s.db, 10*time.Second, migrationsFS, s.config.Plan) + return Run(Version, s.db, 10*time.Second, migrationsFS, s.config.Plan) } func (s *PostgresStore) Stop() error { diff --git a/internal/app/subsystems/aio/store/postgres/version.go b/internal/app/subsystems/aio/store/postgres/version.go index ddd54c22..46f6ca83 100644 --- a/internal/app/subsystems/aio/store/postgres/version.go +++ b/internal/app/subsystems/aio/store/postgres/version.go @@ -1,4 +1,73 @@ package postgres +import ( + "context" + "database/sql" + "embed" + "fmt" + "time" + + "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/migrations" +) + // Version is the Postgres database release version. const Version = 1 + +func Run(currVersion int, db *sql.DB, txTimeout time.Duration, migrationsFS embed.FS, plan migrations.Plan) error { + ctx, cancel := context.WithTimeout(context.Background(), txTimeout) + defer cancel() + + // Acquire a lock to check the database version. + tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) + if err != nil { + return err + } + defer tx.Rollback() + + _, err = tx.ExecContext(ctx, "SELECT pg_advisory_lock(1)") + if err != nil { + return err + } + defer tx.ExecContext(ctx, "SELECT pg_advisory_unlock(1)") + + // Check the database version again while holding the lock + dbVersion, err := migrations.ReadVersion(db) + if err != nil { + return err + } + + if currVersion < dbVersion { + return fmt.Errorf("current version %d is less than database version %d please updated to latest resonate release", currVersion, dbVersion) + + } + if currVersion == dbVersion { + return nil + } + + // If the database version is -1, it means the migrations table does not exist. + if dbVersion == -1 { + plan = migrations.Apply + } + + switch plan { + case migrations.Default: + return fmt.Errorf("database version %d does not match current version %d please run `resonate migrate --plan` to see migrations needed", dbVersion, currVersion) + case migrations.DryRun: + plan, err := migrations.GenerateMigrationPlan(migrationsFS, dbVersion) + if err != nil { + return err + } + fmt.Println("Migrations to apply:") + fmt.Printf("Migrations to apply: %v", plan) + case migrations.Apply: + plan, err := migrations.GenerateMigrationPlan(migrationsFS, dbVersion) + if err != nil { + return err + } + return migrations.ApplyMigrationPlan(tx, plan, txTimeout) + default: + return fmt.Errorf("invalid plan: %v", plan) + } + + return nil +} diff --git a/internal/app/subsystems/aio/store/sqlite/sqlite.go b/internal/app/subsystems/aio/store/sqlite/sqlite.go index 91cb9923..00a21bc3 100644 --- a/internal/app/subsystems/aio/store/sqlite/sqlite.go +++ b/internal/app/subsystems/aio/store/sqlite/sqlite.go @@ -343,7 +343,7 @@ func (s *SqliteStore) String() string { } func (s *SqliteStore) Start() error { - return migrations.Run(Version, s.db, 10*time.Second, migrationsFS, s.config.Plan) + return Run(Version, s.db, 10*time.Second, migrationsFS, s.config.Plan) } func (s *SqliteStore) Stop() error { diff --git a/internal/app/subsystems/aio/store/sqlite/version.go b/internal/app/subsystems/aio/store/sqlite/version.go index 30ad1974..47ff2bea 100644 --- a/internal/app/subsystems/aio/store/sqlite/version.go +++ b/internal/app/subsystems/aio/store/sqlite/version.go @@ -1,4 +1,67 @@ package sqlite +import ( + "context" + "database/sql" + "embed" + "fmt" + "time" + + "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/migrations" +) + // Version is the SQLite database release version. const Version = 1 + +func Run(currVersion int, db *sql.DB, txTimeout time.Duration, migrationsFS embed.FS, plan migrations.Plan) error { + ctx, cancel := context.WithTimeout(context.Background(), txTimeout) + defer cancel() + + // Acquire a lock to check the database version. + tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) + if err != nil { + return err + } + defer tx.Rollback() + + // Check the database version again while holding the lock + dbVersion, err := migrations.ReadVersion(db) + if err != nil { + return err + } + + if currVersion < dbVersion { + return fmt.Errorf("current version %d is less than database version %d please updated to latest resonate release", currVersion, dbVersion) + + } + if currVersion == dbVersion { + return tx.Commit() + } + + // If the database version is -1, it means the migrations table does not exist. + if dbVersion == -1 { + plan = migrations.Apply + } + + switch plan { + case migrations.Default: + return fmt.Errorf("database version %d does not match current version %d please run `resonate migrate --plan` to see migrations needed", dbVersion, currVersion) + case migrations.DryRun: + plan, err := migrations.GenerateMigrationPlan(migrationsFS, dbVersion) + if err != nil { + return err + } + fmt.Println("Migrations to apply:") + fmt.Printf("Migrations to apply: %v", plan) + case migrations.Apply: + plan, err := migrations.GenerateMigrationPlan(migrationsFS, dbVersion) + if err != nil { + return err + } + return migrations.ApplyMigrationPlan(tx, plan, txTimeout) + default: + return fmt.Errorf("invalid plan: %v", plan) + } + + return nil +} From e10591170c44f658bdb6a40b1b8159f4e94e1d5f Mon Sep 17 00:00:00 2001 From: Gabriel Guerra Date: Fri, 31 May 2024 14:43:36 -0400 Subject: [PATCH 08/10] feat(db-migrations): cleanup --- .../aio/store/migrations/migrations.go | 14 ++---- .../postgres/migrations/migration-0001.sql | 6 --- .../subsystems/aio/store/postgres/postgres.go | 11 ++++- .../aio/store/postgres/postgres_test.go | 1 - .../subsystems/aio/store/postgres/version.go | 43 ++++++++++++++----- .../sqlite/migrations/migration-0001.sql | 5 --- .../app/subsystems/aio/store/sqlite/sqlite.go | 13 +++++- .../subsystems/aio/store/sqlite/version.go | 25 ++++++++--- 8 files changed, 78 insertions(+), 40 deletions(-) diff --git a/internal/app/subsystems/aio/store/migrations/migrations.go b/internal/app/subsystems/aio/store/migrations/migrations.go index 2826a96f..fa0732ca 100644 --- a/internal/app/subsystems/aio/store/migrations/migrations.go +++ b/internal/app/subsystems/aio/store/migrations/migrations.go @@ -10,7 +10,6 @@ import ( "sort" "strconv" "strings" - "time" ) type Plan int @@ -39,11 +38,11 @@ func (m MigrationPlan) String() string { } // readVersion reads the current schema version from the database. -func ReadVersion(db *sql.DB) (int, error) { +func ReadVersion(tx *sql.Tx) (int, error) { var version int - err := db.QueryRow("SELECT id FROM migrations").Scan(&version) + err := tx.QueryRow("SELECT id FROM migrations").Scan(&version) if err != nil { - if isTableNotFoundError(err) { + if err == sql.ErrNoRows { return -1, nil } return 0, err @@ -87,18 +86,13 @@ func GenerateMigrationPlan(migrationsFS embed.FS, currentVersion int) (Migration } // applyMigrationPlan applies the migrations to the database as a single transaction. -func ApplyMigrationPlan(tx *sql.Tx, plan MigrationPlan, txTimeout time.Duration) error { +func ApplyMigrationPlan(tx *sql.Tx, plan MigrationPlan) error { for _, m := range plan { _, err := tx.Exec(string(m.Content)) if err != nil { return fmt.Errorf("failed to execute migration version %d: %w", m.Version, err) } } - - if err := tx.Commit(); err != nil { - return err - } - return nil } diff --git a/internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql b/internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql index cc53f6d5..7c1bb427 100644 --- a/internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql +++ b/internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql @@ -1,8 +1,2 @@ --- CreateTable -CREATE TABLE IF NOT EXISTS migrations ( - id INTEGER, - PRIMARY KEY(id) -); - --- CreateSchemaVersion INSERT INTO migrations (id) VALUES (1) ON CONFLICT(id) DO NOTHING; \ No newline at end of file diff --git a/internal/app/subsystems/aio/store/postgres/postgres.go b/internal/app/subsystems/aio/store/postgres/postgres.go index 6d5d7208..ac34d872 100644 --- a/internal/app/subsystems/aio/store/postgres/postgres.go +++ b/internal/app/subsystems/aio/store/postgres/postgres.go @@ -35,6 +35,11 @@ var ( ) const ( + CREATE_MIGRATIONS_TABLE_STATEMENT = ` + CREATE TABLE IF NOT EXISTS migrations ( + id INTEGER PRIMARY KEY + ); + ` DROP_TABLE_STATEMENT = ` DROP TABLE notifications; DROP TABLE subscriptions; @@ -373,8 +378,12 @@ func (s *PostgresStore) String() string { } func (s *PostgresStore) Start() error { + if _, err := s.db.Exec(CREATE_MIGRATIONS_TABLE_STATEMENT); err != nil { + return err + } + // If needed, apply migrations - return Run(Version, s.db, 10*time.Second, migrationsFS, s.config.Plan) + return Run(Version, s.db, 90*time.Second, migrationsFS, s.config.Plan) } func (s *PostgresStore) Stop() error { diff --git a/internal/app/subsystems/aio/store/postgres/postgres_test.go b/internal/app/subsystems/aio/store/postgres/postgres_test.go index 78eb4123..2912ef01 100644 --- a/internal/app/subsystems/aio/store/postgres/postgres_test.go +++ b/internal/app/subsystems/aio/store/postgres/postgres_test.go @@ -28,7 +28,6 @@ func TestPostgresStore(t *testing.T) { Database: database, Query: map[string]string{"sslmode": "disable"}, TxTimeout: 250 * time.Millisecond, - //... here + the 2x dst run.go and dst_test.go }, 1) if err != nil { t.Fatal(err) diff --git a/internal/app/subsystems/aio/store/postgres/version.go b/internal/app/subsystems/aio/store/postgres/version.go index 46f6ca83..fa31a355 100644 --- a/internal/app/subsystems/aio/store/postgres/version.go +++ b/internal/app/subsystems/aio/store/postgres/version.go @@ -18,29 +18,44 @@ func Run(currVersion int, db *sql.DB, txTimeout time.Duration, migrationsFS embe defer cancel() // Acquire a lock to check the database version. - tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) + tx, err := db.BeginTx(ctx, nil) if err != nil { return err } - defer tx.Rollback() + defer func() { + _ = tx.Rollback() + }() - _, err = tx.ExecContext(ctx, "SELECT pg_advisory_lock(1)") + var lockAcquired bool + err = tx.QueryRowContext(ctx, "SELECT pg_try_advisory_lock(1)").Scan(&lockAcquired) if err != nil { return err } - defer tx.ExecContext(ctx, "SELECT pg_advisory_unlock(1)") + if !lockAcquired { + err = fmt.Errorf("could not acquire advisory lock") + return err + } + defer func() { + if _, unlockErr := tx.ExecContext(ctx, "SELECT pg_advisory_unlock(1)"); unlockErr != nil { + err = fmt.Errorf("%v; %v", err, unlockErr) + } + }() // Check the database version again while holding the lock - dbVersion, err := migrations.ReadVersion(db) + var dbVersion int + dbVersion, err = migrations.ReadVersion(tx) if err != nil { return err } if currVersion < dbVersion { - return fmt.Errorf("current version %d is less than database version %d please updated to latest resonate release", currVersion, dbVersion) - + err = fmt.Errorf("current version %d is less than database version %d please updated to latest resonate release", currVersion, dbVersion) + return err } if currVersion == dbVersion { + if err = tx.Commit(); err != nil { + return err + } return nil } @@ -53,18 +68,26 @@ func Run(currVersion int, db *sql.DB, txTimeout time.Duration, migrationsFS embe case migrations.Default: return fmt.Errorf("database version %d does not match current version %d please run `resonate migrate --plan` to see migrations needed", dbVersion, currVersion) case migrations.DryRun: - plan, err := migrations.GenerateMigrationPlan(migrationsFS, dbVersion) + var plan migrations.MigrationPlan + plan, err = migrations.GenerateMigrationPlan(migrationsFS, dbVersion) if err != nil { return err } fmt.Println("Migrations to apply:") fmt.Printf("Migrations to apply: %v", plan) case migrations.Apply: - plan, err := migrations.GenerateMigrationPlan(migrationsFS, dbVersion) + var plan migrations.MigrationPlan + plan, err = migrations.GenerateMigrationPlan(migrationsFS, dbVersion) if err != nil { return err } - return migrations.ApplyMigrationPlan(tx, plan, txTimeout) + if err = migrations.ApplyMigrationPlan(tx, plan); err != nil { + return err + } + + if err = tx.Commit(); err != nil { + return err + } default: return fmt.Errorf("invalid plan: %v", plan) } diff --git a/internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql b/internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql index bb27917d..7c1bb427 100644 --- a/internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql +++ b/internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql @@ -1,7 +1,2 @@ --- CreateTable -CREATE TABLE IF NOT EXISTS migrations ( - id INTEGER PRIMARY KEY -); - --- CreateSchemaVersion INSERT INTO migrations (id) VALUES (1) ON CONFLICT(id) DO NOTHING; \ No newline at end of file diff --git a/internal/app/subsystems/aio/store/sqlite/sqlite.go b/internal/app/subsystems/aio/store/sqlite/sqlite.go index 00a21bc3..a5b79154 100644 --- a/internal/app/subsystems/aio/store/sqlite/sqlite.go +++ b/internal/app/subsystems/aio/store/sqlite/sqlite.go @@ -35,6 +35,13 @@ var ( ) const ( + CREATE_MIGRATIONS_TABLE_STATEMENT = ` + CREATE TABLE IF NOT EXISTS migrations ( + id INTEGER, + PRIMARY KEY(id) + ); + ` + PROMISE_SELECT_STATEMENT = ` SELECT id, state, param_headers, param_data, value_headers, value_data, timeout, idempotency_key_for_create, idempotency_key_for_complete, tags, created_on, completed_on @@ -343,7 +350,11 @@ func (s *SqliteStore) String() string { } func (s *SqliteStore) Start() error { - return Run(Version, s.db, 10*time.Second, migrationsFS, s.config.Plan) + if _, err := s.db.Exec(CREATE_MIGRATIONS_TABLE_STATEMENT); err != nil { + return err + } + + return Run(Version, s.db, 90*time.Second, migrationsFS, s.config.Plan) } func (s *SqliteStore) Stop() error { diff --git a/internal/app/subsystems/aio/store/sqlite/version.go b/internal/app/subsystems/aio/store/sqlite/version.go index 47ff2bea..e483ce56 100644 --- a/internal/app/subsystems/aio/store/sqlite/version.go +++ b/internal/app/subsystems/aio/store/sqlite/version.go @@ -22,10 +22,13 @@ func Run(currVersion int, db *sql.DB, txTimeout time.Duration, migrationsFS embe if err != nil { return err } - defer tx.Rollback() + defer func() { + _ = tx.Rollback() + }() // Check the database version again while holding the lock - dbVersion, err := migrations.ReadVersion(db) + var dbVersion int + dbVersion, err = migrations.ReadVersion(tx) if err != nil { return err } @@ -35,7 +38,10 @@ func Run(currVersion int, db *sql.DB, txTimeout time.Duration, migrationsFS embe } if currVersion == dbVersion { - return tx.Commit() + if err = tx.Commit(); err != nil { + return err + } + return nil } // If the database version is -1, it means the migrations table does not exist. @@ -47,18 +53,25 @@ func Run(currVersion int, db *sql.DB, txTimeout time.Duration, migrationsFS embe case migrations.Default: return fmt.Errorf("database version %d does not match current version %d please run `resonate migrate --plan` to see migrations needed", dbVersion, currVersion) case migrations.DryRun: - plan, err := migrations.GenerateMigrationPlan(migrationsFS, dbVersion) + var plan migrations.MigrationPlan + plan, err = migrations.GenerateMigrationPlan(migrationsFS, dbVersion) if err != nil { return err } fmt.Println("Migrations to apply:") fmt.Printf("Migrations to apply: %v", plan) case migrations.Apply: - plan, err := migrations.GenerateMigrationPlan(migrationsFS, dbVersion) + var plan migrations.MigrationPlan + plan, err = migrations.GenerateMigrationPlan(migrationsFS, dbVersion) if err != nil { return err } - return migrations.ApplyMigrationPlan(tx, plan, txTimeout) + if err = migrations.ApplyMigrationPlan(tx, plan); err != nil { + return err + } + if err = tx.Commit(); err != nil { + return err + } default: return fmt.Errorf("invalid plan: %v", plan) } From d54ed9ee47bb8bd29934ac7682d97ed38204449b Mon Sep 17 00:00:00 2001 From: Gabriel Guerra Date: Sat, 1 Jun 2024 22:54:17 -0400 Subject: [PATCH 09/10] fix(linter): for the linting gods --- .../app/subsystems/aio/store/migrations/migrations.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/internal/app/subsystems/aio/store/migrations/migrations.go b/internal/app/subsystems/aio/store/migrations/migrations.go index fa0732ca..e27ab61d 100644 --- a/internal/app/subsystems/aio/store/migrations/migrations.go +++ b/internal/app/subsystems/aio/store/migrations/migrations.go @@ -96,17 +96,6 @@ func ApplyMigrationPlan(tx *sql.Tx, plan MigrationPlan) error { return nil } -// private - -// db.QueryRow does not return a specific error type when the table does not exist so we need to check the error message. -func isTableNotFoundError(err error) bool { - errStr := err.Error() - if strings.Contains(errStr, "no such table") || strings.Contains(errStr, "does not exist") { - return true - } - return false -} - // filenameVersion extracts the version number from a migration filename. func migrationVersion(filename string) (int, error) { re := regexp.MustCompile(`\d+`) From f48faf8c197c4aa1991774eb3e93f93217af073c Mon Sep 17 00:00:00 2001 From: Gabriel Guerra Date: Sat, 1 Jun 2024 23:16:44 -0400 Subject: [PATCH 10/10] fix(plan): dry run option needing cleaning up --- cmd/migrations/migrations.go | 14 ++++++++------ cmd/util/config.go | 2 -- .../subsystems/aio/store/migrations/migrations.go | 5 ++--- .../app/subsystems/aio/store/postgres/version.go | 2 +- .../app/subsystems/aio/store/sqlite/version.go | 5 ++--- 5 files changed, 13 insertions(+), 15 deletions(-) diff --git a/cmd/migrations/migrations.go b/cmd/migrations/migrations.go index 6adc7496..18addfb2 100644 --- a/cmd/migrations/migrations.go +++ b/cmd/migrations/migrations.go @@ -30,6 +30,11 @@ func NewCmd() *cobra.Command { Use: "migrate", Short: "Synchronizes the database state with the current set of models and migrations", RunE: func(cmd *cobra.Command, args []string) error { + desiredPlan := migrations.Apply + if plan { + desiredPlan = migrations.DryRun + } + cfg := &util.AIOSubsystemConfig[util.StoreConfig]{ Subsystem: &aio.SubsystemConfig{ Size: 1, @@ -38,10 +43,10 @@ func NewCmd() *cobra.Command { }, Config: &util.StoreConfig{ Kind: util.StoreKind(aioStore), - Plan: migrations.Apply, Sqlite: &sqlite.Config{ Path: aioStoreSqlitePath, TxTimeout: aioStoreSqliteTxTimeout, + Plan: desiredPlan, }, Postgres: &postgres.Config{ Host: aioStorePostgresHost, @@ -51,14 +56,11 @@ func NewCmd() *cobra.Command { Database: aioStorePostgresDatabase, Query: aioStorePostgresQuery, TxTimeout: aioStorePostgresTxTimeout, + Plan: desiredPlan, }, }, } - if plan { - cfg.Config.Plan = migrations.DryRun - } - store, err := util.NewStore(cfg) if err != nil { return err @@ -83,7 +85,7 @@ func NewCmd() *cobra.Command { cmd.Flags().StringVar(&aioStorePostgresDatabase, "aio-store-postgres-database", "resonate", "postgres database name") cmd.Flags().StringToStringVar(&aioStorePostgresQuery, "aio-store-postgres-query", make(map[string]string, 0), "postgres query options") cmd.Flags().DurationVar(&aioStorePostgresTxTimeout, "aio-store-postgres-tx-timeout", 10_000*time.Millisecond, "postgres transaction timeout") - cmd.Flags().BoolVarP(&plan, "plan", "p", true, "Dry run to check which migrations will be applied") + cmd.Flags().BoolVarP(&plan, "plan", "p", false, "Dry run to check which migrations will be applied") return cmd } diff --git a/cmd/util/config.go b/cmd/util/config.go index 7bc9d99f..122697df 100644 --- a/cmd/util/config.go +++ b/cmd/util/config.go @@ -8,7 +8,6 @@ import ( "github.com/resonatehq/resonate/internal/aio" "github.com/resonatehq/resonate/internal/app/subsystems/aio/network" "github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing" - "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/migrations" "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/postgres" "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/sqlite" "github.com/resonatehq/resonate/internal/app/subsystems/api/grpc" @@ -134,7 +133,6 @@ const ( type StoreConfig struct { Kind StoreKind - Plan migrations.Plan Sqlite *sqlite.Config Postgres *postgres.Config } diff --git a/internal/app/subsystems/aio/store/migrations/migrations.go b/internal/app/subsystems/aio/store/migrations/migrations.go index e27ab61d..6483dd40 100644 --- a/internal/app/subsystems/aio/store/migrations/migrations.go +++ b/internal/app/subsystems/aio/store/migrations/migrations.go @@ -29,10 +29,9 @@ type MigrationPlan []Migration func (m MigrationPlan) String() string { var sb strings.Builder - sb.WriteString("Operations to perform:\n") - sb.WriteString("Apply all migrations:") for _, migration := range m { - sb.WriteString(fmt.Sprintf(" %d", migration.Version)) + sb.WriteString(fmt.Sprintf("migration-%d...\n", migration.Version)) + sb.WriteString(string(migration.Content)) } return sb.String() } diff --git a/internal/app/subsystems/aio/store/postgres/version.go b/internal/app/subsystems/aio/store/postgres/version.go index fa31a355..7e09d17a 100644 --- a/internal/app/subsystems/aio/store/postgres/version.go +++ b/internal/app/subsystems/aio/store/postgres/version.go @@ -60,7 +60,7 @@ func Run(currVersion int, db *sql.DB, txTimeout time.Duration, migrationsFS embe } // If the database version is -1, it means the migrations table does not exist. - if dbVersion == -1 { + if dbVersion == -1 && plan != migrations.DryRun { plan = migrations.Apply } diff --git a/internal/app/subsystems/aio/store/sqlite/version.go b/internal/app/subsystems/aio/store/sqlite/version.go index e483ce56..2d4ae889 100644 --- a/internal/app/subsystems/aio/store/sqlite/version.go +++ b/internal/app/subsystems/aio/store/sqlite/version.go @@ -45,7 +45,7 @@ func Run(currVersion int, db *sql.DB, txTimeout time.Duration, migrationsFS embe } // If the database version is -1, it means the migrations table does not exist. - if dbVersion == -1 { + if dbVersion == -1 && plan != migrations.DryRun { plan = migrations.Apply } @@ -58,8 +58,7 @@ func Run(currVersion int, db *sql.DB, txTimeout time.Duration, migrationsFS embe if err != nil { return err } - fmt.Println("Migrations to apply:") - fmt.Printf("Migrations to apply: %v", plan) + fmt.Printf("Migrations to apply:\n%s\n", plan) case migrations.Apply: var plan migrations.MigrationPlan plan, err = migrations.GenerateMigrationPlan(migrationsFS, dbVersion)