From f024f3e4a5bbbb72988730fda2b6c3279d2e09cc Mon Sep 17 00:00:00 2001 From: Ragot Geoffrey Date: Wed, 4 Sep 2024 16:24:15 +0200 Subject: [PATCH] feat(libs): add automatic migration dir discovery (#1673) --- .../internal/storage/ledgerstore/bucket.go | 194 +++++------------- libs/go-libs/migrations/collect.go | 35 ++++ 2 files changed, 84 insertions(+), 145 deletions(-) create mode 100644 libs/go-libs/migrations/collect.go diff --git a/components/ledger/internal/storage/ledgerstore/bucket.go b/components/ledger/internal/storage/ledgerstore/bucket.go index 0873e54112..51a702e15f 100644 --- a/components/ledger/internal/storage/ledgerstore/bucket.go +++ b/components/ledger/internal/storage/ledgerstore/bucket.go @@ -3,52 +3,20 @@ package ledgerstore import ( "context" "database/sql" - _ "embed" + "embed" "fmt" + "github.com/formancehq/stack/libs/go-libs/migrations" + "github.com/formancehq/stack/libs/go-libs/bun/bunconnect" "github.com/formancehq/ledger/internal/storage/sqlutils" - "github.com/formancehq/stack/libs/go-libs/migrations" "github.com/pkg/errors" "github.com/uptrace/bun" ) -//go:embed migrations/0-init-schema.sql -var initSchema string - -//go:embed migrations/1-fix-trigger.sql -var fixTrigger string - -//go:embed migrations/2-fix-volumes-aggregation.sql -var fixVolumesAggregation string - -// notes(gfyrag): This fix a bug where post_commit_effective_volumes are not properly initialized -// when inserting a transaction before any other in the timeline -// -//go:embed migrations/3-fix-trigger-inserting-backdated-transactions.sql -var fixTriggerBackdatedTransaction string - -//go:embed migrations/4-add-account-first-usage-column.sql -var addAccountFirstUsage string - -//go:embed migrations/5-add-idempotency-key-index.sql -var addIndexOnIdempotencyKey string - -//go:embed migrations/6-add-reference-index.sql -var addIndexOnReference string - -//go:embed migrations/7-add-ik-unique-index.sql -var addIKUniqueIndex string - -//go:embed migrations/8-ik-ledger-unique-index.sql -var updateIKUniqueIndex string - -//go:embed migrations/9-fix-incorrect-volumes-aggregation.sql -var fixIncorrectVolumes string - -//go:embed migrations/10-fillfactor-on-moves.sql -var fillFactorOnMoves string +//go:embed migrations +var migrationsDir embed.FS type Bucket struct { name string @@ -110,127 +78,63 @@ func (b *Bucket) IsInitialized(ctx context.Context) (bool, error) { } func registerMigrations(migrator *migrations.Migrator, name string) { - migrator.RegisterMigrations( - migrations.Migration{ - Name: "Init schema", - UpWithContext: func(ctx context.Context, tx bun.Tx) error { + ret, err := migrations.CollectMigrationFiles(migrationsDir) + if err != nil { + panic(err) + } + initSchema := ret[0] - needV1Upgrade := false - row := tx.QueryRowContext(ctx, `select exists ( + // notes(gfyrag): override default schema initialization to handle ledger v1 upgrades + ret[0] = migrations.Migration{ + Name: "Init schema", + UpWithContext: func(ctx context.Context, tx bun.Tx) error { + + needV1Upgrade := false + row := tx.QueryRowContext(ctx, `select exists ( select from pg_tables where schemaname = ? and tablename = 'log' )`, name) - if row.Err() != nil { - return row.Err() + if row.Err() != nil { + return row.Err() + } + var ret string + if err := row.Scan(&ret); err != nil { + panic(err) + } + needV1Upgrade = ret != "false" + + oldSchemaRenamed := fmt.Sprintf(name + oldSchemaRenameSuffix) + if needV1Upgrade { + _, err := tx.ExecContext(ctx, fmt.Sprintf(`alter schema "%s" rename to "%s"`, name, oldSchemaRenamed)) + if err != nil { + return errors.Wrap(err, "renaming old schema") } - var ret string - if err := row.Scan(&ret); err != nil { - panic(err) + _, err = tx.ExecContext(ctx, fmt.Sprintf(`create schema if not exists "%s"`, name)) + if err != nil { + return errors.Wrap(err, "creating new schema") } - needV1Upgrade = ret != "false" + } - oldSchemaRenamed := fmt.Sprintf(name + oldSchemaRenameSuffix) - if needV1Upgrade { - _, err := tx.ExecContext(ctx, fmt.Sprintf(`alter schema "%s" rename to "%s"`, name, oldSchemaRenamed)) - if err != nil { - return errors.Wrap(err, "renaming old schema") - } - _, err = tx.ExecContext(ctx, fmt.Sprintf(`create schema if not exists "%s"`, name)) - if err != nil { - return errors.Wrap(err, "creating new schema") - } - } + if err := initSchema.UpWithContext(ctx, tx); err != nil { + return errors.Wrap(err, "initializing new schema") + } - _, err := tx.ExecContext(ctx, initSchema) - if err != nil { - return errors.Wrap(err, "initializing new schema") + if needV1Upgrade { + if err := migrateLogs(ctx, oldSchemaRenamed, name, tx); err != nil { + return errors.Wrap(err, "migrating logs") } - if needV1Upgrade { - if err := migrateLogs(ctx, oldSchemaRenamed, name, tx); err != nil { - return errors.Wrap(err, "migrating logs") - } - - _, err = tx.ExecContext(ctx, fmt.Sprintf(`create table goose_db_version as table "%s".goose_db_version with no data`, oldSchemaRenamed)) - if err != nil { - return err - } + _, err = tx.ExecContext(ctx, fmt.Sprintf(`create table goose_db_version as table "%s".goose_db_version with no data`, oldSchemaRenamed)) + if err != nil { + return err } + } - return nil - }, - }, - migrations.Migration{ - Name: "Fix trigger", - UpWithContext: func(ctx context.Context, tx bun.Tx) error { - _, err := tx.ExecContext(ctx, fixTrigger) - return err - }, - }, - migrations.Migration{ - Name: "Fix volumes aggregation", - UpWithContext: func(ctx context.Context, tx bun.Tx) error { - _, err := tx.ExecContext(ctx, fixVolumesAggregation) - return err - }, - }, - migrations.Migration{ - Name: "Fix trigger backdated transaction", - UpWithContext: func(ctx context.Context, tx bun.Tx) error { - _, err := tx.ExecContext(ctx, fixTriggerBackdatedTransaction) - return err - }, - }, - migrations.Migration{ - Name: "Add `first_usage` column on accounts", - UpWithContext: func(ctx context.Context, tx bun.Tx) error { - _, err := tx.ExecContext(ctx, addAccountFirstUsage) - return err - }, + return nil }, - migrations.Migration{ - Name: "Add index on `idempotency_key`", - UpWithContext: func(ctx context.Context, tx bun.Tx) error { - _, err := tx.ExecContext(ctx, addIndexOnIdempotencyKey) - return err - }, - }, - migrations.Migration{ - Name: "Add index on `reference`", - UpWithContext: func(ctx context.Context, tx bun.Tx) error { - _, err := tx.ExecContext(ctx, addIndexOnReference) - return err - }, - }, - migrations.Migration{ - Name: "Add unique index on IK", - UpWithContext: func(ctx context.Context, tx bun.Tx) error { - _, err := tx.ExecContext(ctx, addIKUniqueIndex) - return err - }, - }, - migrations.Migration{ - Name: "Update unique index on IK", - UpWithContext: func(ctx context.Context, tx bun.Tx) error { - _, err := tx.ExecContext(ctx, updateIKUniqueIndex) - return err - }, - }, - migrations.Migration{ - Name: "Fix incorrect volumes", - UpWithContext: func(ctx context.Context, tx bun.Tx) error { - _, err := tx.ExecContext(ctx, fixIncorrectVolumes) - return err - }, - }, - migrations.Migration{ - Name: "Add fill factor on moves", - UpWithContext: func(ctx context.Context, tx bun.Tx) error { - _, err := tx.ExecContext(ctx, fillFactorOnMoves) - return err - }, - }, - ) + } + + migrator.RegisterMigrations(ret...) } func ConnectToBucket(ctx context.Context, connectionOptions bunconnect.ConnectionOptions, name string, hooks ...bun.QueryHook) (*Bucket, error) { diff --git a/libs/go-libs/migrations/collect.go b/libs/go-libs/migrations/collect.go new file mode 100644 index 0000000000..f04de51fb6 --- /dev/null +++ b/libs/go-libs/migrations/collect.go @@ -0,0 +1,35 @@ +package migrations + +import ( + "context" + "embed" + "path/filepath" + + "github.com/pkg/errors" + "github.com/uptrace/bun" +) + +func CollectMigrationFiles(fs embed.FS) ([]Migration, error) { + entries, err := fs.ReadDir("migrations") + if err != nil { + return nil, errors.Wrap(err, "collecting migration files") + } + + ret := make([]Migration, len(entries)) + for i, entry := range entries { + fileContent, err := fs.ReadFile(filepath.Join("migrations", entry.Name())) + if err != nil { + return nil, errors.Wrapf(err, "reading migration file %s", entry.Name()) + } + + ret[i] = Migration{ + Name: entry.Name(), + UpWithContext: func(ctx context.Context, tx bun.Tx) error { + _, err := tx.ExecContext(ctx, string(fileContent)) + return err + }, + } + } + + return ret, nil +}