feat(libs): add automatic migration dir discovery (#1673)
gfyrag authored Sep 4, 2024
1 parent 6a7b96a commit f024f3e
Showing 2 changed files with 84 additions and 145 deletions.
194 changes: 49 additions & 145 deletions components/ledger/internal/storage/ledgerstore/bucket.go
@@ -3,52 +3,20 @@ package ledgerstore
 import (
 	"context"
 	"database/sql"
-	_ "embed"
+	"embed"
 	"fmt"
 
-	"github.com/formancehq/stack/libs/go-libs/migrations"
-
 	"github.com/formancehq/stack/libs/go-libs/bun/bunconnect"
 
 	"github.com/formancehq/ledger/internal/storage/sqlutils"
+	"github.com/formancehq/stack/libs/go-libs/migrations"
 	"github.com/pkg/errors"
 	"github.com/uptrace/bun"
 )
 
-//go:embed migrations/0-init-schema.sql
-var initSchema string
-
-//go:embed migrations/1-fix-trigger.sql
-var fixTrigger string
-
-//go:embed migrations/2-fix-volumes-aggregation.sql
-var fixVolumesAggregation string
-
-// notes(gfyrag): This fix a bug where post_commit_effective_volumes are not properly initialized
-// when inserting a transaction before any other in the timeline
-//
-//go:embed migrations/3-fix-trigger-inserting-backdated-transactions.sql
-var fixTriggerBackdatedTransaction string
-
-//go:embed migrations/4-add-account-first-usage-column.sql
-var addAccountFirstUsage string
-
-//go:embed migrations/5-add-idempotency-key-index.sql
-var addIndexOnIdempotencyKey string
-
-//go:embed migrations/6-add-reference-index.sql
-var addIndexOnReference string
-
-//go:embed migrations/7-add-ik-unique-index.sql
-var addIKUniqueIndex string
-
-//go:embed migrations/8-ik-ledger-unique-index.sql
-var updateIKUniqueIndex string
-
-//go:embed migrations/9-fix-incorrect-volumes-aggregation.sql
-var fixIncorrectVolumes string
-
-//go:embed migrations/10-fillfactor-on-moves.sql
-var fillFactorOnMoves string
+//go:embed migrations
+var migrationsDir embed.FS
 
 type Bucket struct {
 	name string
@@ -110,127 +78,63 @@ func (b *Bucket) IsInitialized(ctx context.Context) (bool, error) {
 }
 
 func registerMigrations(migrator *migrations.Migrator, name string) {
-	migrator.RegisterMigrations(
-		migrations.Migration{
-			Name: "Init schema",
-			UpWithContext: func(ctx context.Context, tx bun.Tx) error {
-
-				needV1Upgrade := false
-				row := tx.QueryRowContext(ctx, `select exists (
-					select from pg_tables
-					where schemaname = ? and tablename = 'log'
-				)`, name)
-				if row.Err() != nil {
-					return row.Err()
-				}
-				var ret string
-				if err := row.Scan(&ret); err != nil {
-					panic(err)
-				}
-				needV1Upgrade = ret != "false"
-
-				oldSchemaRenamed := fmt.Sprintf(name + oldSchemaRenameSuffix)
-				if needV1Upgrade {
-					_, err := tx.ExecContext(ctx, fmt.Sprintf(`alter schema "%s" rename to "%s"`, name, oldSchemaRenamed))
-					if err != nil {
-						return errors.Wrap(err, "renaming old schema")
-					}
-					_, err = tx.ExecContext(ctx, fmt.Sprintf(`create schema if not exists "%s"`, name))
-					if err != nil {
-						return errors.Wrap(err, "creating new schema")
-					}
-				}
-
-				_, err := tx.ExecContext(ctx, initSchema)
-				if err != nil {
-					return errors.Wrap(err, "initializing new schema")
-				}
-
-				if needV1Upgrade {
-					if err := migrateLogs(ctx, oldSchemaRenamed, name, tx); err != nil {
-						return errors.Wrap(err, "migrating logs")
-					}
-
-					_, err = tx.ExecContext(ctx, fmt.Sprintf(`create table goose_db_version as table "%s".goose_db_version with no data`, oldSchemaRenamed))
-					if err != nil {
-						return err
-					}
-				}
-
-				return nil
-			},
-		},
-		migrations.Migration{
-			Name: "Fix trigger",
-			UpWithContext: func(ctx context.Context, tx bun.Tx) error {
-				_, err := tx.ExecContext(ctx, fixTrigger)
-				return err
-			},
-		},
-		migrations.Migration{
-			Name: "Fix volumes aggregation",
-			UpWithContext: func(ctx context.Context, tx bun.Tx) error {
-				_, err := tx.ExecContext(ctx, fixVolumesAggregation)
-				return err
-			},
-		},
-		migrations.Migration{
-			Name: "Fix trigger backdated transaction",
-			UpWithContext: func(ctx context.Context, tx bun.Tx) error {
-				_, err := tx.ExecContext(ctx, fixTriggerBackdatedTransaction)
-				return err
-			},
-		},
-		migrations.Migration{
-			Name: "Add `first_usage` column on accounts",
-			UpWithContext: func(ctx context.Context, tx bun.Tx) error {
-				_, err := tx.ExecContext(ctx, addAccountFirstUsage)
-				return err
-			},
-		},
-		migrations.Migration{
-			Name: "Add index on `idempotency_key`",
-			UpWithContext: func(ctx context.Context, tx bun.Tx) error {
-				_, err := tx.ExecContext(ctx, addIndexOnIdempotencyKey)
-				return err
-			},
-		},
-		migrations.Migration{
-			Name: "Add index on `reference`",
-			UpWithContext: func(ctx context.Context, tx bun.Tx) error {
-				_, err := tx.ExecContext(ctx, addIndexOnReference)
-				return err
-			},
-		},
-		migrations.Migration{
-			Name: "Add unique index on IK",
-			UpWithContext: func(ctx context.Context, tx bun.Tx) error {
-				_, err := tx.ExecContext(ctx, addIKUniqueIndex)
-				return err
-			},
-		},
-		migrations.Migration{
-			Name: "Update unique index on IK",
-			UpWithContext: func(ctx context.Context, tx bun.Tx) error {
-				_, err := tx.ExecContext(ctx, updateIKUniqueIndex)
-				return err
-			},
-		},
-		migrations.Migration{
-			Name: "Fix incorrect volumes",
-			UpWithContext: func(ctx context.Context, tx bun.Tx) error {
-				_, err := tx.ExecContext(ctx, fixIncorrectVolumes)
-				return err
-			},
-		},
-		migrations.Migration{
-			Name: "Add fill factor on moves",
-			UpWithContext: func(ctx context.Context, tx bun.Tx) error {
-				_, err := tx.ExecContext(ctx, fillFactorOnMoves)
-				return err
-			},
-		},
-	)
+	ret, err := migrations.CollectMigrationFiles(migrationsDir)
+	if err != nil {
+		panic(err)
+	}
+	initSchema := ret[0]
+
+	// notes(gfyrag): override default schema initialization to handle ledger v1 upgrades
+	ret[0] = migrations.Migration{
+		Name: "Init schema",
+		UpWithContext: func(ctx context.Context, tx bun.Tx) error {
+
+			needV1Upgrade := false
+			row := tx.QueryRowContext(ctx, `select exists (
+				select from pg_tables
+				where schemaname = ? and tablename = 'log'
+			)`, name)
+			if row.Err() != nil {
+				return row.Err()
+			}
+			var ret string
+			if err := row.Scan(&ret); err != nil {
+				panic(err)
+			}
+			needV1Upgrade = ret != "false"
+
+			oldSchemaRenamed := fmt.Sprintf(name + oldSchemaRenameSuffix)
+			if needV1Upgrade {
+				_, err := tx.ExecContext(ctx, fmt.Sprintf(`alter schema "%s" rename to "%s"`, name, oldSchemaRenamed))
+				if err != nil {
+					return errors.Wrap(err, "renaming old schema")
+				}
+				_, err = tx.ExecContext(ctx, fmt.Sprintf(`create schema if not exists "%s"`, name))
+				if err != nil {
+					return errors.Wrap(err, "creating new schema")
+				}
+			}
+
+			if err := initSchema.UpWithContext(ctx, tx); err != nil {
+				return errors.Wrap(err, "initializing new schema")
+			}
+
+			if needV1Upgrade {
+				if err := migrateLogs(ctx, oldSchemaRenamed, name, tx); err != nil {
+					return errors.Wrap(err, "migrating logs")
+				}
+
+				_, err = tx.ExecContext(ctx, fmt.Sprintf(`create table goose_db_version as table "%s".goose_db_version with no data`, oldSchemaRenamed))
+				if err != nil {
+					return err
+				}
+			}
+
+			return nil
+		},
+	}
+
+	migrator.RegisterMigrations(ret...)
 }
 
 func ConnectToBucket(ctx context.Context, connectionOptions bunconnect.ConnectionOptions, name string, hooks ...bun.QueryHook) (*Bucket, error) {
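As background for the bucket.go change above: Go's //go:embed directive can embed an entire directory into the binary as an embed.FS, which is what lets the eleven per-file embed variables collapse into the single migrationsDir variable. The standalone sketch below is hypothetical (its package, main function, and local migrations/ folder are assumptions, not code from this commit); it only shows how such an embedded directory can be listed and read, with entries coming back sorted by file name.

package main

import (
	"embed"
	"fmt"
)

// Embed every file under migrations/ (the directory is assumed to exist
// next to this file and to contain the *.sql migration scripts).
//go:embed migrations
var migrationsDir embed.FS

func main() {
	// embed.FS implements fs.ReadDirFS: entries are returned sorted by name.
	entries, err := migrationsDir.ReadDir("migrations")
	if err != nil {
		panic(err)
	}
	for _, entry := range entries {
		// ReadFile takes a slash-separated path rooted at the embedded tree.
		content, err := migrationsDir.ReadFile("migrations/" + entry.Name())
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s (%d bytes)\n", entry.Name(), len(content))
	}
}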
35 changes: 35 additions & 0 deletions libs/go-libs/migrations/collect.go
@@ -0,0 +1,35 @@
+package migrations
+
+import (
+	"context"
+	"embed"
+	"path/filepath"
+
+	"github.com/pkg/errors"
+	"github.com/uptrace/bun"
+)
+
+func CollectMigrationFiles(fs embed.FS) ([]Migration, error) {
+	entries, err := fs.ReadDir("migrations")
+	if err != nil {
+		return nil, errors.Wrap(err, "collecting migration files")
+	}
+
+	ret := make([]Migration, len(entries))
+	for i, entry := range entries {
+		fileContent, err := fs.ReadFile(filepath.Join("migrations", entry.Name()))
+		if err != nil {
+			return nil, errors.Wrapf(err, "reading migration file %s", entry.Name())
+		}
+
+		ret[i] = Migration{
+			Name: entry.Name(),
+			UpWithContext: func(ctx context.Context, tx bun.Tx) error {
+				_, err := tx.ExecContext(ctx, string(fileContent))
+				return err
+			},
+		}
+	}
+
+	return ret, nil
+}
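One possible way to consume the new helper, mirroring what registerMigrations in bucket.go now does: embed a migrations/ directory and register every collected migration on an existing *migrations.Migrator. The package name, file layout, and registerAll function below are illustrative assumptions rather than part of the commit; only CollectMigrationFiles, Migration, and RegisterMigrations come from the library itself.

package example

import (
	"embed"

	"github.com/formancehq/stack/libs/go-libs/migrations"
)

//go:embed migrations
var migrationsDir embed.FS

// registerAll collects one Migration per file found under migrations/ and
// registers them in the order returned by embed.FS.ReadDir, i.e. sorted by
// file name (lexicographically, so "10-foo.sql" sorts before "2-bar.sql"
// unless numeric prefixes are zero-padded).
func registerAll(migrator *migrations.Migrator) error {
	collected, err := migrations.CollectMigrationFiles(migrationsDir)
	if err != nil {
		return err
	}
	migrator.RegisterMigrations(collected...)
	return nil
}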
