diff --git a/storage.go b/storage.go index 394ecf2..690a9f7 100644 --- a/storage.go +++ b/storage.go @@ -777,12 +777,17 @@ func (s *PersistentSlabStorage) sortedOwnedDeltaKeys() []SlabID { } func (s *PersistentSlabStorage) Commit() error { - var err error // this part ensures the keys are sorted so commit operation is deterministic keysWithOwners := s.sortedOwnedDeltaKeys() - for _, id := range keysWithOwners { + return s.commit(keysWithOwners) +} + +func (s *PersistentSlabStorage) commit(keys []SlabID) error { + var err error + + for _, id := range keys { slab := s.deltas[id] // deleted slabs @@ -964,6 +969,202 @@ func (s *PersistentSlabStorage) FastCommit(numWorkers int) error { return nil } +// NonderterministicFastCommit commits changes in nondeterministic order. +// This is used by migration program when ordering isn't required. +func (s *PersistentSlabStorage) NonderterministicFastCommit(numWorkers int) error { + // No changes + if len(s.deltas) == 0 { + return nil + } + + type slabToBeEncoded struct { + slabID SlabID + slab Slab + } + + type encodedSlab struct { + slabID SlabID + data []byte + err error + } + + // Define encoder (worker) to encode slabs in parallel + encoder := func( + wg *sync.WaitGroup, + done <-chan struct{}, + jobs <-chan slabToBeEncoded, + results chan<- encodedSlab, + ) { + defer wg.Done() + + for job := range jobs { + // Check if goroutine is signaled to stop before proceeding. + select { + case <-done: + return + default: + } + + id := job.slabID + slab := job.slab + + if slab == nil { + results <- encodedSlab{ + slabID: id, + data: nil, + err: nil, + } + continue + } + + // Serialize + data, err := EncodeSlab(slab, s.cborEncMode) + results <- encodedSlab{ + slabID: id, + data: data, + err: err, + } + } + } + + // Modified slabs need to be encoded (in parallel) and stored in underlying storage. + modifiedSlabCount := 0 + // Deleted slabs need to be removed from underlying storage. + deletedSlabCount := 0 + for k, v := range s.deltas { + // Ignore slabs not owned by accounts + if k.address == AddressUndefined { + continue + } + if v == nil { + deletedSlabCount++ + } else { + modifiedSlabCount++ + } + } + + if modifiedSlabCount == 0 && deletedSlabCount == 0 { + return nil + } + + if modifiedSlabCount < 2 { + // Avoid goroutine overhead + ids := make([]SlabID, 0, modifiedSlabCount+deletedSlabCount) + for k := range s.deltas { + // Ignore slabs not owned by accounts + if k.address == AddressUndefined { + continue + } + ids = append(ids, k) + } + + return s.commit(ids) + } + + if numWorkers > modifiedSlabCount { + numWorkers = modifiedSlabCount + } + + var wg sync.WaitGroup + + // Create done signal channel + done := make(chan struct{}) + + // Create job queue + jobs := make(chan slabToBeEncoded, modifiedSlabCount) + + // Create result queue + results := make(chan encodedSlab, modifiedSlabCount) + + defer func() { + // This ensures that all goroutines are stopped before output channel is closed. + + // Wait for all goroutines to finish + wg.Wait() + + // Close output channel + close(results) + }() + + // Launch workers to encode slabs + wg.Add(numWorkers) + for i := 0; i < numWorkers; i++ { + go encoder(&wg, done, jobs, results) + } + + // Send jobs + deletedSlabIDs := make([]SlabID, 0, deletedSlabCount) + for k, v := range s.deltas { + // ignore the ones that are not owned by accounts + if k.address == AddressUndefined { + continue + } + if v == nil { + deletedSlabIDs = append(deletedSlabIDs, k) + } else { + jobs <- slabToBeEncoded{k, v} + } + } + close(jobs) + + // Remove deleted slabs from underlying storage. + for _, id := range deletedSlabIDs { + + err := s.baseStorage.Remove(id) + if err != nil { + // Closing done channel signals goroutines to stop. + close(done) + // Wrap err as external error (if needed) because err is returned by BaseStorage interface. + return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to remove slab %s", id)) + } + + // Deleted slabs are removed from deltas and added to read cache so that: + // 1. next read is from in-memory read cache + // 2. deleted slabs are not re-committed in next commit + s.cache[id] = nil + delete(s.deltas, id) + } + + // Process encoded slabs + for i := 0; i < modifiedSlabCount; i++ { + result := <-results + + if result.err != nil { + // Closing done channel signals goroutines to stop. + close(done) + // result.err is already categorized by Encode(). + return result.err + } + + id := result.slabID + data := result.data + + if data == nil { + // Closing done channel signals goroutines to stop. + close(done) + // This is unexpected because deleted slabs are processed separately. + return NewEncodingErrorf("unexpectd encoded empty data") + } + + // Store + err := s.baseStorage.Store(id, data) + if err != nil { + // Wrap err as external error (if needed) because err is returned by BaseStorage interface. + return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to store slab %s", id)) + } + + s.cache[id] = s.deltas[id] + // It's safe to remove slab from deltas because + // iteration is on non-temp slabs and temp slabs + // are still in deltas. + delete(s.deltas, id) + } + + // Do NOT reset deltas because slabs with empty address are not saved. + + return nil +} + func (s *PersistentSlabStorage) DropDeltas() { s.deltas = make(map[SlabID]Slab) } diff --git a/storage_bench_test.go b/storage_bench_test.go new file mode 100644 index 0000000..155367e --- /dev/null +++ b/storage_bench_test.go @@ -0,0 +1,134 @@ +/* + * Atree - Scalable Arrays and Ordered Maps + * + * Copyright Dapper Labs, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package atree + +import ( + "encoding/binary" + "math/rand" + "runtime" + "strconv" + "testing" + + "github.com/fxamacker/cbor/v2" + "github.com/stretchr/testify/require" +) + +func benchmarkFastCommit(b *testing.B, seed int64, numberOfSlabs int) { + r := rand.New(rand.NewSource(seed)) + + encMode, err := cbor.EncOptions{}.EncMode() + require.NoError(b, err) + + decMode, err := cbor.DecOptions{}.DecMode() + require.NoError(b, err) + + slabs := make([]Slab, numberOfSlabs) + for i := 0; i < numberOfSlabs; i++ { + addr := generateRandomAddress(r) + + var index SlabIndex + binary.BigEndian.PutUint64(index[:], uint64(i)) + + id := SlabID{addr, index} + + slabs[i] = generateLargeSlab(id) + } + + b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + + baseStorage := NewInMemBaseStorage() + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, nil, nil) + + for _, slab := range slabs { + err = storage.Store(slab.SlabID(), slab) + require.NoError(b, err) + } + + b.StartTimer() + + err := storage.FastCommit(runtime.NumCPU()) + require.NoError(b, err) + } + }) +} + +func benchmarkNondeterministicFastCommit(b *testing.B, seed int64, numberOfSlabs int) { + r := rand.New(rand.NewSource(seed)) + + encMode, err := cbor.EncOptions{}.EncMode() + require.NoError(b, err) + + decMode, err := cbor.DecOptions{}.DecMode() + require.NoError(b, err) + + slabs := make([]Slab, numberOfSlabs) + for i := 0; i < numberOfSlabs; i++ { + addr := generateRandomAddress(r) + + var index SlabIndex + binary.BigEndian.PutUint64(index[:], uint64(i)) + + id := SlabID{addr, index} + + slabs[i] = generateLargeSlab(id) + } + + b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + + baseStorage := NewInMemBaseStorage() + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, nil, nil) + + for _, slab := range slabs { + err = storage.Store(slab.SlabID(), slab) + require.NoError(b, err) + } + + b.StartTimer() + + err := storage.NonderterministicFastCommit(runtime.NumCPU()) + require.NoError(b, err) + } + }) +} + +func BenchmarkStorageFastCommit(b *testing.B) { + fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc. + + benchmarkFastCommit(b, fixedSeed, 10) + benchmarkFastCommit(b, fixedSeed, 100) + benchmarkFastCommit(b, fixedSeed, 1_000) + benchmarkFastCommit(b, fixedSeed, 10_000) + benchmarkFastCommit(b, fixedSeed, 100_000) + benchmarkFastCommit(b, fixedSeed, 1_000_000) +} + +func BenchmarkStorageNondeterministicFastCommit(b *testing.B) { + fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc. + + benchmarkNondeterministicFastCommit(b, fixedSeed, 10) + benchmarkNondeterministicFastCommit(b, fixedSeed, 100) + benchmarkNondeterministicFastCommit(b, fixedSeed, 1_000) + benchmarkNondeterministicFastCommit(b, fixedSeed, 10_000) + benchmarkNondeterministicFastCommit(b, fixedSeed, 100_000) + benchmarkNondeterministicFastCommit(b, fixedSeed, 1_000_000) +} diff --git a/storage_test.go b/storage_test.go index 1b407fb..00756c2 100644 --- a/storage_test.go +++ b/storage_test.go @@ -412,8 +412,8 @@ func TestBasicSlabStorageStore(t *testing.T) { r := newRand(t) address := Address{1} slabs := map[SlabID]Slab{ - {address, SlabIndex{1}}: generateRandomSlab(address, r), - {address, SlabIndex{2}}: generateRandomSlab(address, r), + {address, SlabIndex{1}}: generateRandomSlab(SlabID{address, SlabIndex{1}}, r), + {address, SlabIndex{2}}: generateRandomSlab(SlabID{address, SlabIndex{2}}, r), } // Store values @@ -424,7 +424,7 @@ func TestBasicSlabStorageStore(t *testing.T) { // Overwrite stored values for id := range slabs { - slab := generateRandomSlab(id.address, r) + slab := generateRandomSlab(id, r) slabs[id] = slab err := storage.Store(id, slab) require.NoError(t, err) @@ -446,7 +446,7 @@ func TestBasicSlabStorageRetrieve(t *testing.T) { r := newRand(t) id := SlabID{Address{1}, SlabIndex{1}} - slab := generateRandomSlab(id.address, r) + slab := generateRandomSlab(id, r) // Retrieve value from empty storage retrievedSlab, found, err := storage.Retrieve(id) @@ -476,7 +476,7 @@ func TestBasicSlabStorageRemove(t *testing.T) { r := newRand(t) id := SlabID{Address{1}, SlabIndex{1}} - slab := generateRandomSlab(id.address, r) + slab := generateRandomSlab(id, r) // Remove value from empty storage err := storage.Remove(id) @@ -546,7 +546,7 @@ func TestBasicSlabStorageSlabIDs(t *testing.T) { // Store values for id := range wantIDs { - err := storage.Store(id, generateRandomSlab(id.address, r)) + err := storage.Store(id, generateRandomSlab(id, r)) require.NoError(t, err) } @@ -569,9 +569,9 @@ func TestBasicSlabStorageSlabIterat(t *testing.T) { id3 := SlabID{address: address, index: index.Next()} want := map[SlabID]Slab{ - id1: generateRandomSlab(id1.address, r), - id2: generateRandomSlab(id2.address, r), - id3: generateRandomSlab(id3.address, r), + id1: generateRandomSlab(id1, r), + id2: generateRandomSlab(id2, r), + id3: generateRandomSlab(id3, r), } storage := NewBasicSlabStorage(nil, nil, nil, nil) @@ -642,8 +642,8 @@ func TestPersistentStorage(t *testing.T) { permSlabID, err := NewSlabIDFromRawBytes([]byte{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}) require.NoError(t, err) - slab1 := generateRandomSlab(tempSlabID.address, r) - slab2 := generateRandomSlab(permSlabID.address, r) + slab1 := generateRandomSlab(tempSlabID, r) + slab2 := generateRandomSlab(permSlabID, r) // no temp ids should be in the base storage err = storage.Store(tempSlabID, slab1) @@ -724,8 +724,10 @@ func TestPersistentStorage(t *testing.T) { numberOfSlabsPerAccount := 10 r := newRand(t) + baseStorage := newAccessOrderTrackerBaseStorage() storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, nil, nil) + baseStorage2 := newAccessOrderTrackerBaseStorage() storageWithFastCommit := NewPersistentSlabStorage(baseStorage2, encMode, decMode, nil, nil) @@ -735,16 +737,19 @@ func TestPersistentStorage(t *testing.T) { for i := 0; i < numberOfAccounts; i++ { for j := 0; j < numberOfSlabsPerAccount; j++ { addr := generateRandomAddress(r) - slab := generateRandomSlab(addr, r) - slabSize += uint64(slab.ByteSize()) slabID, err := storage.GenerateSlabID(addr) require.NoError(t, err) + + slab := generateRandomSlab(slabID, r) + slabSize += uint64(slab.ByteSize()) + err = storage.Store(slabID, slab) require.NoError(t, err) slabID2, err := storageWithFastCommit.GenerateSlabID(addr) require.NoError(t, err) + err = storageWithFastCommit.Store(slabID2, slab) require.NoError(t, err) @@ -1042,12 +1047,12 @@ func TestPersistentStorageGenerateSlabID(t *testing.T) { }) } -func generateRandomSlab(address Address, r *rand.Rand) Slab { +func generateRandomSlab(id SlabID, r *rand.Rand) Slab { storable := Uint64Value(r.Uint64()) return &ArrayDataSlab{ header: ArraySlabHeader{ - slabID: NewSlabID(address, SlabIndex{1}), + slabID: id, size: arrayRootDataSlabPrefixSize + storable.ByteSize(), count: 1, }, @@ -1055,6 +1060,28 @@ func generateRandomSlab(address Address, r *rand.Rand) Slab { } } +func generateLargeSlab(id SlabID) Slab { + + const elementCount = 100 + + storables := make([]Storable, elementCount) + size := uint32(0) + for i := 0; i < elementCount; i++ { + storable := Uint64Value(uint64(i)) + size += storable.ByteSize() + storables[i] = storable + } + + return &ArrayDataSlab{ + header: ArraySlabHeader{ + slabID: id, + size: arrayRootDataSlabPrefixSize + size, + count: elementCount, + }, + elements: storables, + } +} + func generateRandomAddress(r *rand.Rand) Address { address := Address{} r.Read(address[:]) @@ -4460,3 +4487,96 @@ func testGetAllChildReferences( require.Equal(t, len(expectedBrokenRefIDs), len(brokenRefs)) require.ElementsMatch(t, expectedBrokenRefIDs, brokenRefs) } + +func TestStorageNondeterministicFastCommit(t *testing.T) { + numberOfAccounts := 10 + + t.Run("small", func(t *testing.T) { + numberOfSlabsPerAccount := 10 + testStorageNondeterministicFastCommit(t, numberOfAccounts, numberOfSlabsPerAccount) + }) + + t.Run("large", func(t *testing.T) { + numberOfSlabsPerAccount := 1_000 + testStorageNondeterministicFastCommit(t, numberOfAccounts, numberOfSlabsPerAccount) + }) +} + +func testStorageNondeterministicFastCommit(t *testing.T, numberOfAccounts int, numberOfSlabsPerAccount int) { + encMode, err := cbor.EncOptions{}.EncMode() + require.NoError(t, err) + + decMode, err := cbor.DecOptions{}.DecMode() + require.NoError(t, err) + + r := newRand(t) + + baseStorage := NewInMemBaseStorage() + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, nil, nil) + + encodedSlabs := make(map[SlabID][]byte) + slabSize := uint64(0) + + // Storage slabs + for i := 0; i < numberOfAccounts; i++ { + + addr := generateRandomAddress(r) + + for j := 0; j < numberOfSlabsPerAccount; j++ { + + slabID, err := storage.GenerateSlabID(addr) + require.NoError(t, err) + + slab := generateRandomSlab(slabID, r) + slabSize += uint64(slab.ByteSize()) + + err = storage.Store(slabID, slab) + require.NoError(t, err) + + // capture data for accuracy testing + encodedSlabs[slabID], err = EncodeSlab(slab, encMode) + require.NoError(t, err) + } + } + + require.Equal(t, uint(len(encodedSlabs)), storage.DeltasWithoutTempAddresses()) + require.Equal(t, slabSize, storage.DeltasSizeWithoutTempAddresses()) + + // Commit deltas + err = storage.NonderterministicFastCommit(10) + require.NoError(t, err) + + require.Equal(t, uint(0), storage.DeltasWithoutTempAddresses()) + require.Equal(t, uint64(0), storage.DeltasSizeWithoutTempAddresses()) + require.Equal(t, len(encodedSlabs), storage.Count()) + + // Compare encoded data + for sid, value := range encodedSlabs { + storedValue, found, err := baseStorage.Retrieve(sid) + require.NoError(t, err) + require.True(t, found) + require.Equal(t, value, storedValue) + } + + // Remove all slabs from storage + for sid := range encodedSlabs { + err = storage.Remove(sid) + require.NoError(t, err) + require.Equal(t, uint64(0), storage.DeltasSizeWithoutTempAddresses()) + } + + // Commit deltas + err = storage.NonderterministicFastCommit(10) + require.NoError(t, err) + + require.Equal(t, 0, storage.Count()) + require.Equal(t, uint64(0), storage.DeltasSizeWithoutTempAddresses()) + + // Check remove functionality + for sid := range encodedSlabs { + storedValue, found, err := storage.Retrieve(sid) + require.NoError(t, err) + require.False(t, found) + require.Nil(t, storedValue) + } +}