Merge pull request #403 from onflow/fxamacker/add-nondeterministic-fast-commit

Add NondeterministicFastCommit to speed up migrations when ordering isn't required
fxamacker authored May 14, 2024
2 parents e4400b2 + 88fa22f commit 12929f5
Showing 3 changed files with 492 additions and 17 deletions.
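NondeterministicFastCommit is aimed at bulk migrations, where the order in which slabs reach the base storage doesn't matter. A minimal usage sketch (not part of this diff; the helper name and setup are hypothetical, assuming an existing *atree.PersistentSlabStorage that already holds the migration's pending deltas):

    package main

    import (
        "runtime"

        "github.com/onflow/atree"
    )

    // commitMigratedSlabs flushes a migration's pending slab changes.
    // Ordering of writes is irrelevant for this use case, so the
    // nondeterministic path skips sorting slab IDs before encoding and
    // storing them, trading deterministic write order for speed.
    func commitMigratedSlabs(storage *atree.PersistentSlabStorage) error {
        return storage.NondeterministicFastCommit(runtime.NumCPU())
    }

For commits that must be reproducible, FastCommit (unchanged below) remains the appropriate call.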
207 changes: 205 additions & 2 deletions storage.go
@@ -777,12 +777,17 @@ func (s *PersistentSlabStorage) sortedOwnedDeltaKeys() []SlabID {
}

func (s *PersistentSlabStorage) Commit() error {
var err error

// this part ensures the keys are sorted so commit operation is deterministic
keysWithOwners := s.sortedOwnedDeltaKeys()

for _, id := range keysWithOwners {
return s.commit(keysWithOwners)
}

func (s *PersistentSlabStorage) commit(keys []SlabID) error {
var err error

for _, id := range keys {
slab := s.deltas[id]

// deleted slabs
@@ -964,6 +969,204 @@ func (s *PersistentSlabStorage) FastCommit(numWorkers int) error {
return nil
}

// NondeterministicFastCommit commits changes in nondeterministic order.
// This is used by migration programs when deterministic ordering isn't required.
func (s *PersistentSlabStorage) NondeterministicFastCommit(numWorkers int) error {
// No changes
if len(s.deltas) == 0 {
return nil
}

type slabToBeEncoded struct {
slabID SlabID
slab Slab
}

type encodedSlab struct {
slabID SlabID
data []byte
err error
}

// Define encoder (worker) to encode slabs in parallel
encoder := func(
wg *sync.WaitGroup,
done <-chan struct{},
jobs <-chan slabToBeEncoded,
results chan<- encodedSlab,
) {
defer wg.Done()

for job := range jobs {
// Check if goroutine is signaled to stop before proceeding.
select {
case <-done:
return
default:
}

id := job.slabID
slab := job.slab

if slab == nil {
results <- encodedSlab{
slabID: id,
data: nil,
err: nil,
}
continue
}

// Serialize
data, err := EncodeSlab(slab, s.cborEncMode)
results <- encodedSlab{
slabID: id,
data: data,
err: err,
}
}
}

// slabIDsWithOwner contains slab IDs with owner:
// - modified slab IDs are stored from front to back
// - deleted slab IDs are stored from back to front
// This is to avoid extra allocations.
slabIDsWithOwner := make([]SlabID, len(s.deltas))

// Modified slabs need to be encoded (in parallel) and stored in underlying storage.
modifiedSlabCount := 0
// Deleted slabs need to be removed from underlying storage.
deletedSlabCount := 0
for id, slab := range s.deltas {
// Ignore slabs not owned by accounts
if id.address == AddressUndefined {
continue
}
if slab == nil {
// Set deleted slab ID from the end of slabIDsWithOwner.
index := len(slabIDsWithOwner) - 1 - deletedSlabCount
slabIDsWithOwner[index] = id
deletedSlabCount++
} else {
// Set modified slab ID from the start of slabIDsWithOwner.
slabIDsWithOwner[modifiedSlabCount] = id
modifiedSlabCount++
}
}

modifiedSlabIDs := slabIDsWithOwner[:modifiedSlabCount]

deletedSlabIDs := slabIDsWithOwner[len(slabIDsWithOwner)-deletedSlabCount:]

if modifiedSlabCount == 0 && deletedSlabCount == 0 {
return nil
}

if modifiedSlabCount < 2 {
// Avoid goroutine overhead.
// Return after committing modified and deleted slabs.
ids := modifiedSlabIDs
ids = append(ids, deletedSlabIDs...)
return s.commit(ids)
}

if numWorkers > modifiedSlabCount {
numWorkers = modifiedSlabCount
}

var wg sync.WaitGroup

// Create done signal channel
done := make(chan struct{})

// Create job queue
jobs := make(chan slabToBeEncoded, modifiedSlabCount)

// Create result queue
results := make(chan encodedSlab, modifiedSlabCount)

defer func() {
// This ensures that all goroutines are stopped before output channel is closed.

// Wait for all goroutines to finish
wg.Wait()

// Close output channel
close(results)
}()

// Launch workers to encode slabs
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go encoder(&wg, done, jobs, results)
}

// Send jobs
for _, id := range modifiedSlabIDs {
jobs <- slabToBeEncoded{id, s.deltas[id]}
}
close(jobs)

// Remove deleted slabs from underlying storage.
for _, id := range deletedSlabIDs {

err := s.baseStorage.Remove(id)
if err != nil {
// Closing done channel signals goroutines to stop.
close(done)
// Wrap err as external error (if needed) because err is returned by BaseStorage interface.
return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to remove slab %s", id))
}

// Deleted slabs are removed from deltas and added to read cache so that:
// 1. next read is from in-memory read cache
// 2. deleted slabs are not re-committed in next commit
s.cache[id] = nil
delete(s.deltas, id)
}

// Process encoded slabs
for i := 0; i < modifiedSlabCount; i++ {
result := <-results

if result.err != nil {
// Closing done channel signals goroutines to stop.
close(done)
// result.err is already categorized by Encode().
return result.err
}

id := result.slabID
data := result.data

if data == nil {
// Closing done channel signals goroutines to stop.
close(done)
// This is unexpected because deleted slabs are processed separately.
return NewEncodingErrorf("unexpected encoded empty data")
}

// Store
err := s.baseStorage.Store(id, data)
if err != nil {
// Closing done channel signals goroutines to stop.
close(done)
// Wrap err as external error (if needed) because err is returned by BaseStorage interface.
return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to store slab %s", id))
}

s.cache[id] = s.deltas[id]
// It's safe to remove slab from deltas because
// iteration is on non-temp slabs and temp slabs
// are still in deltas.
delete(s.deltas, id)
}

// Do NOT reset deltas because slabs with empty address are not saved.

return nil
}

func (s *PersistentSlabStorage) DropDeltas() {
s.deltas = make(map[SlabID]Slab)
}
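One detail worth noting in NondeterministicFastCommit above: modified and deleted slab IDs are packed into a single preallocated slice (slabIDsWithOwner) from opposite ends, so both groups share one allocation. A standalone sketch of that pattern, with hypothetical names, illustrative only:

    package main

    import "fmt"

    // partition packs kept items from the front and dropped items from the
    // back of one preallocated buffer, avoiding a second allocation.
    func partition(items map[string]bool) (kept, dropped []string) {
        buf := make([]string, len(items))
        keptCount, droppedCount := 0, 0
        for item, keep := range items {
            if keep {
                buf[keptCount] = item // fill from the front
                keptCount++
            } else {
                buf[len(buf)-1-droppedCount] = item // fill from the back
                droppedCount++
            }
        }
        return buf[:keptCount], buf[len(buf)-droppedCount:]
    }

    func main() {
        kept, dropped := partition(map[string]bool{"a": true, "b": false, "c": true})
        fmt.Println(len(kept), len(dropped)) // prints: 2 1
    }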
134 changes: 134 additions & 0 deletions storage_bench_test.go
@@ -0,0 +1,134 @@
/*
* Atree - Scalable Arrays and Ordered Maps
*
* Copyright 2024 Dapper Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package atree

import (
"encoding/binary"
"math/rand"
"runtime"
"strconv"
"testing"

"github.com/fxamacker/cbor/v2"
"github.com/stretchr/testify/require"
)

func benchmarkFastCommit(b *testing.B, seed int64, numberOfSlabs int) {
r := rand.New(rand.NewSource(seed))

encMode, err := cbor.EncOptions{}.EncMode()
require.NoError(b, err)

decMode, err := cbor.DecOptions{}.DecMode()
require.NoError(b, err)

slabs := make([]Slab, numberOfSlabs)
for i := 0; i < numberOfSlabs; i++ {
addr := generateRandomAddress(r)

var index SlabIndex
binary.BigEndian.PutUint64(index[:], uint64(i))

id := SlabID{addr, index}

slabs[i] = generateLargeSlab(id)
}

b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()

baseStorage := NewInMemBaseStorage()
storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, nil, nil)

for _, slab := range slabs {
err = storage.Store(slab.SlabID(), slab)
require.NoError(b, err)
}

b.StartTimer()

err := storage.FastCommit(runtime.NumCPU())
require.NoError(b, err)
}
})
}

func benchmarkNondeterministicFastCommit(b *testing.B, seed int64, numberOfSlabs int) {
r := rand.New(rand.NewSource(seed))

encMode, err := cbor.EncOptions{}.EncMode()
require.NoError(b, err)

decMode, err := cbor.DecOptions{}.DecMode()
require.NoError(b, err)

slabs := make([]Slab, numberOfSlabs)
for i := 0; i < numberOfSlabs; i++ {
addr := generateRandomAddress(r)

var index SlabIndex
binary.BigEndian.PutUint64(index[:], uint64(i))

id := SlabID{addr, index}

slabs[i] = generateLargeSlab(id)
}

b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()

baseStorage := NewInMemBaseStorage()
storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, nil, nil)

for _, slab := range slabs {
err = storage.Store(slab.SlabID(), slab)
require.NoError(b, err)
}

b.StartTimer()

err := storage.NondeterministicFastCommit(runtime.NumCPU())
require.NoError(b, err)
}
})
}

func BenchmarkStorageFastCommit(b *testing.B) {
fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc.

benchmarkFastCommit(b, fixedSeed, 10)
benchmarkFastCommit(b, fixedSeed, 100)
benchmarkFastCommit(b, fixedSeed, 1_000)
benchmarkFastCommit(b, fixedSeed, 10_000)
benchmarkFastCommit(b, fixedSeed, 100_000)
benchmarkFastCommit(b, fixedSeed, 1_000_000)
}

func BenchmarkStorageNondeterministicFastCommit(b *testing.B) {
fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc.

benchmarkNondeterministicFastCommit(b, fixedSeed, 10)
benchmarkNondeterministicFastCommit(b, fixedSeed, 100)
benchmarkNondeterministicFastCommit(b, fixedSeed, 1_000)
benchmarkNondeterministicFastCommit(b, fixedSeed, 10_000)
benchmarkNondeterministicFastCommit(b, fixedSeed, 100_000)
benchmarkNondeterministicFastCommit(b, fixedSeed, 1_000_000)
}
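These benchmarks run identical data through both commit paths, so the two can be compared directly with the standard Go tooling, for example (illustrative invocation):

    go test -run=^$ -bench=FastCommit -benchmem

The -bench=FastCommit pattern matches both BenchmarkStorageFastCommit and BenchmarkStorageNondeterministicFastCommit, and -run=^$ skips the regular tests.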