From 9b5be888b176ba7650872c22beec7d2736415e80 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Thu, 9 May 2024 08:28:39 -0500 Subject: [PATCH] Add BatchPreload to decode slabs in parallel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The intended use for BatchPreload is to speedup migrations. BatchPreload decodes slabs in parallel and stores decoded slabs in cache for later retrieval. This is useful for migration program when most or all slabs are expected to be migrated. │ before.txt │ after.txt │ │ sec/op │ sec/op vs base │ StorageRetrieve/10-12 36.23µ ± 3% 35.33µ ± 4% ~ (p=0.075 n=10) StorageRetrieve/100-12 469.6µ ± 8% 124.3µ ± 0% -73.52% (p=0.000 n=10) StorageRetrieve/1000-12 6.678m ± 7% 2.303m ± 20% -65.51% (p=0.000 n=10) StorageRetrieve/10000-12 29.81m ± 2% 12.26m ± 5% -58.86% (p=0.000 n=10) StorageRetrieve/100000-12 303.33m ± 1% 88.40m ± 1% -70.86% (p=0.000 n=10) StorageRetrieve/1000000-12 3.442 ± 1% 1.137 ± 3% -66.96% (p=0.000 n=10) geomean 12.34m 4.816m -60.98% │ before.txt │ after.txt │ │ B/op │ B/op vs base │ StorageRetrieve/10-12 21.59Ki ± 0% 21.59Ki ± 0% ~ (p=1.000 n=10) StorageRetrieve/100-12 219.8Ki ± 0% 224.7Ki ± 0% +2.24% (p=0.000 n=10) StorageRetrieve/1000-12 2.266Mi ± 0% 2.272Mi ± 0% +0.27% (p=0.000 n=10) StorageRetrieve/10000-12 21.94Mi ± 0% 22.14Mi ± 0% +0.91% (p=0.000 n=10) StorageRetrieve/100000-12 215.3Mi ± 0% 218.5Mi ± 0% +1.50% (p=0.000 n=10) StorageRetrieve/1000000-12 2.211Gi ± 0% 2.212Gi ± 0% +0.05% (p=0.000 n=10) geomean 6.919Mi 6.976Mi +0.82% │ before.txt │ after.txt │ │ allocs/op │ allocs/op vs base │ StorageRetrieve/10-12 76.00 ± 0% 76.00 ± 0% ~ (p=1.000 n=10) ¹ StorageRetrieve/100-12 745.0 ± 0% 759.0 ± 0% +1.88% (p=0.000 n=10) StorageRetrieve/1000-12 7.161k ± 0% 7.153k ± 0% -0.11% (p=0.000 n=10) StorageRetrieve/10000-12 70.77k ± 0% 70.58k ± 0% -0.27% (p=0.000 n=10) StorageRetrieve/100000-12 711.9k ± 0% 709.7k ± 0% -0.31% (p=0.000 n=10) StorageRetrieve/1000000-12 7.115M ± 0% 7.077M ± 0% -0.54% (p=0.000 n=10) geomean 22.93k 22.95k +0.11% --- storage.go | 148 +++++++++++++++++++++++++++++++++++++++ storage_bench_test.go | 119 ++++++++++++++++++++++++++++++++ storage_test.go | 156 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 423 insertions(+) diff --git a/storage.go b/storage.go index 690a9f7..2bed523 100644 --- a/storage.go +++ b/storage.go @@ -1558,3 +1558,151 @@ func (s *PersistentSlabStorage) getAllChildReferences(slab Slab) ( return references, brokenReferences, nil } + +func (s *PersistentSlabStorage) BatchPreload(ids []SlabID, numWorkers int) error { + if len(ids) == 0 { + return nil + } + + minCountForBatchPreload := 11 + if len(ids) < minCountForBatchPreload { + + for _, id := range ids { + // fetch from base storage last + data, ok, err := s.baseStorage.Retrieve(id) + if err != nil { + // Wrap err as external error (if needed) because err is returned by BaseStorage interface. + return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to retrieve slab %s", id)) + } + if !ok { + return NewSlabNotFoundError(id, fmt.Errorf("failed to retrieve slab %s", id)) + } + + slab, err := DecodeSlab(id, data, s.cborDecMode, s.DecodeStorable, s.DecodeTypeInfo) + if err != nil { + // err is already categorized by DecodeSlab(). + return err + } + + // save decoded slab to cache + s.cache[id] = slab + } + + return nil + } + + type slabToBeDecoded struct { + slabID SlabID + data []byte + } + + type decodedSlab struct { + slabID SlabID + slab Slab + err error + } + + // Define decoder (worker) to decode slabs in parallel + decoder := func(wg *sync.WaitGroup, done <-chan struct{}, jobs <-chan slabToBeDecoded, results chan<- decodedSlab) { + defer wg.Done() + + for slabData := range jobs { + // Check if goroutine is signaled to stop before proceeding. + select { + case <-done: + return + default: + } + + id := slabData.slabID + data := slabData.data + + slab, err := DecodeSlab(id, data, s.cborDecMode, s.DecodeStorable, s.DecodeTypeInfo) + // err is already categorized by DecodeSlab(). + results <- decodedSlab{ + slabID: id, + slab: slab, + err: err, + } + } + } + + if numWorkers > len(ids) { + numWorkers = len(ids) + } + + var wg sync.WaitGroup + + // Construct done signal channel + done := make(chan struct{}) + + // Construct job queue + jobs := make(chan slabToBeDecoded, len(ids)) + + // Construct result queue + results := make(chan decodedSlab, len(ids)) + + defer func() { + // This ensures that all goroutines are stopped before output channel is closed. + + // Wait for all goroutines to finish + wg.Wait() + + // Close output channel + close(results) + }() + + // Preallocate cache map if empty + if len(s.cache) == 0 { + s.cache = make(map[SlabID]Slab, len(ids)) + } + + // Launch workers + wg.Add(numWorkers) + for i := 0; i < numWorkers; i++ { + go decoder(&wg, done, jobs, results) + } + + // Send jobs + { + // Need to close input channel (jobs) here because + // if there isn't any job in jobs channel, + // done is never processed inside loop "for slabData := range jobs". + defer close(jobs) + + for _, id := range ids { + // fetch from base storage last + data, ok, err := s.baseStorage.Retrieve(id) + if err != nil { + // Closing done channel signals goroutines to stop. + close(done) + // Wrap err as external error (if needed) because err is returned by BaseStorage interface. + return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to retrieve slab %s", id)) + } + if !ok { + // Closing done channel signals goroutines to stop. + close(done) + return NewSlabNotFoundError(id, fmt.Errorf("failed to retrieve slab %s", id)) + } + + jobs <- slabToBeDecoded{id, data} + } + } + + // Process results + for i := 0; i < len(ids); i++ { + result := <-results + + if result.err != nil { + // Closing done channel signals goroutines to stop. + close(done) + // result.err is already categorized by DecodeSlab(). + return result.err + } + + // save decoded slab to cache + s.cache[result.slabID] = result.slab + } + + return nil +} diff --git a/storage_bench_test.go b/storage_bench_test.go index ae76f26..5fb80f1 100644 --- a/storage_bench_test.go +++ b/storage_bench_test.go @@ -132,3 +132,122 @@ func BenchmarkStorageNondeterministicFastCommit(b *testing.B) { benchmarkNondeterministicFastCommit(b, fixedSeed, 100_000) benchmarkNondeterministicFastCommit(b, fixedSeed, 1_000_000) } + +func benchmarkRetrieve(b *testing.B, seed int64, numberOfSlabs int) { + + r := rand.New(rand.NewSource(seed)) + + encMode, err := cbor.EncOptions{}.EncMode() + require.NoError(b, err) + + decMode, err := cbor.DecOptions{}.DecMode() + require.NoError(b, err) + + encodedSlabs := make(map[SlabID][]byte) + ids := make([]SlabID, 0, numberOfSlabs) + for i := 0; i < numberOfSlabs; i++ { + addr := generateRandomAddress(r) + + var index SlabIndex + binary.BigEndian.PutUint64(index[:], uint64(i)) + + id := SlabID{addr, index} + + slab := generateLargeSlab(id) + + data, err := EncodeSlab(slab, encMode) + require.NoError(b, err) + + encodedSlabs[id] = data + ids = append(ids, id) + } + + b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + + baseStorage := NewInMemBaseStorageFromMap(encodedSlabs) + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, decodeStorable, decodeTypeInfo) + + b.StartTimer() + + for _, id := range ids { + _, found, err := storage.Retrieve(id) + require.True(b, found) + require.NoError(b, err) + } + } + }) +} + +func benchmarkBatchPreload(b *testing.B, seed int64, numberOfSlabs int) { + + r := rand.New(rand.NewSource(seed)) + + encMode, err := cbor.EncOptions{}.EncMode() + require.NoError(b, err) + + decMode, err := cbor.DecOptions{}.DecMode() + require.NoError(b, err) + + encodedSlabs := make(map[SlabID][]byte) + ids := make([]SlabID, 0, numberOfSlabs) + for i := 0; i < numberOfSlabs; i++ { + addr := generateRandomAddress(r) + + var index SlabIndex + binary.BigEndian.PutUint64(index[:], uint64(i)) + + id := SlabID{addr, index} + + slab := generateLargeSlab(id) + + data, err := EncodeSlab(slab, encMode) + require.NoError(b, err) + + encodedSlabs[id] = data + ids = append(ids, id) + } + + b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + + baseStorage := NewInMemBaseStorageFromMap(encodedSlabs) + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, decodeStorable, decodeTypeInfo) + + b.StartTimer() + + err = storage.BatchPreload(ids, runtime.NumCPU()) + require.NoError(b, err) + + for _, id := range ids { + _, found, err := storage.Retrieve(id) + require.True(b, found) + require.NoError(b, err) + } + } + }) +} + +func BenchmarkStorageRetrieve(b *testing.B) { + fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc. + + benchmarkRetrieve(b, fixedSeed, 10) + benchmarkRetrieve(b, fixedSeed, 100) + benchmarkRetrieve(b, fixedSeed, 1_000) + benchmarkRetrieve(b, fixedSeed, 10_000) + benchmarkRetrieve(b, fixedSeed, 100_000) + benchmarkRetrieve(b, fixedSeed, 1_000_000) +} + +func BenchmarkStorageBatchPreload(b *testing.B) { + fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc. + + benchmarkBatchPreload(b, fixedSeed, 10) + benchmarkBatchPreload(b, fixedSeed, 100) + benchmarkBatchPreload(b, fixedSeed, 1_000) + benchmarkBatchPreload(b, fixedSeed, 10_000) + benchmarkBatchPreload(b, fixedSeed, 100_000) + benchmarkBatchPreload(b, fixedSeed, 1_000_000) +} diff --git a/storage_test.go b/storage_test.go index 00756c2..33df451 100644 --- a/storage_test.go +++ b/storage_test.go @@ -19,6 +19,7 @@ package atree import ( + "encoding/binary" "errors" "math/rand" "runtime" @@ -4580,3 +4581,158 @@ func testStorageNondeterministicFastCommit(t *testing.T, numberOfAccounts int, n require.Nil(t, storedValue) } } + +func TestStorageBatchPreload(t *testing.T) { + t.Run("0 slab", func(t *testing.T) { + numberOfAccounts := 0 + numberOfSlabsPerAccount := 0 + testStorageBatchPreload(t, numberOfAccounts, numberOfSlabsPerAccount) + }) + + t.Run("1 slab", func(t *testing.T) { + numberOfAccounts := 1 + numberOfSlabsPerAccount := 1 + testStorageBatchPreload(t, numberOfAccounts, numberOfSlabsPerAccount) + }) + + t.Run("10 slab", func(t *testing.T) { + numberOfAccounts := 1 + numberOfSlabsPerAccount := 10 + testStorageBatchPreload(t, numberOfAccounts, numberOfSlabsPerAccount) + }) + + t.Run("100 slabs", func(t *testing.T) { + numberOfAccounts := 10 + numberOfSlabsPerAccount := 10 + testStorageBatchPreload(t, numberOfAccounts, numberOfSlabsPerAccount) + }) + + t.Run("10_000 slabs", func(t *testing.T) { + numberOfAccounts := 10 + numberOfSlabsPerAccount := 1_000 + testStorageBatchPreload(t, numberOfAccounts, numberOfSlabsPerAccount) + }) +} + +func testStorageBatchPreload(t *testing.T, numberOfAccounts int, numberOfSlabsPerAccount int) { + + indexesByAddress := make(map[Address]uint64) + + generateSlabID := func(address Address) SlabID { + nextIndex := indexesByAddress[address] + 1 + + var idx SlabIndex + binary.BigEndian.PutUint64(idx[:], nextIndex) + + indexesByAddress[address] = nextIndex + + return NewSlabID(address, idx) + } + + encMode, err := cbor.EncOptions{}.EncMode() + require.NoError(t, err) + + decMode, err := cbor.DecOptions{}.DecMode() + require.NoError(t, err) + + r := newRand(t) + + encodedSlabs := make(map[SlabID][]byte) + + // Generate and encode slabs + for i := 0; i < numberOfAccounts; i++ { + + addr := generateRandomAddress(r) + + for j := 0; j < numberOfSlabsPerAccount; j++ { + + slabID := generateSlabID(addr) + + slab := generateRandomSlab(slabID, r) + + encodedSlabs[slabID], err = EncodeSlab(slab, encMode) + require.NoError(t, err) + } + } + + baseStorage := NewInMemBaseStorageFromMap(encodedSlabs) + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, decodeStorable, decodeTypeInfo) + + ids := make([]SlabID, 0, len(encodedSlabs)) + for id := range encodedSlabs { + ids = append(ids, id) + } + + // Batch preload slabs from base storage + err = storage.BatchPreload(ids, runtime.NumCPU()) + require.NoError(t, err) + require.Equal(t, len(encodedSlabs), len(storage.cache)) + require.Equal(t, 0, len(storage.deltas)) + + // Compare encoded data + for id, data := range encodedSlabs { + cachedData, err := EncodeSlab(storage.cache[id], encMode) + require.NoError(t, err) + + require.Equal(t, cachedData, data) + } +} + +func TestStorageBatchPreloadError(t *testing.T) { + + encMode, err := cbor.EncOptions{}.EncMode() + require.NoError(t, err) + + decMode, err := cbor.DecOptions{}.DecMode() + require.NoError(t, err) + + r := newRand(t) + + t.Run("empty storage", func(t *testing.T) { + const numberOfSlabs = 10 + + ids := make([]SlabID, numberOfSlabs) + for i := 0; i < numberOfSlabs; i++ { + var index SlabIndex + binary.BigEndian.PutUint64(index[:], uint64(i)) + + ids[i] = NewSlabID(generateRandomAddress(r), index) + } + + baseStorage := NewInMemBaseStorage() + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, decodeStorable, decodeTypeInfo) + + err := storage.BatchPreload(ids, runtime.NumCPU()) + require.Error(t, err) + }) + + t.Run("non-empty storage", func(t *testing.T) { + const numberOfSlabs = 10 + + ids := make([]SlabID, numberOfSlabs) + encodedSlabs := make(map[SlabID][]byte) + + for i := 0; i < numberOfSlabs; i++ { + var index SlabIndex + binary.BigEndian.PutUint64(index[:], uint64(i)) + + id := NewSlabID(generateRandomAddress(r), index) + + slab := generateRandomSlab(id, r) + + encodedSlabs[id], err = EncodeSlab(slab, encMode) + require.NoError(t, err) + + ids[i] = id + } + + // Append a slab ID that doesn't exist in storage. + ids = append(ids, NewSlabID(generateRandomAddress(r), SlabIndex{numberOfSlabs})) + + baseStorage := NewInMemBaseStorageFromMap(encodedSlabs) + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, decodeStorable, decodeTypeInfo) + + err := storage.BatchPreload(ids, runtime.NumCPU()) + require.Error(t, err) + }) +}