
Add NondeterministicFastCommit to speed up migrations when ordering isn't required #403

Merged
207 changes: 205 additions & 2 deletions storage.go
@@ -777,12 +777,17 @@ func (s *PersistentSlabStorage) sortedOwnedDeltaKeys() []SlabID {
}

func (s *PersistentSlabStorage) Commit() error {
// this part ensures the keys are sorted so the commit operation is deterministic
keysWithOwners := s.sortedOwnedDeltaKeys()

return s.commit(keysWithOwners)
}

// commit stores the modified slabs and removes the deleted slabs
// identified by the given slab IDs, which may be in any order.
func (s *PersistentSlabStorage) commit(keys []SlabID) error {
var err error

for _, id := range keys {
slab := s.deltas[id]

// deleted slabs
@@ -964,6 +969,204 @@ func (s *PersistentSlabStorage) FastCommit(numWorkers int) error {
return nil
}

// NondeterministicFastCommit commits changes in nondeterministic order.
// This is used by migration programs when deterministic commit ordering isn't required.
func (s *PersistentSlabStorage) NondeterministicFastCommit(numWorkers int) error {
// No changes
if len(s.deltas) == 0 {
return nil
}

type slabToBeEncoded struct {
slabID SlabID
slab Slab
}

type encodedSlab struct {
slabID SlabID
data []byte
err error
}

// Define encoder (worker) to encode slabs in parallel
encoder := func(
wg *sync.WaitGroup,
done <-chan struct{},
jobs <-chan slabToBeEncoded,
results chan<- encodedSlab,
) {
defer wg.Done()

for job := range jobs {
// Check if goroutine is signaled to stop before proceeding.
select {
case <-done:
return
default:
}

id := job.slabID
slab := job.slab

if slab == nil {
results <- encodedSlab{
slabID: id,
data: nil,
err: nil,
}
continue
}

// Serialize
data, err := EncodeSlab(slab, s.cborEncMode)
results <- encodedSlab{
slabID: id,
data: data,
err: err,
}
}
}

// slabIDsWithOwner contains slab IDs that have an owner:
// - modified slab IDs are stored from front to back
// - deleted slab IDs are stored from back to front
// This is to avoid extra allocations.
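// Illustrative example (not part of this diff): with five owned deltas
// where A, C, E are modified and B, D are deleted, slabIDsWithOwner could
// end up as [A, C, E, D, B]. Modified IDs fill indexes 0-2 from the front,
// deleted IDs fill indexes 4 and 3 from the back (exact positions depend
// on map iteration order), and a single slice allocation backs both lists.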
slabIDsWithOwner := make([]SlabID, len(s.deltas))

// Modified slabs need to be encoded (in parallel) and stored in underlying storage.
modifiedSlabCount := 0
// Deleted slabs need to be removed from underlying storage.
deletedSlabCount := 0
for id, slab := range s.deltas {
// Ignore slabs not owned by accounts
if id.address == AddressUndefined {
continue
}
if slab == nil {
// Set deleted slab ID from the end of slabIDsWithOwner.
index := len(slabIDsWithOwner) - 1 - deletedSlabCount
slabIDsWithOwner[index] = id
deletedSlabCount++
} else {
// Set modified slab ID from the start of slabIDsWithOwner.
slabIDsWithOwner[modifiedSlabCount] = id
modifiedSlabCount++
}
}

modifiedSlabIDs := slabIDsWithOwner[:modifiedSlabCount]

deletedSlabIDs := slabIDsWithOwner[len(slabIDsWithOwner)-deletedSlabCount:]

if modifiedSlabCount == 0 && deletedSlabCount == 0 {
return nil
}

if modifiedSlabCount < 2 {
// Avoid goroutine overhead.
// Return after committing modified and deleted slabs.
ids := modifiedSlabIDs
ids = append(ids, deletedSlabIDs...)
return s.commit(ids)
}

if numWorkers > modifiedSlabCount {
numWorkers = modifiedSlabCount
}

var wg sync.WaitGroup

// Create done signal channel
done := make(chan struct{})

// Create job queue
jobs := make(chan slabToBeEncoded, modifiedSlabCount)

// Create result queue
results := make(chan encodedSlab, modifiedSlabCount)
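// Note: jobs and results are both buffered to modifiedSlabCount, so sends
// never block. This matters on the early-return paths below: after
// close(done), encoder goroutines can always finish their in-flight send
// and exit instead of blocking on a full results channel.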

defer func() {
// This ensures that all goroutines are stopped before output channel is closed.

// Wait for all goroutines to finish
wg.Wait()

// Close output channel
close(results)
}()

// Launch workers to encode slabs
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go encoder(&wg, done, jobs, results)
}

// Send jobs
for _, id := range modifiedSlabIDs {
jobs <- slabToBeEncoded{id, s.deltas[id]}
}
close(jobs)
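// Closing jobs lets each encoder's range loop terminate once the queue drains.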

// Remove deleted slabs from underlying storage.
for _, id := range deletedSlabIDs {

err := s.baseStorage.Remove(id)
if err != nil {
// Closing done channel signals goroutines to stop.
close(done)
// Wrap err as external error (if needed) because err is returned by BaseStorage interface.
return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to remove slab %s", id))
}

// Deleted slabs are removed from deltas and added to read cache so that:
// 1. next read is from in-memory read cache
// 2. deleted slabs are not re-committed in next commit
s.cache[id] = nil
delete(s.deltas, id)
}

// Process encoded slabs
for i := 0; i < modifiedSlabCount; i++ {
result := <-results

if result.err != nil {
// Closing done channel signals goroutines to stop.
close(done)
// result.err is already categorized by Encode().
return result.err
}

id := result.slabID
data := result.data

if data == nil {
// Closing done channel signals goroutines to stop.
close(done)
// This is unexpected because deleted slabs are processed separately.
return NewEncodingErrorf("unexpected empty encoded data")
}

// Store
err := s.baseStorage.Store(id, data)
if err != nil {
// Closing done channel signals goroutines to stop.
close(done)
// Wrap err as external error (if needed) because err is returned by BaseStorage interface.
return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to store slab %s", id))
}

s.cache[id] = s.deltas[id]
// It's safe to remove slab from deltas because
// iteration is on non-temp slabs and temp slabs
// are still in deltas.
delete(s.deltas, id)
}

// Do NOT reset deltas because slabs with empty addresses (temporary slabs) are not saved and must remain in deltas.

return nil
}

func (s *PersistentSlabStorage) DropDeltas() {
s.deltas = make(map[SlabID]Slab)
}
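For context, here is a minimal sketch of how a migration-style caller might choose between the two commit paths. This is illustrative only and not part of the PR; the helper name commitForMigration and the use of runtime.NumCPU() as the worker count are assumptions.

func commitForMigration(storage *PersistentSlabStorage, deterministic bool) error {
	if deterministic {
		// FastCommit encodes slabs in parallel but commits them in sorted key order.
		return storage.FastCommit(runtime.NumCPU())
	}
	// NondeterministicFastCommit skips sorting and removes deleted slabs while
	// modified slabs are still being encoded, trading commit order for speed.
	return storage.NondeterministicFastCommit(runtime.NumCPU())
}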
134 changes: 134 additions & 0 deletions storage_bench_test.go
@@ -0,0 +1,134 @@
/*
* Atree - Scalable Arrays and Ordered Maps
*
* Copyright 2024 Dapper Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package atree

import (
"encoding/binary"
"math/rand"
"runtime"
"strconv"
"testing"

"github.com/fxamacker/cbor/v2"
"github.com/stretchr/testify/require"
)

func benchmarkFastCommit(b *testing.B, seed int64, numberOfSlabs int) {
r := rand.New(rand.NewSource(seed))

encMode, err := cbor.EncOptions{}.EncMode()
require.NoError(b, err)

decMode, err := cbor.DecOptions{}.DecMode()
require.NoError(b, err)

slabs := make([]Slab, numberOfSlabs)
for i := 0; i < numberOfSlabs; i++ {
addr := generateRandomAddress(r)

var index SlabIndex
binary.BigEndian.PutUint64(index[:], uint64(i))

id := SlabID{addr, index}

slabs[i] = generateLargeSlab(id)
}

b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()

baseStorage := NewInMemBaseStorage()
storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, nil, nil)

for _, slab := range slabs {
err = storage.Store(slab.SlabID(), slab)
require.NoError(b, err)
}

b.StartTimer()

err := storage.FastCommit(runtime.NumCPU())
require.NoError(b, err)
}
})
}

func benchmarkNondeterministicFastCommit(b *testing.B, seed int64, numberOfSlabs int) {
r := rand.New(rand.NewSource(seed))

encMode, err := cbor.EncOptions{}.EncMode()
require.NoError(b, err)

decMode, err := cbor.DecOptions{}.DecMode()
require.NoError(b, err)

slabs := make([]Slab, numberOfSlabs)
for i := 0; i < numberOfSlabs; i++ {
addr := generateRandomAddress(r)

var index SlabIndex
binary.BigEndian.PutUint64(index[:], uint64(i))

id := SlabID{addr, index}

slabs[i] = generateLargeSlab(id)
}

b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()

baseStorage := NewInMemBaseStorage()
storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, nil, nil)

for _, slab := range slabs {
err = storage.Store(slab.SlabID(), slab)
require.NoError(b, err)
}

b.StartTimer()

err := storage.NondeterministicFastCommit(runtime.NumCPU())
require.NoError(b, err)
}
})
}

func BenchmarkStorageFastCommit(b *testing.B) {
fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc.

benchmarkFastCommit(b, fixedSeed, 10)
benchmarkFastCommit(b, fixedSeed, 100)
benchmarkFastCommit(b, fixedSeed, 1_000)
benchmarkFastCommit(b, fixedSeed, 10_000)
benchmarkFastCommit(b, fixedSeed, 100_000)
benchmarkFastCommit(b, fixedSeed, 1_000_000)
}

func BenchmarkStorageNondeterministicFastCommit(b *testing.B) {
fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc.

benchmarkNondeterministicFastCommit(b, fixedSeed, 10)
benchmarkNondeterministicFastCommit(b, fixedSeed, 100)
benchmarkNondeterministicFastCommit(b, fixedSeed, 1_000)
benchmarkNondeterministicFastCommit(b, fixedSeed, 10_000)
benchmarkNondeterministicFastCommit(b, fixedSeed, 100_000)
benchmarkNondeterministicFastCommit(b, fixedSeed, 1_000_000)
}
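For reference, these benchmarks can be run with standard Go tooling, for example:

	go test -bench=FastCommit -run='^$' -benchmem

The -run='^$' pattern skips unit tests so only the benchmarks execute, and -benchmem adds allocation statistics to the output.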