diff --git a/vms/evm/database/blockdb/database.go b/vms/evm/database/blockdb/database.go index 6c3b37b01cc8..43f410803f73 100644 --- a/vms/evm/database/blockdb/database.go +++ b/vms/evm/database/blockdb/database.go @@ -24,6 +24,8 @@ import ( ) var ( + errUnexpectedKey = errors.New("unexpected database key") + errNotInitialized = errors.New("database not initialized") errAlreadyInitialized = errors.New("database already initialized") errInvalidEncodedLength = errors.New("invalid encoded length") ) @@ -47,6 +49,7 @@ type Database struct { dbPath string minHeight uint64 + migrator *migrator heightDBsReady bool reg prometheus.Registerer @@ -157,6 +160,10 @@ func New( // Load existing database min height. return databaseMinHeight(db.metaDB) }, + func() (uint64, bool, error) { + // Use the minimum block height of existing blocks to migrate. + return minBlockHeightToMigrate(evmDB) + }, func() (uint64, bool, error) { // Use min height 1 unless deferring initialization. return 1, !allowDeferredInit, nil @@ -212,6 +219,15 @@ func (db *Database) InitBlockDBs(minHeight uint64) error { db.bodyDB = bodyDB db.receiptsDB = receiptsDB + if err := db.initMigrator(); err != nil { + return errors.Join( + fmt.Errorf("failed to initialize migrator: %w", err), + headerDB.Close(), + bodyDB.Close(), + receiptsDB.Close(), + ) + } + db.heightDBsReady = true db.minHeight = minHeight @@ -250,6 +266,7 @@ func parseBlockKey(key []byte) (num uint64, hash common.Hash, ok bool) { } type parsedBlockKey struct { + key []byte db database.HeightIndex num uint64 hash common.Hash @@ -301,15 +318,19 @@ func (db *Database) parseKey(key []byte) (*parsedBlockKey, bool) { } return &parsedBlockKey{ + key: key, db: hdb, num: num, hash: hash, }, true } -func (*Database) readBlock(p *parsedBlockKey) ([]byte, error) { +func (db *Database) readBlock(p *parsedBlockKey) ([]byte, error) { data, err := p.db.Get(p.num) if err != nil { + if errors.Is(err, database.ErrNotFound) && !db.migrator.isCompleted() { + return db.Database.Get(p.key) + } return nil, err } @@ -376,6 +397,7 @@ func (db *Database) Delete(key []byte) error { } func (db *Database) Close() error { + db.migrator.stop() if !db.heightDBsReady { return db.Database.Close() } diff --git a/vms/evm/database/blockdb/database_test.go b/vms/evm/database/blockdb/database_test.go index 9c6679afb7a6..6f5759157e93 100644 --- a/vms/evm/database/blockdb/database_test.go +++ b/vms/evm/database/blockdb/database_test.go @@ -4,6 +4,7 @@ package blockdb import ( + "slices" "testing" "github.com/ava-labs/libevm/core/rawdb" @@ -275,6 +276,25 @@ func TestDatabaseInitialization(t *testing.T) { wantDBReady: true, wantMinHeight: 2, }, + { + name: "non genesis blocks to migrate", + evmDBBlocks: blocks[5:10], + wantDBReady: true, + wantMinHeight: 5, + }, + { + name: "blocks to migrate - including genesis", + evmDBBlocks: slices.Concat([]*types.Block{blocks[0]}, blocks[5:10]), + wantDBReady: true, + wantMinHeight: 5, + }, + { + name: "blocks to migrate and deferred init", + deferInit: true, + evmDBBlocks: blocks[5:10], + wantDBReady: true, + wantMinHeight: 5, + }, } for _, tc := range tests { diff --git a/vms/evm/database/blockdb/helpers_test.go b/vms/evm/database/blockdb/helpers_test.go index 39ff8ccb25df..1d3fe503eb62 100644 --- a/vms/evm/database/blockdb/helpers_test.go +++ b/vms/evm/database/blockdb/helpers_test.go @@ -4,8 +4,10 @@ package blockdb import ( + "fmt" "math/big" "testing" + "time" "github.com/ava-labs/libevm/common" "github.com/ava-labs/libevm/core/rawdb" @@ -17,6 +19,7 @@ import ( 
"github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" + "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/database/leveldb" "github.com/ava-labs/avalanchego/utils/logging" @@ -24,6 +27,20 @@ import ( heightindexdb "github.com/ava-labs/avalanchego/x/blockdb" ) +// blockingDatabase wraps a HeightIndex and blocks indefinitely on Put +// if shouldBlock returns true. +type blockingDatabase struct { + database.HeightIndex + shouldBlock func() bool +} + +func (b *blockingDatabase) Put(num uint64, encodedBlock []byte) error { + if b.shouldBlock == nil || b.shouldBlock() { + <-make(chan struct{}) + } + return b.HeightIndex.Put(num, encodedBlock) +} + func newDatabasesFromDir(t *testing.T, dataDir string) (*Database, ethdb.Database) { t.Helper() @@ -109,3 +126,34 @@ func logsFromReceipts(receipts types.Receipts) [][]*types.Log { } return logs } + +func startPartialMigration(t *testing.T, db *Database, blocksToMigrate uint64) { + t.Helper() + + n := uint64(0) + db.migrator.headerDB = &blockingDatabase{ + HeightIndex: db.headerDB, + shouldBlock: func() bool { + n++ + return n > blocksToMigrate + }, + } + startMigration(t, db, false) + require.Eventually(t, func() bool { + return db.migrator.processed.Load() >= blocksToMigrate + }, 5*time.Second, 100*time.Millisecond) +} + +func startMigration(t *testing.T, db *Database, waitForCompletion bool) { + t.Helper() + + db.migrator.completed.Store(false) + require.NoError(t, db.StartMigration(t.Context())) + + if waitForCompletion { + timeout := 5 * time.Second + msg := fmt.Sprintf("Migration did not complete within timeout: %v", timeout) + require.True(t, db.migrator.waitMigratorDone(timeout), msg) + require.True(t, db.migrator.isCompleted()) + } +} diff --git a/vms/evm/database/blockdb/migrator.go b/vms/evm/database/blockdb/migrator.go new file mode 100644 index 000000000000..bd4fa2636275 --- /dev/null +++ b/vms/evm/database/blockdb/migrator.go @@ -0,0 +1,532 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package blockdb + +import ( + "context" + "encoding/binary" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/ava-labs/libevm/common" + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/ethdb" + "github.com/ava-labs/libevm/rlp" + "go.uber.org/zap" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/database/prefixdb" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/timer" +) + +type migrator struct { + // Databases + evmDB ethdb.Database + headerDB database.HeightIndex + bodyDB database.HeightIndex + receiptsDB database.HeightIndex + + // Concurrency control + mu sync.Mutex // protects cancel and done + cancel context.CancelFunc + done chan struct{} + + // Migration state + completed atomic.Bool + processed atomic.Uint64 + endHeight uint64 + + logger logging.Logger +} + +var migratorDBPrefix = []byte("migrator") + +func (db *Database) initMigrator() error { + mdb := prefixdb.New(migratorDBPrefix, db.metaDB) + migrator, err := newMigrator( + mdb, + db.headerDB, + db.bodyDB, + db.receiptsDB, + db.Database, + db.logger, + ) + if err != nil { + return err + } + db.migrator = migrator + return nil +} + +// StartMigration begins the background migration of block data from the +// [ethdb.Database] to the height-indexed block databases. +// +// Returns an error if the databases are not initialized. 
+// No error if already running. +func (db *Database) StartMigration(ctx context.Context) error { + if !db.heightDBsReady { + return errNotInitialized + } + db.migrator.start(ctx) + return nil +} + +// targetBlockHeightKey stores the head height captured at first run for ETA. +var targetBlockHeightKey = []byte("migration_target_block_height") + +func targetBlockHeight(db database.KeyValueReader) (uint64, bool, error) { + has, err := db.Has(targetBlockHeightKey) + if err != nil { + return 0, false, err + } + if !has { + return 0, false, nil + } + numBytes, err := db.Get(targetBlockHeightKey) + if err != nil { + return 0, false, err + } + if len(numBytes) != blockNumberSize { + return 0, false, fmt.Errorf("invalid block number encoding length: %d", len(numBytes)) + } + height := binary.BigEndian.Uint64(numBytes) + return height, true, nil +} + +func writeTargetBlockHeight(db database.KeyValueWriter, endHeight uint64) error { + return db.Put(targetBlockHeightKey, encodeBlockNumber(endHeight)) +} + +func headBlockNumber(db ethdb.KeyValueReader) (uint64, bool) { + hash := rawdb.ReadHeadHeaderHash(db) + num := rawdb.ReadHeaderNumber(db, hash) + if num == nil || *num == 0 { + return 0, false + } + return *num, true +} + +func isMigratableKey(db ethdb.Reader, key []byte) bool { + if key[0] != evmBlockBodyPrefix { + return false + } + num, hash, ok := parseBlockKey(key) + if !ok { + return false + } + + // Skip genesis since all vms have it and to benefit from being able to have a + // minimum height greater than 0 when state sync is enabled. + if num == 0 { + return false + } + + canonHash := rawdb.ReadCanonicalHash(db, num) + return canonHash == hash +} + +func minBlockHeightToMigrate(db ethdb.Database) (uint64, bool, error) { + iter := db.NewIterator([]byte{evmBlockBodyPrefix}, nil) + defer iter.Release() + + for iter.Next() { + key := iter.Key() + if !isMigratableKey(db, key) { + continue + } + num, _, ok := parseBlockKey(key) + if !ok { + return 0, false, errUnexpectedKey + } + return num, true, nil + } + return 0, false, iter.Error() +} + +func newMigrator( + db database.Database, + headerDB database.HeightIndex, + bodyDB database.HeightIndex, + receiptsDB database.HeightIndex, + evmDB ethdb.Database, + logger logging.Logger, +) (*migrator, error) { + m := &migrator{ + headerDB: headerDB, + bodyDB: bodyDB, + receiptsDB: receiptsDB, + evmDB: evmDB, + logger: logger, + } + + _, ok, err := minBlockHeightToMigrate(evmDB) + if err != nil { + return nil, err + } + if !ok { + m.completed.Store(true) + m.logger.Info("No block data to migrate; migration already complete") + return m, nil + } + + // load saved end block height + endHeight, ok, err := targetBlockHeight(db) + if err != nil { + return nil, err + } + if !ok { + // load and save head block number as end block height + if num, ok := headBlockNumber(evmDB); ok { + endHeight = num + if err := writeTargetBlockHeight(db, endHeight); err != nil { + return nil, err + } + m.logger.Info( + "Migration target height set", + zap.Uint64("targetHeight", endHeight), + ) + } + } + m.endHeight = endHeight + + return m, nil +} + +func (m *migrator) isCompleted() bool { + return m.completed.Load() +} + +// stopTimeout is the maximum time to wait for migration to stop gracefully. +// 5 seconds allows cleanup operations to complete without blocking shutdown indefinitely. +const stopTimeout = 5 * time.Second + +func (m *migrator) stop() { + // Snapshot cancel/done under lock to avoid data race with endRun. 
+ // We must release the lock before waiting on done to prevent deadlock. + m.mu.Lock() + cancel := m.cancel + done := m.done + m.mu.Unlock() + + if cancel == nil { + return // no active migration + } + + cancel() + select { + case <-done: + // worker finished cleanup + case <-time.After(stopTimeout): + m.logger.Warn("Migration shutdown timeout exceeded") + } +} + +func (m *migrator) beginRun(ctx context.Context) (context.Context, bool) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.cancel != nil { + return nil, false // migration already running + } + ctx, cancel := context.WithCancel(ctx) + m.cancel = cancel + m.done = make(chan struct{}) + m.processed.Store(0) + return ctx, true +} + +func (m *migrator) endRun() { + m.mu.Lock() + defer m.mu.Unlock() + + m.cancel = nil + m.done = nil +} + +// start begins the migration process in a background goroutine. +// Returns immediately if migration is already completed or running. +func (m *migrator) start(ctx context.Context) { + if m.isCompleted() { + return + } + ctx, ok := m.beginRun(ctx) + if !ok { + m.logger.Warn("Migration already running") + return + } + + go func() { + defer func() { + close(m.done) + m.endRun() + }() + if err := m.run(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + m.logger.Error("Migration failed", zap.Error(err)) + } + } + }() +} + +// waitMigratorDone waits until the current migration run completes. +// If timeout <= 0, it waits indefinitely. +// Returns true if completed, false on timeout. +func (m *migrator) waitMigratorDone(timeout time.Duration) bool { + // Snapshot done to avoid race with goroutine cleanup + m.mu.Lock() + done := m.done + m.mu.Unlock() + + if done == nil { + return true + } + if timeout <= 0 { + <-done + return true + } + t := time.NewTimer(timeout) + defer t.Stop() + select { + case <-done: + return true + case <-t.C: + return false + } +} + +func (m *migrator) migrateHeader(num uint64, hash common.Hash) error { + header := rawdb.ReadHeader(m.evmDB, hash, num) + if header == nil { + return fmt.Errorf("header not found for block %d hash %s", num, hash) + } + hBytes, err := rlp.EncodeToBytes(header) + if err != nil { + return fmt.Errorf("failed to encode block header: %w", err) + } + if err := writeHashAndData(m.headerDB, num, hash, hBytes); err != nil { + return fmt.Errorf("failed to write header to headerDB: %w", err) + } + return nil +} + +func (m *migrator) migrateBody(num uint64, hash common.Hash, body []byte) error { + if err := writeHashAndData(m.bodyDB, num, hash, body); err != nil { + return fmt.Errorf("failed to write body to bodyDB: %w", err) + } + return nil +} + +func (m *migrator) migrateReceipts(num uint64, hash common.Hash) error { + receipts := rawdb.ReadReceiptsRLP(m.evmDB, hash, num) + if receipts == nil { + return nil + } + + if err := writeHashAndData(m.receiptsDB, num, hash, receipts); err != nil { + return fmt.Errorf("failed to write receipts to receiptsDB: %w", err) + } + return nil +} + +func deleteBlock(db ethdb.KeyValueWriter, num uint64, hash common.Hash) error { + // rawdb.DeleteHeader is not used to avoid deleting number/hash mappings. 
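+	// Keeping the hash-to-number mapping lets rawdb.ReadHeaderNumber keep resolving migrated block hashes.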
+ headerKey := blockHeaderKey(num, hash) + if err := db.Delete(headerKey); err != nil { + return fmt.Errorf("failed to delete header from evmDB: %w", err) + } + rawdb.DeleteBody(db, hash, num) + rawdb.DeleteReceipts(db, hash, num) + return nil +} + +func compactRange( + db ethdb.Compacter, + keyFunc func(uint64, common.Hash) []byte, + startNum, endNum uint64, + logger logging.Logger, +) { + startKey := keyFunc(startNum, common.Hash{}) + endKey := keyFunc(endNum+1, common.Hash{}) + if err := db.Compact(startKey, endKey); err != nil { + logger.Error("Failed to compact data in range", + zap.Uint64("startHeight", startNum), + zap.Uint64("endHeight", endNum), + zap.Error(err)) + } +} + +func (m *migrator) compactBlockRange(startNum, endNum uint64) { + start := time.Now() + + compactRange(m.evmDB, blockHeaderKey, startNum, endNum, m.logger) + compactRange(m.evmDB, blockBodyKey, startNum, endNum, m.logger) + compactRange(m.evmDB, receiptsKey, startNum, endNum, m.logger) + + m.logger.Info("Compaction of block range completed", + zap.Uint64("startHeight", startNum), + zap.Uint64("endHeight", endNum), + zap.Duration("duration", time.Since(start))) +} + +const ( + // logProgressInterval controls how often migration progress is logged. + logProgressInterval = 30 * time.Second + // compactionInterval is the number of blocks to process before compacting the database. + compactionInterval = 250_000 +) + +func (m *migrator) run(ctx context.Context) error { + m.logger.Info( + "Block data migration started", + zap.Uint64("targetHeight", m.endHeight), + ) + + var ( + // Progress tracking + etaTarget uint64 // target # of blocks to process + etaTracker = timer.NewEtaTracker(10, 1) + start = time.Now() + nextLog = start.Add(logProgressInterval) + + // Batch to accumulate delete operations before writing + batch = m.evmDB.NewBatch() + lastCompact uint64 // blocks processed at last compaction + + // Compaction tracking + canCompact bool + startBlockNum uint64 + endBlockNum uint64 + + // Iterate over block bodies instead of headers since there are keys + // under the header prefix that we are not migrating (e.g., hash mappings). + iter = m.evmDB.NewIterator([]byte{evmBlockBodyPrefix}, nil) + ) + + defer func() { + iter.Release() + + if batch.ValueSize() > 0 { + if err := batch.Write(); err != nil { + m.logger.Error("Failed to write final delete batch", zap.Error(err)) + } + } + + // Compact final range if we processed any blocks after last interval compaction. + if canCompact { + m.compactBlockRange(startBlockNum, endBlockNum) + } + + duration := time.Since(start) + m.logger.Info( + "Block data migration ended", + zap.Uint64("targetHeight", m.endHeight), + zap.Uint64("blocksProcessed", m.processed.Load()), + zap.Uint64("lastProcessedHeight", endBlockNum), + zap.Duration("duration", duration), + zap.Bool("completed", m.isCompleted()), + ) + }() + + // Iterate over all block bodies in ascending order by block number. 
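+	// For each canonical, non-genesis block: copy the header, body, and receipts
+	// into the height-indexed DBs, stage the evmDB deletes in the batch, and
+	// compact the processed key range every compactionInterval blocks.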
+ for iter.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + key := iter.Key() + if !isMigratableKey(m.evmDB, key) { + continue + } + num, hash, ok := parseBlockKey(key) + if !ok { + return errUnexpectedKey + } + + if etaTarget == 0 && m.endHeight > 0 && num < m.endHeight { + etaTarget = m.endHeight - num + etaTracker.AddSample(0, etaTarget, start) + } + + // track the range of blocks for compaction + if !canCompact { + startBlockNum = num + canCompact = true + } + endBlockNum = num + + if err := m.migrateHeader(num, hash); err != nil { + return fmt.Errorf("failed to migrate header data: %w", err) + } + if err := m.migrateBody(num, hash, iter.Value()); err != nil { + return fmt.Errorf("failed to migrate body data: %w", err) + } + if err := m.migrateReceipts(num, hash); err != nil { + return fmt.Errorf("failed to migrate receipts data: %w", err) + } + if err := deleteBlock(batch, num, hash); err != nil { + return fmt.Errorf("failed to add block deletes to batch: %w", err) + } + processed := m.processed.Add(1) + + if batch.ValueSize() > ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + return fmt.Errorf("failed to write delete batch: %w", err) + } + batch.Reset() + } + + // compact every compactionInterval blocks + if canCompact && processed-lastCompact >= compactionInterval { + // write any remaining deletes in batch before compaction + if batch.ValueSize() > 0 { + if err := batch.Write(); err != nil { + return fmt.Errorf("failed to write delete batch before compaction: %w", err) + } + batch.Reset() + } + + iter.Release() + m.compactBlockRange(startBlockNum, endBlockNum) + startKey := encodeBlockNumber(num + 1) + newIter := m.evmDB.NewIterator([]byte{evmBlockBodyPrefix}, startKey) + iter = newIter + lastCompact = processed + canCompact = false + } + + // log progress every logProgressInterval + if now := time.Now(); now.After(nextLog) { + fields := []zap.Field{ + zap.Uint64("blocksProcessed", processed), + zap.Uint64("lastProcessedHeight", num), + zap.Duration("timeElapsed", time.Since(start)), + } + if etaTarget > 0 { + eta, pct := etaTracker.AddSample(processed, etaTarget, now) + if eta != nil { + fields = append(fields, + zap.Duration("eta", *eta), + zap.String("progress", fmt.Sprintf("%.2f%%", pct)), + ) + } + } + + m.logger.Info("Block data migration progress", fields...) + nextLog = now.Add(logProgressInterval) + } + } + + if iter.Error() != nil { + return fmt.Errorf("failed to iterate over evmDB: %w", iter.Error()) + } + + m.completed.Store(true) + return nil +} diff --git a/vms/evm/database/blockdb/migrator_test.go b/vms/evm/database/blockdb/migrator_test.go new file mode 100644 index 000000000000..810a3bf9eb5d --- /dev/null +++ b/vms/evm/database/blockdb/migrator_test.go @@ -0,0 +1,298 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. 
+ +package blockdb + +import ( + "slices" + "testing" + + "github.com/ava-labs/libevm/core/rawdb" + "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/params" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/database/leveldb" + "github.com/ava-labs/avalanchego/utils/logging" + + evmdb "github.com/ava-labs/avalanchego/vms/evm/database" + heightindexdb "github.com/ava-labs/avalanchego/x/blockdb" +) + +func TestMigrationCompletion(t *testing.T) { + tests := []struct { + name string + want bool + dataToMigrate bool + migrate bool + }{ + { + name: "completed when no data to migrate", + want: true, + }, + { + name: "not completed if data to migrate", + dataToMigrate: true, + }, + { + name: "completed after migration", + dataToMigrate: true, + migrate: true, + want: true, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + + if tc.dataToMigrate { + blocks, receipts := createBlocks(t, 5) + writeBlocks(evmDB, blocks, receipts) + } + + db, _, err := New( + base, + evmDB, + dataDir, + false, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + if tc.migrate { + startMigration(t, db, true) + } + require.Equal(t, tc.want, db.migrator.isCompleted()) + }) + } +} + +func TestMigrationInProcess(t *testing.T) { + // Verifies blocks are readable during migration for both migrated + // and un-migrated blocks. + // The test generates 21 blocks, migrates 20 but pauses after 5, + // writes block 21, and verifies migrated and un-migrated blocks are readable. + dataDir := t.TempDir() + db, evmDB := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 21) + + // add blocks 0-19 to KVDB to migrate + writeBlocks(evmDB, blocks[0:20], receipts[0:20]) + + // migrate blocks 1-6 + startPartialMigration(t, db, 6) + + // write block 20 to simulate new block being added during migration + writeBlocks(db, blocks[20:21], receipts[20:21]) + + // verify all 21 blocks are readable via the db + for i, block := range blocks { + num := block.NumberU64() + expReceipts := receipts[i] + + // We should be able to fetch block, receipts and logs. + actualBlock := rawdb.ReadBlock(db, block.Hash(), num) + requireRLPEqual(t, block, actualBlock) + actualReceipts := rawdb.ReadReceipts(db, block.Hash(), num, block.Time(), params.TestChainConfig) + requireRLPEqual(t, expReceipts, actualReceipts) + actualLogs := rawdb.ReadLogs(db, block.Hash(), num) + requireRLPEqual(t, logsFromReceipts(expReceipts), actualLogs) + + // header number should also be readable + actualNum := rawdb.ReadHeaderNumber(db, block.Hash()) + require.NotNil(t, actualNum) + require.Equal(t, num, *actualNum) + + // Block 1-6 and 20 should be migrated, others should not. 
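+		// (Genesis is never migrated, and blocks 7-19 are still pending behind the blocked header Put.)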
+ has, err := db.headerDB.Has(num) + require.NoError(t, err) + migrated := (num >= 1 && num <= 6) || num == 20 + require.Equal(t, migrated, has) + } +} + +func TestMigrationStart(t *testing.T) { + tests := []struct { + name string + toMigrateHeights []uint64 + migratedHeights []uint64 + }{ + { + name: "migrate blocks 0-4", + toMigrateHeights: []uint64{0, 1, 2, 3, 4}, + }, + { + name: "migrate blocks 20-24", + toMigrateHeights: []uint64{20, 21, 22, 23, 24}, + }, + { + name: "migrate non consecutive blocks", + toMigrateHeights: []uint64{20, 21, 22, 29, 30, 40}, + }, + { + name: "migrated 0-5 and to migrate 6-10", + toMigrateHeights: []uint64{6, 7, 8, 9, 10}, + migratedHeights: []uint64{0, 1, 2, 3, 4, 5}, + }, + { + name: "all blocks migrated", + migratedHeights: []uint64{0, 1, 2, 3, 4, 5}, + }, + { + name: "no blocks to migrate or migrated", + }, + { + name: "non consecutive blocks migrated and blocks to migrate", + toMigrateHeights: []uint64{2, 3, 7, 8, 10}, + migratedHeights: []uint64{0, 1, 4, 5, 9}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + dataDir := t.TempDir() + db, evmDB := newDatabasesFromDir(t, dataDir) + allHeights := slices.Concat(tc.toMigrateHeights, tc.migratedHeights) + var maxHeight uint64 + if len(allHeights) > 0 { + maxHeight = slices.Max(allHeights) + } + blocks, receipts := createBlocks(t, int(maxHeight)+1) + + // set initial db state + for _, height := range tc.toMigrateHeights { + writeBlocks(evmDB, []*types.Block{blocks[height]}, []types.Receipts{receipts[height]}) + } + for _, height := range tc.migratedHeights { + writeBlocks(db, []*types.Block{blocks[height]}, []types.Receipts{receipts[height]}) + } + + // Verify all blocks and receipts are accessible after migration. + startMigration(t, db, true) + for _, height := range allHeights { + expBlock := blocks[height] + expReceipts := receipts[height] + block := rawdb.ReadBlock(db, expBlock.Hash(), height) + requireRLPEqual(t, expBlock, block) + receipts := rawdb.ReadReceipts(db, expBlock.Hash(), height, expBlock.Time(), params.TestChainConfig) + requireRLPEqual(t, expReceipts, receipts) + logs := rawdb.ReadLogs(db, expBlock.Hash(), height) + requireRLPEqual(t, logsFromReceipts(expReceipts), logs) + + // Verify evmDB no longer has any blocks or receipts (except for genesis). + hasData := height == 0 + require.Equal(t, hasData, rawdb.HasHeader(evmDB, expBlock.Hash(), height)) + require.Equal(t, hasData, rawdb.HasBody(evmDB, expBlock.Hash(), height)) + require.Equal(t, hasData, rawdb.HasReceipts(evmDB, expBlock.Hash(), height)) + } + }) + } +} + +func TestMigrationResume(t *testing.T) { + // Verifies migration can be stopped mid-run and resumed. 
+ dataDir := t.TempDir() + db, evmDB := newDatabasesFromDir(t, dataDir) + blocks, receipts := createBlocks(t, 10) + writeBlocks(evmDB, blocks, receipts) + + // block migration after 3 blocks + startPartialMigration(t, db, 3) + require.False(t, db.migrator.isCompleted()) + + for i := 0; i < 10; i++ { + migrated := i >= 1 && i <= 3 // blocks 1-3 are migrated + has, err := db.bodyDB.Has(uint64(i)) + require.NoError(t, err) + require.Equal(t, migrated, has) + } + + // stop migration and start again + require.NoError(t, db.Database.Close()) + db, _ = newDatabasesFromDir(t, dataDir) + require.False(t, db.migrator.isCompleted()) + startMigration(t, db, true) + + // verify all blocks are accessible after migration + for i, block := range blocks { + num := block.NumberU64() + hash := block.Hash() + actualBlock := rawdb.ReadBlock(db, hash, num) + requireRLPEqual(t, block, actualBlock) + actualReceipts := rawdb.ReadReceipts(db, hash, num, block.Time(), params.TestChainConfig) + requireRLPEqual(t, receipts[i], actualReceipts) + } +} + +func TestMigrationSkipsGenesis(t *testing.T) { + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + blocks, receipts := createBlocks(t, 10) + writeBlocks(evmDB, blocks[0:1], receipts[0:1]) + writeBlocks(evmDB, blocks[5:10], receipts[5:10]) + + db, _, err := New( + base, + evmDB, + dataDir, + false, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + require.True(t, db.heightDBsReady) + require.Equal(t, uint64(5), db.minHeight) + + // migrate and verify genesis block is not migrated + startMigration(t, db, true) + require.True(t, db.migrator.isCompleted()) + genHash := rawdb.ReadCanonicalHash(evmDB, 0) + require.True(t, rawdb.HasHeader(evmDB, genHash, 0)) + has, err := db.bodyDB.Has(0) + require.NoError(t, err) + require.False(t, has) +} + +func TestMigrationWithoutReceipts(t *testing.T) { + dataDir := t.TempDir() + base, err := leveldb.New(dataDir, nil, logging.NoLog{}, prometheus.NewRegistry()) + require.NoError(t, err) + evmDB := rawdb.NewDatabase(evmdb.New(base)) + blocks, _ := createBlocks(t, 5) + + // write blocks without receipts to evmDB + for _, block := range blocks { + rawdb.WriteBlock(evmDB, block) + rawdb.WriteCanonicalHash(evmDB, block.Hash(), block.NumberU64()) + } + + db, initialized, err := New( + base, + evmDB, + dataDir, + false, + heightindexdb.DefaultConfig(), + logging.NoLog{}, + prometheus.NewRegistry(), + ) + require.NoError(t, err) + require.True(t, initialized) + startMigration(t, db, true) + require.True(t, db.migrator.isCompleted()) + + // verify all blocks are accessible and receipts are nil + for _, block := range blocks { + actualBlock := rawdb.ReadBlock(db, block.Hash(), block.NumberU64()) + requireRLPEqual(t, block, actualBlock) + recs := rawdb.ReadReceipts(db, block.Hash(), block.NumberU64(), block.Time(), params.TestChainConfig) + require.Nil(t, recs) + } +}
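
For reviewers, a minimal usage sketch (not part of this diff) showing how a consumer might wire up the new migration flow, mirroring the constructor calls in the tests above. The example package, the openWithMigration helper, the interpretation of New's fourth argument as allowDeferredInit, the *blockdb.Database return type, and the use of logging.NoLog{} with a fresh Prometheus registry are assumptions for illustration only.

// Package example is a hypothetical consumer of the blockdb package.
package example

import (
	"context"

	"github.com/ava-labs/libevm/core/rawdb"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/ava-labs/avalanchego/database/leveldb"
	"github.com/ava-labs/avalanchego/utils/logging"

	evmdb "github.com/ava-labs/avalanchego/vms/evm/database"
	"github.com/ava-labs/avalanchego/vms/evm/database/blockdb"
	heightindexdb "github.com/ava-labs/avalanchego/x/blockdb"
)

// openWithMigration opens the base store, wraps it for the EVM, builds the
// height-indexed block database, and starts the background migration.
func openWithMigration(ctx context.Context, dataDir string) (*blockdb.Database, error) {
	logger := logging.NoLog{}
	reg := prometheus.NewRegistry()

	base, err := leveldb.New(dataDir, nil, logger, reg)
	if err != nil {
		return nil, err
	}
	evmDB := rawdb.NewDatabase(evmdb.New(base))

	// Fourth argument is assumed to be allowDeferredInit; passing false
	// initializes the height-indexed DBs immediately, using the minimum
	// height of existing blocks when there is data to migrate.
	db, _, err := blockdb.New(
		base,
		evmDB,
		dataDir,
		false,
		heightindexdb.DefaultConfig(),
		logger,
		reg,
	)
	if err != nil {
		return nil, err
	}

	// No-op if there is nothing left to migrate; returns an error only if
	// the height-indexed DBs were not initialized (deferred init without a
	// later InitBlockDBs call).
	if err := db.StartMigration(ctx); err != nil {
		return nil, err
	}
	return db, nil
}

Per the diff, Close() stops any in-flight migration (db.migrator.stop() with a bounded stopTimeout) before closing the height-indexed DBs, so shutdown does not need to wait for a run to finish.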