Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add feature to enable atree inlining migration to fix references to non-existent registers #388

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:

- uses: actions/setup-go@v4
with:
go-version: '1.19'
go-version: '1.20'
check-latest: true

- name: Get dependencies
Expand Down
252 changes: 252 additions & 0 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package atree
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"sort"
"strings"
Expand Down Expand Up @@ -1088,3 +1089,254 @@ func storeSlab(storage SlabStorage, slab Slab) error {
}
return nil
}

// FixLoadedBrokenReferences traverses loaded slabs and fixes broken references in maps.
// A broken reference is a SlabID referencing a non-existent slab.
// To fix a map containing broken references, this function replaces broken map with
// empty map having the same SlabID and also removes all slabs in the old map.
// Limitations:
// - only fix broken references in map
// - only traverse loaded slabs in deltas and cache
// NOTE: The intended use case is to enable migration programs in onflow/flow-go to
// fix broken references. As of April 2024, only 10 registers in testnet (not mainnet)
// were found to have broken references and they seem to have resulted from a bug
// that was fixed 2 years ago by https://github.com/onflow/cadence/pull/1565.
func (s *PersistentSlabStorage) FixLoadedBrokenReferences(needToFix func(old Value) bool) (
fixedSlabIDs map[SlabID][]SlabID, // key: root slab ID, value: slab IDs containing broken refs
skippedSlabIDs map[SlabID][]SlabID, // key: root slab ID, value: slab IDs containing broken refs
err error,
) {

// parentOf is used to find root slab from non-root slab.
// Broken reference can be in non-root slab, and we need SlabID of root slab
// to replace broken map by creating an empty new map with same SlabID.
parentOf := make(map[SlabID]SlabID)

getRootSlabID := func(id SlabID) SlabID {
for {
parentID, ok := parentOf[id]
if ok {
id = parentID
} else {
return id
}
}
}

hasBrokenReferenceInSlab := func(id SlabID, slab Slab) bool {
if slab == nil {
return false
}

var isMetaDataSlab bool

switch slab.(type) {
case *ArrayMetaDataSlab, *MapMetaDataSlab:
isMetaDataSlab = true
}

var foundBrokenRef bool
for _, childStorable := range slab.ChildStorables() {

slabIDStorable, ok := childStorable.(SlabIDStorable)
if !ok {
continue
}

childID := SlabID(slabIDStorable)

// Track parent-child relationship of root slabs and non-root slabs.
if isMetaDataSlab {
parentOf[childID] = id
}

if s.existIfLoaded(childID) {
continue
}

foundBrokenRef = true

if !isMetaDataSlab {
return true
}
}

return foundBrokenRef
}

var brokenSlabIDs []SlabID

// Iterate delta slabs.
for id, slab := range s.deltas {
if hasBrokenReferenceInSlab(id, slab) {
brokenSlabIDs = append(brokenSlabIDs, id)
}
}

// Iterate cache slabs.
for id, slab := range s.cache {
if _, ok := s.deltas[id]; ok {
continue
}
if hasBrokenReferenceInSlab(id, slab) {
brokenSlabIDs = append(brokenSlabIDs, id)
}
}

if len(brokenSlabIDs) == 0 {
return nil, nil, nil
}

rootSlabIDsWithBrokenData := make(map[SlabID][]SlabID)
var errs []error

// Find SlabIDs of root slab for slabs containing broken references.
for _, id := range brokenSlabIDs {
rootID := getRootSlabID(id)
if rootID == SlabIDUndefined {
errs = append(errs, fmt.Errorf("failed to get root slab id for slab %s", id))
continue
}
rootSlabIDsWithBrokenData[rootID] = append(rootSlabIDsWithBrokenData[rootID], id)
}

for rootSlabID, brokenSlabIDs := range rootSlabIDsWithBrokenData {
rootSlab := s.RetrieveIfLoaded(rootSlabID)
if rootSlab == nil {
errs = append(errs, fmt.Errorf("failed to retrieve loaded root slab %s", rootSlabID))
continue
}

switch rootSlab := rootSlab.(type) {
case MapSlab:
value, err := rootSlab.StoredValue(s)
if err != nil {
errs = append(errs, fmt.Errorf("failed to convert slab %s into value", rootSlab.SlabID()))
continue
}

if needToFix(value) {
err := s.fixBrokenReferencesInMap(rootSlab)
if err != nil {
errs = append(errs, err)
continue
}
} else {
if skippedSlabIDs == nil {
skippedSlabIDs = make(map[SlabID][]SlabID)
}
skippedSlabIDs[rootSlabID] = brokenSlabIDs
}

default:
// IMPORTANT: Only handle map slabs for now. DO NOT silently fix currently unknown problems.
errs = append(errs, fmt.Errorf("failed to fix broken references in non-map slab %s (%T)", rootSlab.SlabID(), rootSlab))
}
}

for id := range skippedSlabIDs {
delete(rootSlabIDsWithBrokenData, id)
}

return rootSlabIDsWithBrokenData, skippedSlabIDs, errors.Join(errs...)
}

// fixBrokenReferencesInMap replaces replaces broken map with empty map
// having the same SlabID and also removes all slabs in the old map.
func (s *PersistentSlabStorage) fixBrokenReferencesInMap(old MapSlab) error {
id := old.SlabID()

oldExtraData := old.ExtraData()

// Create an empty map with the same StorgeID, type, and seed as the old map.
new := &MapDataSlab{
header: MapSlabHeader{
slabID: id,
size: mapRootDataSlabPrefixSize + hkeyElementsPrefixSize,
},
extraData: &MapExtraData{
TypeInfo: oldExtraData.TypeInfo,
Seed: oldExtraData.Seed,
},
elements: newHkeyElements(0),
}

// Store new empty map with the same SlabID.
err := s.Store(id, new)
if err != nil {
return err
}

// Remove all slabs and references in old map.
references, _, err := s.getAllChildReferences(old)
if err != nil {
return err
}

for _, childID := range references {
err = s.Remove(childID)
if err != nil {
return err
}
}

return nil
}

func (s *PersistentSlabStorage) existIfLoaded(id SlabID) bool {
// Check deltas.
if slab, ok := s.deltas[id]; ok {
return slab != nil
}

// Check read cache.
if slab, ok := s.cache[id]; ok {
return slab != nil
}

return false
}

// getAllChildReferences returns child references of given slab (all levels).
func (s *PersistentSlabStorage) getAllChildReferences(slab Slab) (
references []SlabID,
brokenReferences []SlabID,
err error,
) {
childStorables := slab.ChildStorables()

for len(childStorables) > 0 {

var nextChildStorables []Storable

for _, childStorable := range childStorables {

slabIDStorable, ok := childStorable.(SlabIDStorable)
if !ok {
continue
}

childID := SlabID(slabIDStorable)

childSlab, ok, err := s.Retrieve(childID)
if err != nil {
return nil, nil, err
}
if !ok {
brokenReferences = append(brokenReferences, childID)
continue
}

references = append(references, childID)

nextChildStorables = append(
nextChildStorables,
childSlab.ChildStorables()...,
)
}

childStorables = nextChildStorables
}

return references, brokenReferences, nil
}
Loading
Loading