Skip to content

Commit

Permalink
Merge pull request #6363 from onflow/fxamacker/move-migration-funcs-t…
Browse files Browse the repository at this point in the history
…o-util-pkg-for-reuse

Move and export some migration funcs to util package for reuse
  • Loading branch information
fxamacker authored Aug 20, 2024
2 parents 6701e9e + 0dcea52 commit 2266069
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 162 deletions.
98 changes: 1 addition & 97 deletions cmd/util/cmd/execution-state-extract/execution_state_extract.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package extract

import (
"context"
"encoding/json"
"fmt"
"os"
syncAtomic "sync/atomic"
"time"

"github.com/rs/zerolog"
Expand All @@ -15,7 +13,6 @@ import (
migrators "github.com/onflow/flow-go/cmd/util/ledger/migrations"
"github.com/onflow/flow-go/cmd/util/ledger/reporters"
"github.com/onflow/flow-go/cmd/util/ledger/util"
"github.com/onflow/flow-go/cmd/util/ledger/util/registers"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/hash"
"github.com/onflow/flow-go/ledger/common/pathfinder"
Expand Down Expand Up @@ -268,7 +265,7 @@ func newMigration(
payloadCount,
)

registersByAccount, err := newByAccountRegistersFromPayloadAccountGrouping(payloadAccountGrouping, nWorker)
registersByAccount, err := util.NewByAccountRegistersFromPayloadAccountGrouping(payloadAccountGrouping, nWorker)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -335,99 +332,6 @@ func newMigration(
}
}

func newByAccountRegistersFromPayloadAccountGrouping(
payloadAccountGrouping *util.PayloadAccountGrouping,
nWorker int,
) (
*registers.ByAccount,
error,
) {
g, ctx := errgroup.WithContext(context.Background())

jobs := make(chan *util.PayloadAccountGroup, nWorker)
results := make(chan *registers.AccountRegisters, nWorker)

g.Go(func() error {
defer close(jobs)
for {
payloadAccountGroup, err := payloadAccountGrouping.Next()
if err != nil {
return fmt.Errorf("failed to group payloads by account: %w", err)
}

if payloadAccountGroup == nil {
return nil
}

select {
case <-ctx.Done():
return ctx.Err()
case jobs <- payloadAccountGroup:
}
}
})

workersLeft := int64(nWorker)
for i := 0; i < nWorker; i++ {
g.Go(func() error {
defer func() {
if syncAtomic.AddInt64(&workersLeft, -1) == 0 {
close(results)
}
}()

for payloadAccountGroup := range jobs {

// Convert address to owner
payloadGroupOwner := flow.AddressToRegisterOwner(payloadAccountGroup.Address)

accountRegisters, err := registers.NewAccountRegistersFromPayloads(
payloadGroupOwner,
payloadAccountGroup.Payloads,
)
if err != nil {
return fmt.Errorf("failed to create account registers from payloads: %w", err)
}
select {
case <-ctx.Done():
return ctx.Err()
case results <- accountRegisters:
}
}

return nil
})
}

registersByAccount := registers.NewByAccount()
g.Go(func() error {
for accountRegisters := range results {
oldAccountRegisters := registersByAccount.SetAccountRegisters(accountRegisters)
if oldAccountRegisters != nil {
// Account grouping should never create multiple groups for an account.
// In case it does anyway, merge the groups together,
// by merging the existing registers into the new ones.

log.Warn().Msgf(
"account registers already exist for account %x. merging %d existing registers into %d new",
accountRegisters.Owner(),
oldAccountRegisters.Count(),
accountRegisters.Count(),
)

err := accountRegisters.Merge(oldAccountRegisters)
if err != nil {
return fmt.Errorf("failed to merge account registers: %w", err)
}
}
}

return nil
})

return registersByAccount, g.Wait()
}

func createTrieFromPayloads(logger zerolog.Logger, payloads []*ledger.Payload) (*trie.MTrie, error) {
// get paths
paths, err := pathfinder.PathsFromPayloads(payloads, complete.DefaultPathFinderVersion)
Expand Down
4 changes: 2 additions & 2 deletions cmd/util/ledger/migrations/cadence_value_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ func (dr *CadenceValueDiffReporter) DiffStates(oldRegs, newRegs registers.Regist
var loadAtreeStorageGroup errgroup.Group

loadAtreeStorageGroup.Go(func() (err error) {
return loadAtreeSlabsInStorage(oldStorage, oldRegs, dr.nWorkers)
return util.LoadAtreeSlabsInStorage(oldStorage, oldRegs, dr.nWorkers)
})

err := loadAtreeSlabsInStorage(newStorage, newRegs, dr.nWorkers)
err := util.LoadAtreeSlabsInStorage(newStorage, newRegs, dr.nWorkers)
if err != nil {
dr.reportWriter.Write(
diffError{
Expand Down
2 changes: 1 addition & 1 deletion cmd/util/ledger/migrations/cadence_values_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (m *CadenceBaseMigration) MigrateAccount(
var storageHealthErrorBefore error
if m.checkStorageHealthBeforeMigration {

storageHealthErrorBefore = checkStorageHealth(address, storage, accountRegisters, m.nWorkers)
storageHealthErrorBefore = util.CheckStorageHealth(address, storage, accountRegisters, AllStorageMapDomains, m.nWorkers)
if storageHealthErrorBefore != nil {
m.log.Warn().
Err(storageHealthErrorBefore).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (m *FilterUnreferencedSlabsMigration) MigrateAccount(
nil,
)

err := checkStorageHealth(address, storage, accountRegisters, m.nWorkers)
err := util.CheckStorageHealth(address, storage, accountRegisters, AllStorageMapDomains, m.nWorkers)
if err == nil {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/util/ledger/migrations/fix_broken_data_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (m *FixSlabsWithBrokenReferencesMigration) MigrateAccount(
storage := migrationRuntime.Storage

// Load all atree registers in storage
err := loadAtreeSlabsInStorage(storage, accountRegisters, m.nWorkers)
err := util.LoadAtreeSlabsInStorage(storage, accountRegisters, m.nWorkers)
if err != nil {
return err
}
Expand Down
60 changes: 0 additions & 60 deletions cmd/util/ledger/migrations/utils.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package migrations

import (
"github.com/onflow/atree"
"github.com/onflow/cadence/runtime"
"github.com/onflow/cadence/runtime/common"
"github.com/onflow/cadence/runtime/stdlib"

"github.com/onflow/flow-go/cmd/util/ledger/util/registers"
"github.com/onflow/flow-go/model/flow"
)

type RegistersMigration func(registersByAccount *registers.ByAccount) error
Expand All @@ -30,61 +28,3 @@ func init() {
allStorageMapDomainsSet[domain] = struct{}{}
}
}

func getSlabIDsFromRegisters(registers registers.Registers) ([]atree.SlabID, error) {
storageIDs := make([]atree.SlabID, 0, registers.Count())

err := registers.ForEach(func(owner string, key string, value []byte) error {

if !flow.IsSlabIndexKey(key) {
return nil
}

slabID := atree.NewSlabID(
atree.Address([]byte(owner)),
atree.SlabIndex([]byte(key[1:])),
)

storageIDs = append(storageIDs, slabID)

return nil
})
if err != nil {
return nil, err
}

return storageIDs, nil
}

func loadAtreeSlabsInStorage(
storage *runtime.Storage,
registers registers.Registers,
nWorkers int,
) error {

storageIDs, err := getSlabIDsFromRegisters(registers)
if err != nil {
return err
}

return storage.PersistentSlabStorage.BatchPreload(storageIDs, nWorkers)
}

func checkStorageHealth(
address common.Address,
storage *runtime.Storage,
registers registers.Registers,
nWorkers int,
) error {

err := loadAtreeSlabsInStorage(storage, registers, nWorkers)
if err != nil {
return err
}

for _, domain := range AllStorageMapDomains {
_ = storage.GetStorageMap(address, domain, false)
}

return storage.CheckHealth()
}
65 changes: 65 additions & 0 deletions cmd/util/ledger/util/atree_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ package util
import (
"fmt"

"github.com/onflow/atree"

"github.com/onflow/cadence/runtime"
"github.com/onflow/cadence/runtime/common"

"github.com/onflow/flow-go/cmd/util/ledger/util/registers"
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/convert"
"github.com/onflow/flow-go/model/flow"
Expand Down Expand Up @@ -57,3 +63,62 @@ func newHeadFromData(data []byte) (head, error) {
func (h *head) version() byte {
return (h[0] & maskVersion) >> 4
}

func getSlabIDsFromRegisters(registers registers.Registers) ([]atree.SlabID, error) {
storageIDs := make([]atree.SlabID, 0, registers.Count())

err := registers.ForEach(func(owner string, key string, _ []byte) error {

if !flow.IsSlabIndexKey(key) {
return nil
}

slabID := atree.NewSlabID(
atree.Address([]byte(owner)),
atree.SlabIndex([]byte(key[1:])),
)

storageIDs = append(storageIDs, slabID)

return nil
})
if err != nil {
return nil, err
}

return storageIDs, nil
}

func LoadAtreeSlabsInStorage(
storage *runtime.Storage,
registers registers.Registers,
nWorkers int,
) error {

storageIDs, err := getSlabIDsFromRegisters(registers)
if err != nil {
return err
}

return storage.PersistentSlabStorage.BatchPreload(storageIDs, nWorkers)
}

func CheckStorageHealth(
address common.Address,
storage *runtime.Storage,
registers registers.Registers,
domains []string,
nWorkers int,
) error {

err := LoadAtreeSlabsInStorage(storage, registers, nWorkers)
if err != nil {
return err
}

for _, domain := range domains {
_ = storage.GetStorageMap(address, domain, false)
}

return storage.CheckHealth()
}
Loading

0 comments on commit 2266069

Please sign in to comment.