Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(db-migrations): add basic support for db migrations #340

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
89 changes: 89 additions & 0 deletions cmd/migrations/migrations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package migrations

import (
"time"

"github.com/resonatehq/resonate/cmd/util"
"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/store"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/store/postgres"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/store/sqlite"
"github.com/spf13/cobra"
)

func NewCmd() *cobra.Command {
var (
aioStore string
aioStoreSqlitePath string
aioStoreSqliteTxTimeout time.Duration
aioStorePostgresHost string
aioStorePostgresPort string
aioStorePostgresUsername string
aioStorePostgresPassword string
aioStorePostgresDatabase string
aioStorePostgresQuery map[string]string
aioStorePostgresTxTimeout time.Duration
plan bool
)

var cmd = &cobra.Command{
Use: "migrate",
Short: "Synchronizes the database state with the current set of models and migrations",
RunE: func(cmd *cobra.Command, args []string) error {
cfg := &util.AIOSubsystemConfig[util.StoreConfig]{
Subsystem: &aio.SubsystemConfig{
Size: 1,
BatchSize: 1,
Workers: 1, // Only one connection will be opened to run migrations.
},
Config: &util.StoreConfig{
Kind: util.StoreKind(aioStore),
Plan: store.Apply,
Sqlite: &sqlite.Config{
Path: aioStoreSqlitePath,
TxTimeout: aioStoreSqliteTxTimeout,
},
Postgres: &postgres.Config{
Host: aioStorePostgresHost,
Port: aioStorePostgresPort,
Username: aioStorePostgresUsername,
Password: aioStorePostgresPassword,
Database: aioStorePostgresDatabase,
Query: aioStorePostgresQuery,
TxTimeout: aioStorePostgresTxTimeout,
},
},
}

if plan {
cfg.Config.Plan = store.DryRun
}

store, err := util.NewStore(cfg)
if err != nil {
return err
}

// Runs migrations.
if err := store.Start(); err != nil {
return err
}

return store.Stop()
},
}

cmd.Flags().StringVar(&aioStore, "aio-store", "sqlite", "promise store type")
cmd.Flags().StringVar(&aioStoreSqlitePath, "aio-store-sqlite-path", "resonate.db", "sqlite database path")
cmd.Flags().DurationVar(&aioStoreSqliteTxTimeout, "aio-store-sqlite-tx-timeout", 10_000*time.Millisecond, "sqlite transaction timeout")
cmd.Flags().StringVar(&aioStorePostgresHost, "aio-store-postgres-host", "localhost", "postgres host")
cmd.Flags().StringVar(&aioStorePostgresPort, "aio-store-postgres-port", "5432", "postgres port")
cmd.Flags().StringVar(&aioStorePostgresUsername, "aio-store-postgres-username", "", "postgres username")
cmd.Flags().StringVar(&aioStorePostgresPassword, "aio-store-postgres-password", "", "postgres password")
cmd.Flags().StringVar(&aioStorePostgresDatabase, "aio-store-postgres-database", "resonate", "postgres database name")
cmd.Flags().StringToStringVar(&aioStorePostgresQuery, "aio-store-postgres-query", make(map[string]string, 0), "postgres query options")
cmd.Flags().DurationVar(&aioStorePostgresTxTimeout, "aio-store-postgres-tx-timeout", 10_000*time.Millisecond, "postgres transaction timeout")
cmd.Flags().BoolVarP(&plan, "plan", "p", true, "Dry run to check which migrations will be applied")

return cmd
}
2 changes: 2 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"

