diff --git a/cmd/migrations/migrations.go b/cmd/migrations/migrations.go new file mode 100644 index 00000000..18addfb2 --- /dev/null +++ b/cmd/migrations/migrations.go @@ -0,0 +1,91 @@ +package migrations + +import ( + "time" + + "github.com/resonatehq/resonate/cmd/util" + "github.com/resonatehq/resonate/internal/aio" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/migrations" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/postgres" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/sqlite" + "github.com/spf13/cobra" +) + +func NewCmd() *cobra.Command { + var ( + aioStore string + aioStoreSqlitePath string + aioStoreSqliteTxTimeout time.Duration + aioStorePostgresHost string + aioStorePostgresPort string + aioStorePostgresUsername string + aioStorePostgresPassword string + aioStorePostgresDatabase string + aioStorePostgresQuery map[string]string + aioStorePostgresTxTimeout time.Duration + plan bool + ) + + var cmd = &cobra.Command{ + Use: "migrate", + Short: "Synchronizes the database state with the current set of models and migrations", + RunE: func(cmd *cobra.Command, args []string) error { + desiredPlan := migrations.Apply + if plan { + desiredPlan = migrations.DryRun + } + + cfg := &util.AIOSubsystemConfig[util.StoreConfig]{ + Subsystem: &aio.SubsystemConfig{ + Size: 1, + BatchSize: 1, + Workers: 1, // Only one connection will be opened to run migrations. + }, + Config: &util.StoreConfig{ + Kind: util.StoreKind(aioStore), + Sqlite: &sqlite.Config{ + Path: aioStoreSqlitePath, + TxTimeout: aioStoreSqliteTxTimeout, + Plan: desiredPlan, + }, + Postgres: &postgres.Config{ + Host: aioStorePostgresHost, + Port: aioStorePostgresPort, + Username: aioStorePostgresUsername, + Password: aioStorePostgresPassword, + Database: aioStorePostgresDatabase, + Query: aioStorePostgresQuery, + TxTimeout: aioStorePostgresTxTimeout, + Plan: desiredPlan, + }, + }, + } + + store, err := util.NewStore(cfg) + if err != nil { + return err + } + + // Runs migrations. + if err := store.Start(); err != nil { + return err + } + + return store.Stop() + }, + } + + cmd.Flags().StringVar(&aioStore, "aio-store", "sqlite", "promise store type") + cmd.Flags().StringVar(&aioStoreSqlitePath, "aio-store-sqlite-path", "resonate.db", "sqlite database path") + cmd.Flags().DurationVar(&aioStoreSqliteTxTimeout, "aio-store-sqlite-tx-timeout", 10_000*time.Millisecond, "sqlite transaction timeout") + cmd.Flags().StringVar(&aioStorePostgresHost, "aio-store-postgres-host", "localhost", "postgres host") + cmd.Flags().StringVar(&aioStorePostgresPort, "aio-store-postgres-port", "5432", "postgres port") + cmd.Flags().StringVar(&aioStorePostgresUsername, "aio-store-postgres-username", "", "postgres username") + cmd.Flags().StringVar(&aioStorePostgresPassword, "aio-store-postgres-password", "", "postgres password") + cmd.Flags().StringVar(&aioStorePostgresDatabase, "aio-store-postgres-database", "resonate", "postgres database name") + cmd.Flags().StringToStringVar(&aioStorePostgresQuery, "aio-store-postgres-query", make(map[string]string, 0), "postgres query options") + cmd.Flags().DurationVar(&aioStorePostgresTxTimeout, "aio-store-postgres-tx-timeout", 10_000*time.Millisecond, "postgres transaction timeout") + cmd.Flags().BoolVarP(&plan, "plan", "p", false, "Dry run to check which migrations will be applied") + + return cmd +} diff --git a/cmd/root.go b/cmd/root.go index 238d52d4..5c22af1d 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/resonatehq/resonate/cmd/dst" + "github.com/resonatehq/resonate/cmd/migrations" "github.com/resonatehq/resonate/cmd/promises" "github.com/resonatehq/resonate/cmd/quickstart" "github.com/resonatehq/resonate/cmd/schedules" @@ -43,6 +44,7 @@ func init() { rootCmd.AddCommand(dst.NewCmd()) rootCmd.AddCommand(serve.ServeCmd()) rootCmd.AddCommand(quickstart.NewCmd()) + rootCmd.AddCommand(migrations.NewCmd()) // Set default output rootCmd.SetOut(os.Stdout) diff --git a/internal/app/subsystems/aio/store/migrations/migrations.go b/internal/app/subsystems/aio/store/migrations/migrations.go new file mode 100644 index 00000000..6483dd40 --- /dev/null +++ b/internal/app/subsystems/aio/store/migrations/migrations.go @@ -0,0 +1,110 @@ +// Shared package for handling database migrations. +package migrations + +import ( + "database/sql" + "embed" + "fmt" + "path/filepath" + "regexp" + "sort" + "strconv" + "strings" +) + +type Plan int + +const ( + Default Plan = iota // 0 is the default value + DryRun + Apply +) + +type Migration struct { + Version int + Content []byte +} + +type MigrationPlan []Migration + +func (m MigrationPlan) String() string { + var sb strings.Builder + for _, migration := range m { + sb.WriteString(fmt.Sprintf("migration-%d...\n", migration.Version)) + sb.WriteString(string(migration.Content)) + } + return sb.String() +} + +// readVersion reads the current schema version from the database. +func ReadVersion(tx *sql.Tx) (int, error) { + var version int + err := tx.QueryRow("SELECT id FROM migrations").Scan(&version) + if err != nil { + if err == sql.ErrNoRows { + return -1, nil + } + return 0, err + } + return version, nil +} + +// generateMigrationPlan reads the migrations from the filesystem and returns a plan of migrations to apply. +func GenerateMigrationPlan(migrationsFS embed.FS, currentVersion int) (MigrationPlan, error) { + migrations := []Migration{} + entries, err := migrationsFS.ReadDir("migrations") + if err != nil { + return nil, err + } + + for _, entry := range entries { + filename := entry.Name() + content, err := migrationsFS.ReadFile("migrations/" + filename) + if err != nil { + return nil, err + } + version, err := migrationVersion(filename) + if err != nil { + return nil, err + } + // Skip migrations that are at or below the current version. + if version <= currentVersion { + continue + } + migrations = append(migrations, Migration{ + Version: version, + Content: content, + }) + } + + sort.Slice(migrations, func(i, j int) bool { + return migrations[i].Version < migrations[j].Version + }) + + return migrations, nil +} + +// applyMigrationPlan applies the migrations to the database as a single transaction. +func ApplyMigrationPlan(tx *sql.Tx, plan MigrationPlan) error { + for _, m := range plan { + _, err := tx.Exec(string(m.Content)) + if err != nil { + return fmt.Errorf("failed to execute migration version %d: %w", m.Version, err) + } + } + return nil +} + +// filenameVersion extracts the version number from a migration filename. +func migrationVersion(filename string) (int, error) { + re := regexp.MustCompile(`\d+`) + versionStr := re.FindString(filepath.Base(filename)) + if versionStr == "" { + return 0, fmt.Errorf("could not extract version from filename: %s", filename) + } + version, err := strconv.Atoi(versionStr) + if err != nil { + return 0, fmt.Errorf("could not convert version string to int: %v", err) + } + return version, nil +} diff --git a/internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql b/internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql new file mode 100644 index 00000000..7c1bb427 --- /dev/null +++ b/internal/app/subsystems/aio/store/postgres/migrations/migration-0001.sql @@ -0,0 +1,2 @@ +--- CreateSchemaVersion +INSERT INTO migrations (id) VALUES (1) ON CONFLICT(id) DO NOTHING; \ No newline at end of file diff --git a/internal/app/subsystems/aio/store/postgres/migrations/schema-0000.sql b/internal/app/subsystems/aio/store/postgres/migrations/schema-0000.sql new file mode 100644 index 00000000..befdc364 --- /dev/null +++ b/internal/app/subsystems/aio/store/postgres/migrations/schema-0000.sql @@ -0,0 +1,119 @@ +-- CreateTable +CREATE TABLE IF NOT EXISTS promises ( + id TEXT, + sort_id SERIAL, + state INTEGER DEFAULT 1, + param_headers JSONB, + param_data BYTEA, + value_headers JSONB, + value_data BYTEA, + timeout BIGINT, + idempotency_key_for_create TEXT, + idempotency_key_for_complete TEXT, + tags JSONB, + created_on BIGINT, + completed_on BIGINT, + PRIMARY KEY(id) +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_promises_sort_id ON promises(sort_id); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_promises_invocation ON promises((tags->>'resonate:invocation')); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_promises_timeout ON promises((tags->>'resonate:timeout')); + +-- CreateTable +CREATE TABLE IF NOT EXISTS schedules ( + id TEXT, + sort_id SERIAL, + description TEXT, + cron TEXT, + tags JSONB, + promise_id TEXT, + promise_timeout BIGINT, + promise_param_headers JSONB, + promise_param_data BYTEA, + promise_tags JSONB, + last_run_time BIGINT, + next_run_time BIGINT, + idempotency_key TEXT, + created_on BIGINT, + PRIMARY KEY(id) +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_schedules_sort_id ON schedules(sort_id); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_schedules_next_run_time ON schedules(next_run_time); + +-- CreateTable +CREATE TABLE IF NOT EXISTS tasks ( + id TEXT, + counter INTEGER, + promise_id TEXT, + claim_timeout BIGINT, + complete_timeout BIGINT, + promise_timeout BIGINT, + created_on BIGINT, + completed_on BIGINT, + is_completed BOOLEAN, + PRIMARY KEY(id) +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_tasks_id ON tasks(id); + +-- CreateTable +CREATE TABLE IF NOT EXISTS locks ( + resource_id TEXT, + process_id TEXT, + execution_id TEXT, + expiry_in_milliseconds BIGINT, + timeout BIGINT, + PRIMARY KEY(resource_id) +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_locks_acquire_id ON locks(resource_id, execution_id); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_locks_heartbeat_id ON locks(process_id); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_locks_timeout ON locks(timeout); + +-- CreateTable +CREATE TABLE IF NOT EXISTS timeouts ( + id TEXT, + time BIGINT, + PRIMARY KEY(id) +); + +-- CreateTable +CREATE TABLE IF NOT EXISTS subscriptions ( + id TEXT, + sort_id SERIAL, + promise_id TEXT, + url TEXT, + retry_policy BYTEA, + created_on BIGINT, + PRIMARY KEY(id, promise_id) +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_subscriptions_sort_id ON subscriptions(sort_id); + +-- CreateTable +CREATE TABLE IF NOT EXISTS notifications ( + id TEXT, + promise_id TEXT, + url TEXT, + retry_policy BYTEA, + time BIGINT, + attempt INTEGER, + PRIMARY KEY(id, promise_id) +); diff --git a/internal/app/subsystems/aio/store/postgres/postgres.go b/internal/app/subsystems/aio/store/postgres/postgres.go index 0ebe9622..ac34d872 100644 --- a/internal/app/subsystems/aio/store/postgres/postgres.go +++ b/internal/app/subsystems/aio/store/postgres/postgres.go @@ -3,6 +3,8 @@ package postgres import ( "context" "database/sql" + "embed" + _ "embed" "encoding/json" "fmt" "net/url" @@ -11,6 +13,7 @@ import ( "github.com/resonatehq/resonate/internal/aio" "github.com/resonatehq/resonate/internal/app/subsystems/aio/store" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/migrations" "github.com/resonatehq/resonate/internal/kernel/bus" "github.com/resonatehq/resonate/internal/kernel/t_aio" @@ -26,113 +29,17 @@ import ( _ "github.com/lib/pq" ) -const ( - CREATE_TABLE_STATEMENT = ` - CREATE TABLE IF NOT EXISTS promises ( - id TEXT, - sort_id SERIAL, - state INTEGER DEFAULT 1, - param_headers JSONB, - param_data BYTEA, - value_headers JSONB, - value_data BYTEA, - timeout BIGINT, - idempotency_key_for_create TEXT, - idempotency_key_for_complete TEXT, - tags JSONB, - created_on BIGINT, - completed_on BIGINT, - PRIMARY KEY(id) - ); - - CREATE INDEX IF NOT EXISTS idx_promises_sort_id ON promises(sort_id); - CREATE INDEX IF NOT EXISTS idx_promises_invocation ON promises((tags->>'resonate:invocation')); - CREATE INDEX IF NOT EXISTS idx_promises_timeout ON promises((tags->>'resonate:timeout')); - - CREATE TABLE IF NOT EXISTS schedules ( - id TEXT, - sort_id SERIAL, - description TEXT, - cron TEXT, - tags JSONB, - promise_id TEXT, - promise_timeout BIGINT, - promise_param_headers JSONB, - promise_param_data BYTEA, - promise_tags JSONB, - last_run_time BIGINT, - next_run_time BIGINT, - idempotency_key TEXT, - created_on BIGINT, - PRIMARY KEY(id) - ); - - CREATE INDEX IF NOT EXISTS idx_schedules_sort_id ON schedules(sort_id); - CREATE INDEX IF NOT EXISTS idx_schedules_next_run_time ON schedules(next_run_time); - - CREATE TABLE IF NOT EXISTS tasks ( - id TEXT, - counter INTEGER, - promise_id TEXT, - claim_timeout BIGINT, - complete_timeout BIGINT, - promise_timeout BIGINT, - created_on BIGINT, - completed_on BIGINT, - is_completed BOOLEAN, - PRIMARY KEY(id) - ); - - CREATE INDEX IF NOT EXISTS idx_tasks_id ON tasks(id); - - CREATE TABLE IF NOT EXISTS locks ( - resource_id TEXT, - process_id TEXT, - execution_id TEXT, - expiry_in_milliseconds BIGINT, - timeout BIGINT, - PRIMARY KEY(resource_id) - ); - - CREATE INDEX IF NOT EXISTS idx_locks_acquire_id ON locks(resource_id, execution_id); - CREATE INDEX IF NOT EXISTS idx_locks_heartbeat_id ON locks(process_id); - CREATE INDEX IF NOT EXISTS idx_locks_timeout ON locks(timeout); - - CREATE TABLE IF NOT EXISTS timeouts ( - id TEXT, - time BIGINT, - PRIMARY KEY(id) - ); - - CREATE TABLE IF NOT EXISTS subscriptions ( - id TEXT, - sort_id SERIAL, - promise_id TEXT, - url TEXT, - retry_policy BYTEA, - created_on BIGINT, - PRIMARY KEY(id, promise_id) - ); - - CREATE INDEX IF NOT EXISTS idx_subscriptions_sort_id ON subscriptions(sort_id); +var ( + //go:embed migrations/* + migrationsFS embed.FS +) - CREATE TABLE IF NOT EXISTS notifications ( - id TEXT, - promise_id TEXT, - url TEXT, - retry_policy BYTEA, - time BIGINT, - attempt INTEGER, - PRIMARY KEY(id, promise_id) - ); - +const ( + CREATE_MIGRATIONS_TABLE_STATEMENT = ` CREATE TABLE IF NOT EXISTS migrations ( - id INTEGER, - PRIMARY KEY(id) + id INTEGER PRIMARY KEY ); - - INSERT INTO migrations (id) VALUES (1) ON CONFLICT(id) DO NOTHING;` - + ` DROP_TABLE_STATEMENT = ` DROP TABLE notifications; DROP TABLE subscriptions; @@ -424,6 +331,7 @@ type Config struct { Query map[string]string TxTimeout time.Duration Reset bool + Plan migrations.Plan } type PostgresStore struct { @@ -470,11 +378,12 @@ func (s *PostgresStore) String() string { } func (s *PostgresStore) Start() error { - if _, err := s.db.Exec(CREATE_TABLE_STATEMENT); err != nil { + if _, err := s.db.Exec(CREATE_MIGRATIONS_TABLE_STATEMENT); err != nil { return err } - return nil + // If needed, apply migrations + return Run(Version, s.db, 90*time.Second, migrationsFS, s.config.Plan) } func (s *PostgresStore) Stop() error { diff --git a/internal/app/subsystems/aio/store/postgres/version.go b/internal/app/subsystems/aio/store/postgres/version.go new file mode 100644 index 00000000..7e09d17a --- /dev/null +++ b/internal/app/subsystems/aio/store/postgres/version.go @@ -0,0 +1,96 @@ +package postgres + +import ( + "context" + "database/sql" + "embed" + "fmt" + "time" + + "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/migrations" +) + +// Version is the Postgres database release version. +const Version = 1 + +func Run(currVersion int, db *sql.DB, txTimeout time.Duration, migrationsFS embed.FS, plan migrations.Plan) error { + ctx, cancel := context.WithTimeout(context.Background(), txTimeout) + defer cancel() + + // Acquire a lock to check the database version. + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer func() { + _ = tx.Rollback() + }() + + var lockAcquired bool + err = tx.QueryRowContext(ctx, "SELECT pg_try_advisory_lock(1)").Scan(&lockAcquired) + if err != nil { + return err + } + if !lockAcquired { + err = fmt.Errorf("could not acquire advisory lock") + return err + } + defer func() { + if _, unlockErr := tx.ExecContext(ctx, "SELECT pg_advisory_unlock(1)"); unlockErr != nil { + err = fmt.Errorf("%v; %v", err, unlockErr) + } + }() + + // Check the database version again while holding the lock + var dbVersion int + dbVersion, err = migrations.ReadVersion(tx) + if err != nil { + return err + } + + if currVersion < dbVersion { + err = fmt.Errorf("current version %d is less than database version %d please updated to latest resonate release", currVersion, dbVersion) + return err + } + if currVersion == dbVersion { + if err = tx.Commit(); err != nil { + return err + } + return nil + } + + // If the database version is -1, it means the migrations table does not exist. + if dbVersion == -1 && plan != migrations.DryRun { + plan = migrations.Apply + } + + switch plan { + case migrations.Default: + return fmt.Errorf("database version %d does not match current version %d please run `resonate migrate --plan` to see migrations needed", dbVersion, currVersion) + case migrations.DryRun: + var plan migrations.MigrationPlan + plan, err = migrations.GenerateMigrationPlan(migrationsFS, dbVersion) + if err != nil { + return err + } + fmt.Println("Migrations to apply:") + fmt.Printf("Migrations to apply: %v", plan) + case migrations.Apply: + var plan migrations.MigrationPlan + plan, err = migrations.GenerateMigrationPlan(migrationsFS, dbVersion) + if err != nil { + return err + } + if err = migrations.ApplyMigrationPlan(tx, plan); err != nil { + return err + } + + if err = tx.Commit(); err != nil { + return err + } + default: + return fmt.Errorf("invalid plan: %v", plan) + } + + return nil +} diff --git a/internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql b/internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql new file mode 100644 index 00000000..7c1bb427 --- /dev/null +++ b/internal/app/subsystems/aio/store/sqlite/migrations/migration-0001.sql @@ -0,0 +1,2 @@ +--- CreateSchemaVersion +INSERT INTO migrations (id) VALUES (1) ON CONFLICT(id) DO NOTHING; \ No newline at end of file diff --git a/internal/app/subsystems/aio/store/sqlite/migrations/schema-0000.sql b/internal/app/subsystems/aio/store/sqlite/migrations/schema-0000.sql new file mode 100644 index 00000000..4496960e --- /dev/null +++ b/internal/app/subsystems/aio/store/sqlite/migrations/schema-0000.sql @@ -0,0 +1,115 @@ +-- CreateTable +CREATE TABLE IF NOT EXISTS promises ( + id TEXT UNIQUE, + sort_id INTEGER PRIMARY KEY AUTOINCREMENT, + state INTEGER DEFAULT 1, + param_headers BLOB, + param_data BLOB, + value_headers BLOB, + value_data BLOB, + timeout INTEGER, + idempotency_key_for_create TEXT, + idempotency_key_for_complete TEXT, + tags BLOB, + created_on INTEGER, + completed_on INTEGER +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_promises_id ON promises(id); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_promises_invocation ON promises(json_extract(tags, '$.resonate:invocation')); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_promises_timeout ON promises(json_extract(tags, '$.resonate:timeout')); + +-- CreateTable +CREATE TABLE IF NOT EXISTS schedules ( + id TEXT UNIQUE, + sort_id INTEGER PRIMARY KEY AUTOINCREMENT, + description TEXT, + cron TEXT, + tags BLOB, + promise_id TEXT, + promise_timeout INTEGER, + promise_param_headers BLOB, + promise_param_data BLOB, + promise_tags BLOB, + last_run_time INTEGER, + next_run_time INTEGER, + idempotency_key TEXT, + created_on INTEGER +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_schedules_id ON schedules(id); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_schedules_next_run_time ON schedules(next_run_time); + +-- CreateTable +CREATE TABLE IF NOT EXISTS tasks ( + id TEXT UNIQUE, + counter INTEGER, + promise_id TEXT, + claim_timeout INTEGER, + complete_timeout INTEGER, + promise_timeout INTEGER, + created_on INTEGER, + completed_on INTEGER, + is_completed BOOLEAN +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_tasks_id ON tasks(id); + +-- CreateTable +CREATE TABLE IF NOT EXISTS locks ( + resource_id TEXT UNIQUE, + process_id TEXT, + execution_id TEXT, + expiry_in_milliseconds INTEGER, + timeout INTEGER +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_locks_acquire_id ON locks(resource_id, execution_id); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_locks_heartbeat_id ON locks(process_id); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_locks_timeout ON locks(timeout); + +-- CreateTable +CREATE TABLE IF NOT EXISTS timeouts ( + id TEXT, + time INTEGER, + PRIMARY KEY(id) +); + +-- CreateTable +CREATE TABLE IF NOT EXISTS subscriptions ( + id TEXT, + promise_id TEXT, + sort_id INTEGER PRIMARY KEY AUTOINCREMENT, + url TEXT, + retry_policy BLOB, + created_on INTEGER, + UNIQUE(id, promise_id) +); + +-- CreateIndex +CREATE INDEX IF NOT EXISTS idx_subscriptions_id ON subscriptions(id); + +-- CreateTable +CREATE TABLE IF NOT EXISTS notifications ( + id TEXT, + promise_id TEXT, + url TEXT, + retry_policy BLOB, + time INTEGER, + attempt INTEGER, + PRIMARY KEY(id, promise_id) +); diff --git a/internal/app/subsystems/aio/store/sqlite/sqlite.go b/internal/app/subsystems/aio/store/sqlite/sqlite.go index 83539ce6..a5b79154 100644 --- a/internal/app/subsystems/aio/store/sqlite/sqlite.go +++ b/internal/app/subsystems/aio/store/sqlite/sqlite.go @@ -3,6 +3,8 @@ package sqlite import ( "context" "database/sql" + "embed" + _ "embed" "encoding/json" "fmt" "os" @@ -11,6 +13,7 @@ import ( "github.com/resonatehq/resonate/internal/aio" "github.com/resonatehq/resonate/internal/app/subsystems/aio/store" + "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/migrations" "github.com/resonatehq/resonate/internal/kernel/bus" "github.com/resonatehq/resonate/internal/kernel/t_aio" @@ -26,107 +29,18 @@ import ( _ "github.com/mattn/go-sqlite3" ) -const ( - CREATE_TABLE_STATEMENT = ` - CREATE TABLE IF NOT EXISTS promises ( - id TEXT UNIQUE, - sort_id INTEGER PRIMARY KEY AUTOINCREMENT, - state INTEGER DEFAULT 1, - param_headers BLOB, - param_data BLOB, - value_headers BLOB, - value_data BLOB, - timeout INTEGER, - idempotency_key_for_create TEXT, - idempotency_key_for_complete TEXT, - tags BLOB, - created_on INTEGER, - completed_on INTEGER - ); - - CREATE INDEX IF NOT EXISTS idx_promises_id ON promises(id); - CREATE INDEX IF NOT EXISTS idx_promises_invocation ON promises(json_extract(tags, '$.resonate:invocation')); - CREATE INDEX IF NOT EXISTS idx_promises_timeout ON promises(json_extract(tags, '$.resonate:timeout')); - - CREATE TABLE IF NOT EXISTS schedules ( - id TEXT UNIQUE, - sort_id INTEGER PRIMARY KEY AUTOINCREMENT, - description TEXT, - cron TEXT, - tags BLOB, - promise_id TEXT, - promise_timeout INTEGER, - promise_param_headers BLOB, - promise_param_data BLOB, - promise_tags BLOB, - last_run_time INTEGER, - next_run_time INTEGER, - idempotency_key TEXT, - created_on INTEGER - ); - - CREATE INDEX IF NOT EXISTS idx_schedules_id ON schedules(id); - CREATE INDEX IF NOT EXISTS idx_schedules_next_run_time ON schedules(next_run_time); - - CREATE TABLE IF NOT EXISTS tasks ( - id TEXT UNIQUE, - counter INTEGER, - promise_id TEXT, - claim_timeout INTEGER, - complete_timeout INTEGER, - promise_timeout INTEGER, - created_on INTEGER, - completed_on INTEGER, - is_completed BOOLEAN - ); - - CREATE INDEX IF NOT EXISTS idx_tasks_id ON tasks(id); - - CREATE TABLE IF NOT EXISTS locks ( - resource_id TEXT UNIQUE, - process_id TEXT, - execution_id TEXT, - expiry_in_milliseconds INTEGER, - timeout INTEGER - ); - - CREATE INDEX IF NOT EXISTS idx_locks_acquire_id ON locks(resource_id, execution_id); - CREATE INDEX IF NOT EXISTS idx_locks_heartbeat_id ON locks(process_id); - CREATE INDEX IF NOT EXISTS idx_locks_timeout ON locks(timeout); - - CREATE TABLE IF NOT EXISTS timeouts ( - id TEXT, - time INTEGER, - PRIMARY KEY(id) - ); - - CREATE TABLE IF NOT EXISTS subscriptions ( - id TEXT, - promise_id TEXT, - sort_id INTEGER PRIMARY KEY AUTOINCREMENT, - url TEXT, - retry_policy BLOB, - created_on INTEGER, - UNIQUE(id, promise_id) - ); - - CREATE INDEX IF NOT EXISTS idx_subscriptions_id ON subscriptions(id); +var ( + //go:embed migrations/* + migrationsFS embed.FS +) - CREATE TABLE IF NOT EXISTS notifications ( - id TEXT, - promise_id TEXT, - url TEXT, - retry_policy BLOB, - time INTEGER, - attempt INTEGER, - PRIMARY KEY(id, promise_id) - ); - +const ( + CREATE_MIGRATIONS_TABLE_STATEMENT = ` CREATE TABLE IF NOT EXISTS migrations ( - id INTEGER PRIMARY KEY + id INTEGER, + PRIMARY KEY(id) ); - - INSERT INTO migrations (id) VALUES (1) ON CONFLICT(id) DO NOTHING;` + ` PROMISE_SELECT_STATEMENT = ` SELECT @@ -407,6 +321,7 @@ type Config struct { Path string TxTimeout time.Duration Reset bool + Plan migrations.Plan } type SqliteStore struct { @@ -435,11 +350,11 @@ func (s *SqliteStore) String() string { } func (s *SqliteStore) Start() error { - if _, err := s.db.Exec(CREATE_TABLE_STATEMENT); err != nil { + if _, err := s.db.Exec(CREATE_MIGRATIONS_TABLE_STATEMENT); err != nil { return err } - return nil + return Run(Version, s.db, 90*time.Second, migrationsFS, s.config.Plan) } func (s *SqliteStore) Stop() error { diff --git a/internal/app/subsystems/aio/store/sqlite/version.go b/internal/app/subsystems/aio/store/sqlite/version.go new file mode 100644 index 00000000..2d4ae889 --- /dev/null +++ b/internal/app/subsystems/aio/store/sqlite/version.go @@ -0,0 +1,79 @@ +package sqlite + +import ( + "context" + "database/sql" + "embed" + "fmt" + "time" + + "github.com/resonatehq/resonate/internal/app/subsystems/aio/store/migrations" +) + +// Version is the SQLite database release version. +const Version = 1 + +func Run(currVersion int, db *sql.DB, txTimeout time.Duration, migrationsFS embed.FS, plan migrations.Plan) error { + ctx, cancel := context.WithTimeout(context.Background(), txTimeout) + defer cancel() + + // Acquire a lock to check the database version. + tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) + if err != nil { + return err + } + defer func() { + _ = tx.Rollback() + }() + + // Check the database version again while holding the lock + var dbVersion int + dbVersion, err = migrations.ReadVersion(tx) + if err != nil { + return err + } + + if currVersion < dbVersion { + return fmt.Errorf("current version %d is less than database version %d please updated to latest resonate release", currVersion, dbVersion) + + } + if currVersion == dbVersion { + if err = tx.Commit(); err != nil { + return err + } + return nil + } + + // If the database version is -1, it means the migrations table does not exist. + if dbVersion == -1 && plan != migrations.DryRun { + plan = migrations.Apply + } + + switch plan { + case migrations.Default: + return fmt.Errorf("database version %d does not match current version %d please run `resonate migrate --plan` to see migrations needed", dbVersion, currVersion) + case migrations.DryRun: + var plan migrations.MigrationPlan + plan, err = migrations.GenerateMigrationPlan(migrationsFS, dbVersion) + if err != nil { + return err + } + fmt.Printf("Migrations to apply:\n%s\n", plan) + case migrations.Apply: + var plan migrations.MigrationPlan + plan, err = migrations.GenerateMigrationPlan(migrationsFS, dbVersion) + if err != nil { + return err + } + if err = migrations.ApplyMigrationPlan(tx, plan); err != nil { + return err + } + if err = tx.Commit(); err != nil { + return err + } + default: + return fmt.Errorf("invalid plan: %v", plan) + } + + return nil +} diff --git a/internal/app/subsystems/api/grpc/grpc_test.go b/internal/app/subsystems/api/grpc/grpc_test.go index e51da4a3..267552cc 100644 --- a/internal/app/subsystems/api/grpc/grpc_test.go +++ b/internal/app/subsystems/api/grpc/grpc_test.go @@ -45,7 +45,7 @@ func setup() (*grpcTest, error) { go subsystem.Start(errors) time.Sleep(100 * time.Millisecond) - conn, err := grpc.Dial("127.0.0.1:5555", grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.NewClient("127.0.0.1:5555", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err }