From 7162eabcbf3bbe25fd2816bc4bace147bc6fa7d3 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Wed, 8 May 2024 14:48:59 -0500 Subject: [PATCH 1/8] Add NonderterministicFastCommit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit NonderterministicFastCommit commits changes in nondeterministic order. It can be used by migration program when ordering isn't required. │ before.txt │ after.txt │ │ sec/op │ sec/op vs base │ StorageFastCommit/10-12 89.72µ ± 4% 57.50µ ± 3% -35.92% (p=0.000 n=10) StorageFastCommit/100-12 118.9µ ± 1% 116.0µ ± 4% ~ (p=0.436 n=10) StorageFastCommit/1000-12 4.086m ± 5% 2.397m ± 25% -41.35% (p=0.000 n=10) StorageFastCommit/10000-12 12.629m ± 4% 9.857m ± 3% -21.95% (p=0.000 n=10) StorageFastCommit/100000-12 102.73m ± 0% 72.26m ± 1% -29.66% (p=0.000 n=10) StorageFastCommit/1000000-12 1.544 ± 2% 1.141 ± 2% -26.09% (p=0.000 n=10) geomean 6.661m 4.848m -27.21% │ before.txt │ after.txt │ │ B/op │ B/op vs base │ StorageFastCommit/10-12 28.92Ki ± 0% 28.05Ki ± 0% -3.00% (p=0.000 n=10) StorageFastCommit/100-12 286.4Ki ± 0% 278.6Ki ± 0% -2.71% (p=0.000 n=10) StorageFastCommit/1000-12 3.009Mi ± 0% 2.901Mi ± 0% -3.58% (p=0.000 n=10) StorageFastCommit/10000-12 28.65Mi ± 0% 27.79Mi ± 0% -2.98% (p=0.000 n=10) StorageFastCommit/100000-12 278.8Mi ± 0% 271.1Mi ± 0% -2.75% (p=0.000 n=10) StorageFastCommit/1000000-12 2.923Gi ± 0% 2.821Gi ± 0% -3.49% (p=0.000 n=10) geomean 9.101Mi 8.820Mi -3.09% │ before.txt │ after.txt │ │ allocs/op │ allocs/op vs base │ StorageFastCommit/10-12 219.0 ± 0% 205.0 ± 0% -6.39% (p=0.000 n=10) StorageFastCommit/100-12 1.980k ± 0% 1.875k ± 0% -5.30% (p=0.000 n=10) StorageFastCommit/1000-12 19.23k ± 0% 18.23k ± 0% -5.22% (p=0.000 n=10) StorageFastCommit/10000-12 191.1k ± 0% 181.1k ± 0% -5.24% (p=0.000 n=10) StorageFastCommit/100000-12 1.918M ± 0% 1.816M ± 0% -5.30% (p=0.000 n=10) StorageFastCommit/1000000-12 19.15M ± 0% 18.15M ± 0% -5.22% (p=0.000 n=10) geomean 62.31k 58.91k -5.45% --- storage.go | 207 +++++++++++++++++++++++++++++++++++++++++- storage_bench_test.go | 134 +++++++++++++++++++++++++++ storage_test.go | 150 +++++++++++++++++++++++++++--- 3 files changed, 474 insertions(+), 17 deletions(-) create mode 100644 storage_bench_test.go diff --git a/storage.go b/storage.go index 394ecf2..19d0c9c 100644 --- a/storage.go +++ b/storage.go @@ -777,12 +777,17 @@ func (s *PersistentSlabStorage) sortedOwnedDeltaKeys() []SlabID { } func (s *PersistentSlabStorage) Commit() error { - var err error // this part ensures the keys are sorted so commit operation is deterministic keysWithOwners := s.sortedOwnedDeltaKeys() - for _, id := range keysWithOwners { + return s.commit(keysWithOwners) +} + +func (s *PersistentSlabStorage) commit(keys []SlabID) error { + var err error + + for _, id := range keys { slab := s.deltas[id] // deleted slabs @@ -964,6 +969,204 @@ func (s *PersistentSlabStorage) FastCommit(numWorkers int) error { return nil } +// NonderterministicFastCommit commits changes in nondeterministic order. +// This is used by migration program when ordering isn't required. +func (s *PersistentSlabStorage) NonderterministicFastCommit(numWorkers int) error { + // No changes + if len(s.deltas) == 0 { + return nil + } + + type slabToBeEncoded struct { + slabID SlabID + slab Slab + } + + type encodedSlab struct { + slabID SlabID + data []byte + err error + } + + // Define encoder (worker) to encode slabs in parallel + encoder := func( + wg *sync.WaitGroup, + done <-chan struct{}, + jobs <-chan slabToBeEncoded, + results chan<- encodedSlab, + ) { + defer wg.Done() + + for job := range jobs { + // Check if goroutine is signaled to stop before proceeding. + select { + case <-done: + return + default: + } + + id := job.slabID + slab := job.slab + + if slab == nil { + results <- encodedSlab{ + slabID: id, + data: nil, + err: nil, + } + continue + } + + // Serialize + data, err := EncodeSlab(slab, s.cborEncMode) + results <- encodedSlab{ + slabID: id, + data: data, + err: err, + } + } + } + + // Modified slabs need to be encoded (in parallel) and stored in underlying storage. + modifiedSlabCount := 0 + // Deleted slabs need to be removed from underlying storage. + deletedSlabCount := 0 + for k, v := range s.deltas { + // Ignore slabs not owned by accounts + if k.address == AddressUndefined { + continue + } + if v == nil { + deletedSlabCount++ + } else { + modifiedSlabCount++ + } + } + + if modifiedSlabCount == 0 && deletedSlabCount == 0 { + return nil + } + + if modifiedSlabCount < 2 { + // Avoid goroutine overhead + ids := make([]SlabID, 0, modifiedSlabCount+deletedSlabCount) + for k := range s.deltas { + // Ignore slabs not owned by accounts + if k.address == AddressUndefined { + continue + } + ids = append(ids, k) + } + + return s.commit(ids) + } + + if numWorkers > modifiedSlabCount { + numWorkers = modifiedSlabCount + } + + var wg sync.WaitGroup + + // Create done signal channel + done := make(chan struct{}) + + // Create job queue + jobs := make(chan slabToBeEncoded, modifiedSlabCount) + + // Create result queue + results := make(chan encodedSlab, modifiedSlabCount) + + defer func() { + // This ensures that all goroutines are stopped before output channel is closed. + + // Wait for all goroutines to finish + wg.Wait() + + // Close output channel + close(results) + }() + + // Launch workers to encode slabs + wg.Add(numWorkers) + for i := 0; i < numWorkers; i++ { + go encoder(&wg, done, jobs, results) + } + + // Send jobs + deletedSlabIDs := make([]SlabID, 0, deletedSlabCount) + for k, v := range s.deltas { + // ignore the ones that are not owned by accounts + if k.address == AddressUndefined { + continue + } + if v == nil { + deletedSlabIDs = append(deletedSlabIDs, k) + } else { + jobs <- slabToBeEncoded{k, v} + } + } + close(jobs) + + // Remove deleted slabs from underlying storage. + for _, id := range deletedSlabIDs { + + err := s.baseStorage.Remove(id) + if err != nil { + // Closing done channel signals goroutines to stop. + close(done) + // Wrap err as external error (if needed) because err is returned by BaseStorage interface. + return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to remove slab %s", id)) + } + + // Deleted slabs are removed from deltas and added to read cache so that: + // 1. next read is from in-memory read cache + // 2. deleted slabs are not re-committed in next commit + s.cache[id] = nil + delete(s.deltas, id) + } + + // Process encoded slabs + for i := 0; i < modifiedSlabCount; i++ { + result := <-results + + if result.err != nil { + // Closing done channel signals goroutines to stop. + close(done) + // result.err is already categorized by Encode(). + return result.err + } + + id := result.slabID + data := result.data + + if data == nil { + // Closing done channel signals goroutines to stop. + close(done) + // This is unexpected because deleted slabs are processed separately. + return NewEncodingErrorf("unexpectd encoded empty data") + } + + // Store + err := s.baseStorage.Store(id, data) + if err != nil { + // Closing done channel signals goroutines to stop. + close(done) + // Wrap err as external error (if needed) because err is returned by BaseStorage interface. + return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to store slab %s", id)) + } + + s.cache[id] = s.deltas[id] + // It's safe to remove slab from deltas because + // iteration is on non-temp slabs and temp slabs + // are still in deltas. + delete(s.deltas, id) + } + + // Do NOT reset deltas because slabs with empty address are not saved. + + return nil +} + func (s *PersistentSlabStorage) DropDeltas() { s.deltas = make(map[SlabID]Slab) } diff --git a/storage_bench_test.go b/storage_bench_test.go new file mode 100644 index 0000000..ae76f26 --- /dev/null +++ b/storage_bench_test.go @@ -0,0 +1,134 @@ +/* + * Atree - Scalable Arrays and Ordered Maps + * + * Copyright 2024 Dapper Labs, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package atree + +import ( + "encoding/binary" + "math/rand" + "runtime" + "strconv" + "testing" + + "github.com/fxamacker/cbor/v2" + "github.com/stretchr/testify/require" +) + +func benchmarkFastCommit(b *testing.B, seed int64, numberOfSlabs int) { + r := rand.New(rand.NewSource(seed)) + + encMode, err := cbor.EncOptions{}.EncMode() + require.NoError(b, err) + + decMode, err := cbor.DecOptions{}.DecMode() + require.NoError(b, err) + + slabs := make([]Slab, numberOfSlabs) + for i := 0; i < numberOfSlabs; i++ { + addr := generateRandomAddress(r) + + var index SlabIndex + binary.BigEndian.PutUint64(index[:], uint64(i)) + + id := SlabID{addr, index} + + slabs[i] = generateLargeSlab(id) + } + + b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + + baseStorage := NewInMemBaseStorage() + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, nil, nil) + + for _, slab := range slabs { + err = storage.Store(slab.SlabID(), slab) + require.NoError(b, err) + } + + b.StartTimer() + + err := storage.FastCommit(runtime.NumCPU()) + require.NoError(b, err) + } + }) +} + +func benchmarkNondeterministicFastCommit(b *testing.B, seed int64, numberOfSlabs int) { + r := rand.New(rand.NewSource(seed)) + + encMode, err := cbor.EncOptions{}.EncMode() + require.NoError(b, err) + + decMode, err := cbor.DecOptions{}.DecMode() + require.NoError(b, err) + + slabs := make([]Slab, numberOfSlabs) + for i := 0; i < numberOfSlabs; i++ { + addr := generateRandomAddress(r) + + var index SlabIndex + binary.BigEndian.PutUint64(index[:], uint64(i)) + + id := SlabID{addr, index} + + slabs[i] = generateLargeSlab(id) + } + + b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + + baseStorage := NewInMemBaseStorage() + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, nil, nil) + + for _, slab := range slabs { + err = storage.Store(slab.SlabID(), slab) + require.NoError(b, err) + } + + b.StartTimer() + + err := storage.NonderterministicFastCommit(runtime.NumCPU()) + require.NoError(b, err) + } + }) +} + +func BenchmarkStorageFastCommit(b *testing.B) { + fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc. + + benchmarkFastCommit(b, fixedSeed, 10) + benchmarkFastCommit(b, fixedSeed, 100) + benchmarkFastCommit(b, fixedSeed, 1_000) + benchmarkFastCommit(b, fixedSeed, 10_000) + benchmarkFastCommit(b, fixedSeed, 100_000) + benchmarkFastCommit(b, fixedSeed, 1_000_000) +} + +func BenchmarkStorageNondeterministicFastCommit(b *testing.B) { + fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc. + + benchmarkNondeterministicFastCommit(b, fixedSeed, 10) + benchmarkNondeterministicFastCommit(b, fixedSeed, 100) + benchmarkNondeterministicFastCommit(b, fixedSeed, 1_000) + benchmarkNondeterministicFastCommit(b, fixedSeed, 10_000) + benchmarkNondeterministicFastCommit(b, fixedSeed, 100_000) + benchmarkNondeterministicFastCommit(b, fixedSeed, 1_000_000) +} diff --git a/storage_test.go b/storage_test.go index 1b407fb..00756c2 100644 --- a/storage_test.go +++ b/storage_test.go @@ -412,8 +412,8 @@ func TestBasicSlabStorageStore(t *testing.T) { r := newRand(t) address := Address{1} slabs := map[SlabID]Slab{ - {address, SlabIndex{1}}: generateRandomSlab(address, r), - {address, SlabIndex{2}}: generateRandomSlab(address, r), + {address, SlabIndex{1}}: generateRandomSlab(SlabID{address, SlabIndex{1}}, r), + {address, SlabIndex{2}}: generateRandomSlab(SlabID{address, SlabIndex{2}}, r), } // Store values @@ -424,7 +424,7 @@ func TestBasicSlabStorageStore(t *testing.T) { // Overwrite stored values for id := range slabs { - slab := generateRandomSlab(id.address, r) + slab := generateRandomSlab(id, r) slabs[id] = slab err := storage.Store(id, slab) require.NoError(t, err) @@ -446,7 +446,7 @@ func TestBasicSlabStorageRetrieve(t *testing.T) { r := newRand(t) id := SlabID{Address{1}, SlabIndex{1}} - slab := generateRandomSlab(id.address, r) + slab := generateRandomSlab(id, r) // Retrieve value from empty storage retrievedSlab, found, err := storage.Retrieve(id) @@ -476,7 +476,7 @@ func TestBasicSlabStorageRemove(t *testing.T) { r := newRand(t) id := SlabID{Address{1}, SlabIndex{1}} - slab := generateRandomSlab(id.address, r) + slab := generateRandomSlab(id, r) // Remove value from empty storage err := storage.Remove(id) @@ -546,7 +546,7 @@ func TestBasicSlabStorageSlabIDs(t *testing.T) { // Store values for id := range wantIDs { - err := storage.Store(id, generateRandomSlab(id.address, r)) + err := storage.Store(id, generateRandomSlab(id, r)) require.NoError(t, err) } @@ -569,9 +569,9 @@ func TestBasicSlabStorageSlabIterat(t *testing.T) { id3 := SlabID{address: address, index: index.Next()} want := map[SlabID]Slab{ - id1: generateRandomSlab(id1.address, r), - id2: generateRandomSlab(id2.address, r), - id3: generateRandomSlab(id3.address, r), + id1: generateRandomSlab(id1, r), + id2: generateRandomSlab(id2, r), + id3: generateRandomSlab(id3, r), } storage := NewBasicSlabStorage(nil, nil, nil, nil) @@ -642,8 +642,8 @@ func TestPersistentStorage(t *testing.T) { permSlabID, err := NewSlabIDFromRawBytes([]byte{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}) require.NoError(t, err) - slab1 := generateRandomSlab(tempSlabID.address, r) - slab2 := generateRandomSlab(permSlabID.address, r) + slab1 := generateRandomSlab(tempSlabID, r) + slab2 := generateRandomSlab(permSlabID, r) // no temp ids should be in the base storage err = storage.Store(tempSlabID, slab1) @@ -724,8 +724,10 @@ func TestPersistentStorage(t *testing.T) { numberOfSlabsPerAccount := 10 r := newRand(t) + baseStorage := newAccessOrderTrackerBaseStorage() storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, nil, nil) + baseStorage2 := newAccessOrderTrackerBaseStorage() storageWithFastCommit := NewPersistentSlabStorage(baseStorage2, encMode, decMode, nil, nil) @@ -735,16 +737,19 @@ func TestPersistentStorage(t *testing.T) { for i := 0; i < numberOfAccounts; i++ { for j := 0; j < numberOfSlabsPerAccount; j++ { addr := generateRandomAddress(r) - slab := generateRandomSlab(addr, r) - slabSize += uint64(slab.ByteSize()) slabID, err := storage.GenerateSlabID(addr) require.NoError(t, err) + + slab := generateRandomSlab(slabID, r) + slabSize += uint64(slab.ByteSize()) + err = storage.Store(slabID, slab) require.NoError(t, err) slabID2, err := storageWithFastCommit.GenerateSlabID(addr) require.NoError(t, err) + err = storageWithFastCommit.Store(slabID2, slab) require.NoError(t, err) @@ -1042,12 +1047,12 @@ func TestPersistentStorageGenerateSlabID(t *testing.T) { }) } -func generateRandomSlab(address Address, r *rand.Rand) Slab { +func generateRandomSlab(id SlabID, r *rand.Rand) Slab { storable := Uint64Value(r.Uint64()) return &ArrayDataSlab{ header: ArraySlabHeader{ - slabID: NewSlabID(address, SlabIndex{1}), + slabID: id, size: arrayRootDataSlabPrefixSize + storable.ByteSize(), count: 1, }, @@ -1055,6 +1060,28 @@ func generateRandomSlab(address Address, r *rand.Rand) Slab { } } +func generateLargeSlab(id SlabID) Slab { + + const elementCount = 100 + + storables := make([]Storable, elementCount) + size := uint32(0) + for i := 0; i < elementCount; i++ { + storable := Uint64Value(uint64(i)) + size += storable.ByteSize() + storables[i] = storable + } + + return &ArrayDataSlab{ + header: ArraySlabHeader{ + slabID: id, + size: arrayRootDataSlabPrefixSize + size, + count: elementCount, + }, + elements: storables, + } +} + func generateRandomAddress(r *rand.Rand) Address { address := Address{} r.Read(address[:]) @@ -4460,3 +4487,96 @@ func testGetAllChildReferences( require.Equal(t, len(expectedBrokenRefIDs), len(brokenRefs)) require.ElementsMatch(t, expectedBrokenRefIDs, brokenRefs) } + +func TestStorageNondeterministicFastCommit(t *testing.T) { + numberOfAccounts := 10 + + t.Run("small", func(t *testing.T) { + numberOfSlabsPerAccount := 10 + testStorageNondeterministicFastCommit(t, numberOfAccounts, numberOfSlabsPerAccount) + }) + + t.Run("large", func(t *testing.T) { + numberOfSlabsPerAccount := 1_000 + testStorageNondeterministicFastCommit(t, numberOfAccounts, numberOfSlabsPerAccount) + }) +} + +func testStorageNondeterministicFastCommit(t *testing.T, numberOfAccounts int, numberOfSlabsPerAccount int) { + encMode, err := cbor.EncOptions{}.EncMode() + require.NoError(t, err) + + decMode, err := cbor.DecOptions{}.DecMode() + require.NoError(t, err) + + r := newRand(t) + + baseStorage := NewInMemBaseStorage() + storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, nil, nil) + + encodedSlabs := make(map[SlabID][]byte) + slabSize := uint64(0) + + // Storage slabs + for i := 0; i < numberOfAccounts; i++ { + + addr := generateRandomAddress(r) + + for j := 0; j < numberOfSlabsPerAccount; j++ { + + slabID, err := storage.GenerateSlabID(addr) + require.NoError(t, err) + + slab := generateRandomSlab(slabID, r) + slabSize += uint64(slab.ByteSize()) + + err = storage.Store(slabID, slab) + require.NoError(t, err) + + // capture data for accuracy testing + encodedSlabs[slabID], err = EncodeSlab(slab, encMode) + require.NoError(t, err) + } + } + + require.Equal(t, uint(len(encodedSlabs)), storage.DeltasWithoutTempAddresses()) + require.Equal(t, slabSize, storage.DeltasSizeWithoutTempAddresses()) + + // Commit deltas + err = storage.NonderterministicFastCommit(10) + require.NoError(t, err) + + require.Equal(t, uint(0), storage.DeltasWithoutTempAddresses()) + require.Equal(t, uint64(0), storage.DeltasSizeWithoutTempAddresses()) + require.Equal(t, len(encodedSlabs), storage.Count()) + + // Compare encoded data + for sid, value := range encodedSlabs { + storedValue, found, err := baseStorage.Retrieve(sid) + require.NoError(t, err) + require.True(t, found) + require.Equal(t, value, storedValue) + } + + // Remove all slabs from storage + for sid := range encodedSlabs { + err = storage.Remove(sid) + require.NoError(t, err) + require.Equal(t, uint64(0), storage.DeltasSizeWithoutTempAddresses()) + } + + // Commit deltas + err = storage.NonderterministicFastCommit(10) + require.NoError(t, err) + + require.Equal(t, 0, storage.Count()) + require.Equal(t, uint64(0), storage.DeltasSizeWithoutTempAddresses()) + + // Check remove functionality + for sid := range encodedSlabs { + storedValue, found, err := storage.Retrieve(sid) + require.NoError(t, err) + require.False(t, found) + require.Nil(t, storedValue) + } +} From e739c6e4bc4c3bccb279618261daf212fc2a61f8 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Thu, 9 May 2024 10:01:55 -0500 Subject: [PATCH 2/8] Add more tests --- storage_test.go | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/storage_test.go b/storage_test.go index 00756c2..12ebdf4 100644 --- a/storage_test.go +++ b/storage_test.go @@ -4489,14 +4489,32 @@ func testGetAllChildReferences( } func TestStorageNondeterministicFastCommit(t *testing.T) { - numberOfAccounts := 10 + t.Run("0 slabs", func(t *testing.T) { + numberOfAccounts := 0 + numberOfSlabsPerAccount := 0 + testStorageNondeterministicFastCommit(t, numberOfAccounts, numberOfSlabsPerAccount) + }) + + t.Run("1 slabs", func(t *testing.T) { + numberOfAccounts := 1 + numberOfSlabsPerAccount := 1 + testStorageNondeterministicFastCommit(t, numberOfAccounts, numberOfSlabsPerAccount) + }) + + t.Run("10 slabs", func(t *testing.T) { + numberOfAccounts := 1 + numberOfSlabsPerAccount := 10 + testStorageNondeterministicFastCommit(t, numberOfAccounts, numberOfSlabsPerAccount) + }) - t.Run("small", func(t *testing.T) { + t.Run("100 slabs", func(t *testing.T) { + numberOfAccounts := 10 numberOfSlabsPerAccount := 10 testStorageNondeterministicFastCommit(t, numberOfAccounts, numberOfSlabsPerAccount) }) - t.Run("large", func(t *testing.T) { + t.Run("10_000 slabs", func(t *testing.T) { + numberOfAccounts := 10 numberOfSlabsPerAccount := 1_000 testStorageNondeterministicFastCommit(t, numberOfAccounts, numberOfSlabsPerAccount) }) From a2a4f5c08636bd27e3c436814dff3c7db1da2db5 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Mon, 13 May 2024 14:18:29 -0500 Subject: [PATCH 3/8] Refactor to iterate deltas once in NonderterministicFastCommit This change reduces number of lines in the function but is not expected to yield significant speed improvements. --- storage.go | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/storage.go b/storage.go index 19d0c9c..a82fb03 100644 --- a/storage.go +++ b/storage.go @@ -1027,6 +1027,12 @@ func (s *PersistentSlabStorage) NonderterministicFastCommit(numWorkers int) erro } } + // slabIDsWithOwner contains slab IDs with owner: + // - modified slab IDs are stored from front to back + // - deleted slab IDs are stored from back to front + // This is to avoid extra allocations. + slabIDsWithOwner := make([]SlabID, len(s.deltas)) + // Modified slabs need to be encoded (in parallel) and stored in underlying storage. modifiedSlabCount := 0 // Deleted slabs need to be removed from underlying storage. @@ -1037,27 +1043,26 @@ func (s *PersistentSlabStorage) NonderterministicFastCommit(numWorkers int) erro continue } if v == nil { + index := len(slabIDsWithOwner) - 1 - deletedSlabCount + slabIDsWithOwner[index] = k deletedSlabCount++ } else { + slabIDsWithOwner[modifiedSlabCount] = k modifiedSlabCount++ } } + modifiedSlabIDs := slabIDsWithOwner[:modifiedSlabCount] + + deletedSlabIDs := slabIDsWithOwner[len(slabIDsWithOwner)-deletedSlabCount:] + if modifiedSlabCount == 0 && deletedSlabCount == 0 { return nil } if modifiedSlabCount < 2 { // Avoid goroutine overhead - ids := make([]SlabID, 0, modifiedSlabCount+deletedSlabCount) - for k := range s.deltas { - // Ignore slabs not owned by accounts - if k.address == AddressUndefined { - continue - } - ids = append(ids, k) - } - + ids := append(modifiedSlabIDs, deletedSlabIDs...) return s.commit(ids) } @@ -1093,17 +1098,8 @@ func (s *PersistentSlabStorage) NonderterministicFastCommit(numWorkers int) erro } // Send jobs - deletedSlabIDs := make([]SlabID, 0, deletedSlabCount) - for k, v := range s.deltas { - // ignore the ones that are not owned by accounts - if k.address == AddressUndefined { - continue - } - if v == nil { - deletedSlabIDs = append(deletedSlabIDs, k) - } else { - jobs <- slabToBeEncoded{k, v} - } + for _, id := range modifiedSlabIDs { + jobs <- slabToBeEncoded{id, s.deltas[id]} } close(jobs) From aa1c121a7695e6efedb3426f1c822ff906f806b6 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Mon, 13 May 2024 15:14:19 -0500 Subject: [PATCH 4/8] Lint --- storage.go | 4 ++-- storage_bench_test.go | 2 +- storage_test.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/storage.go b/storage.go index a82fb03..b4b76d7 100644 --- a/storage.go +++ b/storage.go @@ -969,9 +969,9 @@ func (s *PersistentSlabStorage) FastCommit(numWorkers int) error { return nil } -// NonderterministicFastCommit commits changes in nondeterministic order. +// NondeterministicFastCommit commits changes in nondeterministic order. // This is used by migration program when ordering isn't required. -func (s *PersistentSlabStorage) NonderterministicFastCommit(numWorkers int) error { +func (s *PersistentSlabStorage) NondeterministicFastCommit(numWorkers int) error { // No changes if len(s.deltas) == 0 { return nil diff --git a/storage_bench_test.go b/storage_bench_test.go index ae76f26..f97a60e 100644 --- a/storage_bench_test.go +++ b/storage_bench_test.go @@ -105,7 +105,7 @@ func benchmarkNondeterministicFastCommit(b *testing.B, seed int64, numberOfSlabs b.StartTimer() - err := storage.NonderterministicFastCommit(runtime.NumCPU()) + err := storage.NondeterministicFastCommit(runtime.NumCPU()) require.NoError(b, err) } }) diff --git a/storage_test.go b/storage_test.go index 12ebdf4..fcadcfb 100644 --- a/storage_test.go +++ b/storage_test.go @@ -4561,7 +4561,7 @@ func testStorageNondeterministicFastCommit(t *testing.T, numberOfAccounts int, n require.Equal(t, slabSize, storage.DeltasSizeWithoutTempAddresses()) // Commit deltas - err = storage.NonderterministicFastCommit(10) + err = storage.NondeterministicFastCommit(10) require.NoError(t, err) require.Equal(t, uint(0), storage.DeltasWithoutTempAddresses()) @@ -4584,7 +4584,7 @@ func testStorageNondeterministicFastCommit(t *testing.T, numberOfAccounts int, n } // Commit deltas - err = storage.NonderterministicFastCommit(10) + err = storage.NondeterministicFastCommit(10) require.NoError(t, err) require.Equal(t, 0, storage.Count()) From e83159f855e51fdd7c2442aaa887f11a7e138119 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Mon, 13 May 2024 15:16:58 -0500 Subject: [PATCH 5/8] Bump golangci-lint from 1.52.2 to 1.53.3 --- .github/workflows/safer-golangci-lint.yml | 40 +++-------------------- 1 file changed, 4 insertions(+), 36 deletions(-) diff --git a/.github/workflows/safer-golangci-lint.yml b/.github/workflows/safer-golangci-lint.yml index 6722fa8..ec1ecec 100644 --- a/.github/workflows/safer-golangci-lint.yml +++ b/.github/workflows/safer-golangci-lint.yml @@ -4,38 +4,6 @@ # Safer GitHub Actions Workflow for golangci-lint. # https://github.com/x448/safer-golangci-lint # -# safer-golangci-lint.yml -# -# This workflow downloads, verifies, and runs golangci-lint in a -# deterministic, reviewable, and safe manner. -# -# To use: -# Step 1. Copy this file into [your_github_repo]/.github/workflows/ -# Step 2. There's no step 2 if you like the default settings. -# -# See golangci-lint docs for more info at -# https://github.com/golangci/golangci-lint -# -# 100% of the script for downloading, installing, and running golangci-lint -# is embedded in this file. The embedded SHA-256 digest is used to verify the -# downloaded golangci-lint tarball (golangci-lint-1.xx.x-linux-amd64.tar.gz). -# -# The embedded SHA-256 digest matches golangci-lint-1.xx.x-checksums.txt at -# https://github.com/golangci/golangci-lint/releases -# -# To use a newer version of golangci-lint, change these values: -# 1. GOLINTERS_VERSION -# 2. GOLINTERS_TGZ_DGST -# -# Release v1.52.2 (May 14, 2023) -# - Bump Go to 1.20 -# - Bump actions/setup-go to v4 -# - Bump golangci-lint to 1.52.2 -# - Hash of golangci-lint-1.52.2-linux-amd64.tar.gz -# - SHA-256: c9cf72d12058a131746edd409ed94ccd578fbd178899d1ed41ceae3ce5f54501 -# This SHA-256 digest matches golangci-lint-1.52.2-checksums.txt at -# https://github.com/golangci/golangci-lint/releases -# name: linters # Remove default permissions and grant only what is required in each job. @@ -49,9 +17,9 @@ on: env: GO_VERSION: '1.20' - GOLINTERS_VERSION: 1.52.2 + GOLINTERS_VERSION: 1.53.3 GOLINTERS_ARCH: linux-amd64 - GOLINTERS_TGZ_DGST: c9cf72d12058a131746edd409ed94ccd578fbd178899d1ed41ceae3ce5f54501 + GOLINTERS_TGZ_DGST: 4f62007ca96372ccba54760e2ed39c2446b40ec24d9a90c21aad9f2fdf6cf0da GOLINTERS_TIMEOUT: 15m OPENSSL_DGST_CMD: openssl dgst -sha256 -r CURL_CMD: curl --proto =https --tlsv1.2 --location --silent --show-error --fail @@ -64,12 +32,12 @@ jobs: contents: read steps: - name: Checkout source - uses: actions/checkout@v3 + uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 with: fetch-depth: 1 - name: Setup Go - uses: actions/setup-go@v4 + uses: actions/setup-go@cdcb36043654635271a94b9a6d1392de5bb323a7 # v5.0.1 with: go-version: ${{ env.GO_VERSION }} check-latest: true From 5808810b6a39852840d1aaa5b55c697c4281e5f4 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Mon, 13 May 2024 15:33:12 -0500 Subject: [PATCH 6/8] Lint --- storage.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/storage.go b/storage.go index b4b76d7..909beb9 100644 --- a/storage.go +++ b/storage.go @@ -1061,8 +1061,10 @@ func (s *PersistentSlabStorage) NondeterministicFastCommit(numWorkers int) error } if modifiedSlabCount < 2 { - // Avoid goroutine overhead - ids := append(modifiedSlabIDs, deletedSlabIDs...) + // Avoid goroutine overhead. + // Return after committing modified and deleted slabs. + ids := modifiedSlabIDs + ids = append(ids, deletedSlabIDs...) return s.commit(ids) } From 81b6dcdd24950438cee0ddae0bdd768877edc13d Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Tue, 14 May 2024 07:12:39 -0500 Subject: [PATCH 7/8] Revert "Bump golangci-lint from 1.52.2 to 1.53.3" This reverts commit e83159f855e51fdd7c2442aaa887f11a7e138119. --- .github/workflows/safer-golangci-lint.yml | 40 ++++++++++++++++++++--- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/.github/workflows/safer-golangci-lint.yml b/.github/workflows/safer-golangci-lint.yml index ec1ecec..6722fa8 100644 --- a/.github/workflows/safer-golangci-lint.yml +++ b/.github/workflows/safer-golangci-lint.yml @@ -4,6 +4,38 @@ # Safer GitHub Actions Workflow for golangci-lint. # https://github.com/x448/safer-golangci-lint # +# safer-golangci-lint.yml +# +# This workflow downloads, verifies, and runs golangci-lint in a +# deterministic, reviewable, and safe manner. +# +# To use: +# Step 1. Copy this file into [your_github_repo]/.github/workflows/ +# Step 2. There's no step 2 if you like the default settings. +# +# See golangci-lint docs for more info at +# https://github.com/golangci/golangci-lint +# +# 100% of the script for downloading, installing, and running golangci-lint +# is embedded in this file. The embedded SHA-256 digest is used to verify the +# downloaded golangci-lint tarball (golangci-lint-1.xx.x-linux-amd64.tar.gz). +# +# The embedded SHA-256 digest matches golangci-lint-1.xx.x-checksums.txt at +# https://github.com/golangci/golangci-lint/releases +# +# To use a newer version of golangci-lint, change these values: +# 1. GOLINTERS_VERSION +# 2. GOLINTERS_TGZ_DGST +# +# Release v1.52.2 (May 14, 2023) +# - Bump Go to 1.20 +# - Bump actions/setup-go to v4 +# - Bump golangci-lint to 1.52.2 +# - Hash of golangci-lint-1.52.2-linux-amd64.tar.gz +# - SHA-256: c9cf72d12058a131746edd409ed94ccd578fbd178899d1ed41ceae3ce5f54501 +# This SHA-256 digest matches golangci-lint-1.52.2-checksums.txt at +# https://github.com/golangci/golangci-lint/releases +# name: linters # Remove default permissions and grant only what is required in each job. @@ -17,9 +49,9 @@ on: env: GO_VERSION: '1.20' - GOLINTERS_VERSION: 1.53.3 + GOLINTERS_VERSION: 1.52.2 GOLINTERS_ARCH: linux-amd64 - GOLINTERS_TGZ_DGST: 4f62007ca96372ccba54760e2ed39c2446b40ec24d9a90c21aad9f2fdf6cf0da + GOLINTERS_TGZ_DGST: c9cf72d12058a131746edd409ed94ccd578fbd178899d1ed41ceae3ce5f54501 GOLINTERS_TIMEOUT: 15m OPENSSL_DGST_CMD: openssl dgst -sha256 -r CURL_CMD: curl --proto =https --tlsv1.2 --location --silent --show-error --fail @@ -32,12 +64,12 @@ jobs: contents: read steps: - name: Checkout source - uses: actions/checkout@0ad4b8fadaa221de15dcec353f45205ec38ea70b # v4.1.4 + uses: actions/checkout@v3 with: fetch-depth: 1 - name: Setup Go - uses: actions/setup-go@cdcb36043654635271a94b9a6d1392de5bb323a7 # v5.0.1 + uses: actions/setup-go@v4 with: go-version: ${{ env.GO_VERSION }} check-latest: true From 88fa22fd1a11905d697d8acd6733a9cdc71b5024 Mon Sep 17 00:00:00 2001 From: Faye Amacker <33205765+fxamacker@users.noreply.github.com> Date: Tue, 14 May 2024 07:20:30 -0500 Subject: [PATCH 8/8] Improve code readability by adding comments, etc. --- storage.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/storage.go b/storage.go index 909beb9..8c40161 100644 --- a/storage.go +++ b/storage.go @@ -1037,17 +1037,19 @@ func (s *PersistentSlabStorage) NondeterministicFastCommit(numWorkers int) error modifiedSlabCount := 0 // Deleted slabs need to be removed from underlying storage. deletedSlabCount := 0 - for k, v := range s.deltas { + for id, slab := range s.deltas { // Ignore slabs not owned by accounts - if k.address == AddressUndefined { + if id.address == AddressUndefined { continue } - if v == nil { + if slab == nil { + // Set deleted slab ID from the end of slabIDsWithOwner. index := len(slabIDsWithOwner) - 1 - deletedSlabCount - slabIDsWithOwner[index] = k + slabIDsWithOwner[index] = id deletedSlabCount++ } else { - slabIDsWithOwner[modifiedSlabCount] = k + // Set modified slab ID from the start of slabIDsWithOwner. + slabIDsWithOwner[modifiedSlabCount] = id modifiedSlabCount++ } }