"github.com/resonatehq/resonate/cmd/dst"
"github.com/resonatehq/resonate/cmd/migrations"
"github.com/resonatehq/resonate/cmd/promises"
"github.com/resonatehq/resonate/cmd/quickstart"
"github.com/resonatehq/resonate/cmd/schedules"
Expand Down Expand Up @@ -43,6 +44,7 @@ func init() {
rootCmd.AddCommand(dst.NewCmd())
rootCmd.AddCommand(serve.ServeCmd())
rootCmd.AddCommand(quickstart.NewCmd())
rootCmd.AddCommand(migrations.NewCmd())

// Set default output
rootCmd.SetOut(os.Stdout)
Expand Down
2 changes: 2 additions & 0 deletions cmd/util/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/network"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/store"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/store/postgres"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/store/sqlite"
"github.com/resonatehq/resonate/internal/app/subsystems/api/grpc"
Expand Down Expand Up @@ -133,6 +134,7 @@ const (

type StoreConfig struct {
Kind StoreKind
Plan store.Plan
Sqlite *sqlite.Config
Postgres *postgres.Config
}
Expand Down
186 changes: 186 additions & 0 deletions internal/app/subsystems/aio/store/migrations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package store

import (
"context"
"database/sql"
"embed"
"fmt"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"time"
)

type Plan int

const (
Default Plan = iota // 0 is the default value
DryRun
Apply
)

type Migration struct {
Version int
Content []byte
}

type MigrationPlan []Migration

func (m MigrationPlan) String() string {
var sb strings.Builder
sb.WriteString("Operations to perform:\n")
sb.WriteString("Apply all migrations:")
for _, migration := range m {
sb.WriteString(fmt.Sprintf(" %d", migration.Version))

Check warning on line 36 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L31-L36

Added lines #L31 - L36 were not covered by tests
}
return sb.String()

Check warning on line 38 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L38

Added line #L38 was not covered by tests
}

func Start(currVersion int, db *sql.DB, txTimeout time.Duration, migrationsFS embed.FS, plan Plan) error {
dbVersion, err := readVersion(db)
if err != nil {
return err

Check warning on line 44 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L44

Added line #L44 was not covered by tests
}

if currVersion < dbVersion {
return fmt.Errorf("current version %d is less than database version %d please updated to latest resonate release", currVersion, dbVersion)

Check warning on line 48 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L48

Added line #L48 was not covered by tests

}
if currVersion == dbVersion {
return nil

Check warning on line 52 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L52

Added line #L52 was not covered by tests
}

// If the database version is -1, it means the migrations table does not exist.
if dbVersion == -1 { // versioning with version.go
plan = Apply
}

switch plan {
case Default:
return fmt.Errorf("database version %d does not match current version %d please run `resonate migrate --plan` to see migrations needed", dbVersion, currVersion)
case DryRun:
plan, err := generateMigrationPlan(migrationsFS, dbVersion)
if err != nil {
return err

Check warning on line 66 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L61-L66

Added lines #L61 - L66 were not covered by tests
}
fmt.Println("Migrations to apply:")
guergabo marked this conversation as resolved.
Show resolved Hide resolved
fmt.Printf("Migrations to apply: %v", plan)

Check warning on line 69 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L68-L69

Added lines #L68 - L69 were not covered by tests
case Apply:
plan, err := generateMigrationPlan(migrationsFS, dbVersion)
if err != nil {
return err

Check warning on line 73 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L73

Added line #L73 was not covered by tests
}
return applyMigrationPlan(db, plan, txTimeout)
default:
return fmt.Errorf("invalid plan: %v", plan)

Check warning on line 77 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L76-L77

Added lines #L76 - L77 were not covered by tests
}

return nil

Check warning on line 80 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L80

Added line #L80 was not covered by tests
}

// db.QueryRow does not return a specific error type when the table does not exist so we need to check the error message.
func isTableNotFoundError(err error) bool {
errStr := err.Error()
if strings.Contains(errStr, "no such table") || strings.Contains(errStr, "does not exist") {
return true
}
return false

Check warning on line 89 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L89

Added line #L89 was not covered by tests
}

// readVersion reads the current schema version from the database.
func readVersion(db *sql.DB) (int, error) {
var version int
err := db.QueryRow("SELECT id FROM migrations").Scan(&version)
if err != nil {
if isTableNotFoundError(err) {
return -1, nil
}
return 0, err

Check warning on line 100 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L100

Added line #L100 was not covered by tests
}
return version, nil

Check warning on line 102 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L102

Added line #L102 was not covered by tests
}

// filenameVersion extracts the version number from a migration filename.
func migrationVersion(filename string) (int, error) {
re := regexp.MustCompile(`\d+`)
versionStr := re.FindString(filepath.Base(filename))
if versionStr == "" {
return 0, fmt.Errorf("could not extract version from filename: %s", filename)

Check warning on line 110 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L110

Added line #L110 was not covered by tests
}
version, err := strconv.Atoi(versionStr)
if err != nil {
return 0, fmt.Errorf("could not convert version string to int: %v", err)

Check warning on line 114 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L114

Added line #L114 was not covered by tests
}
return version, nil
}

// generateMigrationPlan reads the migrations from the filesystem and returns a plan of migrations to apply.
func generateMigrationPlan(migrationsFS embed.FS, currentVersion int) (MigrationPlan, error) {
migrations := []Migration{}
entries, err := migrationsFS.ReadDir("migrations")
if err != nil {
return nil, err

Check warning on line 124 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L124

Added line #L124 was not covered by tests
}

for _, entry := range entries {
filename := entry.Name()
content, err := migrationsFS.ReadFile("migrations/" + filename)
if err != nil {
return nil, err

Check warning on line 131 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L131

Added line #L131 was not covered by tests
}
version, err := migrationVersion(filename)
if err != nil {
return nil, err

Check warning on line 135 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L135

Added line #L135 was not covered by tests
}
// Skip migrations that are at or below the current version.
if version <= currentVersion {
continue

Check warning on line 139 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L139

Added line #L139 was not covered by tests
}
migrations = append(migrations, Migration{
Version: version,
Content: content,
})
}

sort.Slice(migrations, func(i, j int) bool {
return migrations[i].Version < migrations[j].Version
})

return migrations, nil
}

// applyMigrationPlan applies the migrations to the database as a single transaction.
func applyMigrationPlan(db *sql.DB, plan MigrationPlan, txTimeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), txTimeout)
defer cancel()

tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err

Check warning on line 161 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L161

Added line #L161 was not covered by tests
}

defer func() {
if err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
err = fmt.Errorf("tx failed: %v, unable to rollback: %v", err, rbErr)

Check warning on line 167 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L166-L167

Added lines #L166 - L167 were not covered by tests
}
}

err = fmt.Errorf("tx failed, performed a rollback: %v", err)
}()

for _, m := range plan {
_, err = tx.Exec(string(m.Content))
if err != nil {
return fmt.Errorf("failed to execute migration version %d: %w", m.Version, err)

Check warning on line 177 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L177

Added line #L177 was not covered by tests
}
}

if err = tx.Commit(); err != nil {
guergabo marked this conversation as resolved.
Show resolved Hide resolved
return err

Check warning on line 182 in internal/app/subsystems/aio/store/migrations.go

View check run for this annotation

Codecov / codecov/patch

internal/app/subsystems/aio/store/migrations.go#L182

Added line #L182 was not covered by tests
}

return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Insert initial migration record
INSERT INTO migrations (id) VALUES (1) ON CONFLICT(id) DO NOTHING;
Loading