Skip to content

Commit

Permalink
Add NonderterministicFastCommit
Browse files Browse the repository at this point in the history
NonderterministicFastCommit commits changes in nondeterministic order.
It can be used by migration programs when ordering isn't required.

                             │  before.txt  │              after.txt               │
                             │    sec/op    │    sec/op     vs base                │
StorageFastCommit/10-12         89.72µ ± 4%   57.50µ ±  3%  -35.92% (p=0.000 n=10)
StorageFastCommit/100-12        118.9µ ± 1%   116.0µ ±  4%        ~ (p=0.436 n=10)
StorageFastCommit/1000-12       4.086m ± 5%   2.397m ± 25%  -41.35% (p=0.000 n=10)
StorageFastCommit/10000-12     12.629m ± 4%   9.857m ±  3%  -21.95% (p=0.000 n=10)
StorageFastCommit/100000-12    102.73m ± 0%   72.26m ±  1%  -29.66% (p=0.000 n=10)
StorageFastCommit/1000000-12     1.544 ± 2%    1.141 ±  2%  -26.09% (p=0.000 n=10)
geomean                         6.661m        4.848m        -27.21%

                             │  before.txt  │              after.txt              │
                             │     B/op     │     B/op      vs base               │
StorageFastCommit/10-12        28.92Ki ± 0%   28.05Ki ± 0%  -3.00% (p=0.000 n=10)
StorageFastCommit/100-12       286.4Ki ± 0%   278.6Ki ± 0%  -2.71% (p=0.000 n=10)
StorageFastCommit/1000-12      3.009Mi ± 0%   2.901Mi ± 0%  -3.58% (p=0.000 n=10)
StorageFastCommit/10000-12     28.65Mi ± 0%   27.79Mi ± 0%  -2.98% (p=0.000 n=10)
StorageFastCommit/100000-12    278.8Mi ± 0%   271.1Mi ± 0%  -2.75% (p=0.000 n=10)
StorageFastCommit/1000000-12   2.923Gi ± 0%   2.821Gi ± 0%  -3.49% (p=0.000 n=10)
geomean                        9.101Mi        8.820Mi       -3.09%

                             │ before.txt  │             after.txt              │
                             │  allocs/op  │  allocs/op   vs base               │
StorageFastCommit/10-12         219.0 ± 0%    205.0 ± 0%  -6.39% (p=0.000 n=10)
StorageFastCommit/100-12       1.980k ± 0%   1.875k ± 0%  -5.30% (p=0.000 n=10)
StorageFastCommit/1000-12      19.23k ± 0%   18.23k ± 0%  -5.22% (p=0.000 n=10)
StorageFastCommit/10000-12     191.1k ± 0%   181.1k ± 0%  -5.24% (p=0.000 n=10)
StorageFastCommit/100000-12    1.918M ± 0%   1.816M ± 0%  -5.30% (p=0.000 n=10)
StorageFastCommit/1000000-12   19.15M ± 0%   18.15M ± 0%  -5.22% (p=0.000 n=10)
geomean                        62.31k        58.91k       -5.45%
  • Loading branch information
fxamacker committed May 9, 2024
1 parent e11f55f commit 7162eab
Show file tree
Hide file tree
Showing 3 changed files with 474 additions and 17 deletions.
207 changes: 205 additions & 2 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,12 +777,17 @@ func (s *PersistentSlabStorage) sortedOwnedDeltaKeys() []SlabID {
}

func (s *PersistentSlabStorage) Commit() error {
var err error

// this part ensures the keys are sorted so commit operation is deterministic
keysWithOwners := s.sortedOwnedDeltaKeys()

for _, id := range keysWithOwners {
return s.commit(keysWithOwners)
}

func (s *PersistentSlabStorage) commit(keys []SlabID) error {
var err error

for _, id := range keys {
slab := s.deltas[id]

// deleted slabs
Expand Down Expand Up @@ -964,6 +969,204 @@ func (s *PersistentSlabStorage) FastCommit(numWorkers int) error {
return nil
}

// NonderterministicFastCommit commits changes in nondeterministic order.
// This is used by migration program when ordering isn't required.
//
// Entries in deltas whose address is AddressUndefined are skipped and left in
// deltas. Deleted slabs (nil entries) are removed from the base storage by the
// calling goroutine. Modified slabs are encoded in parallel by worker
// goroutines and then stored by the calling goroutine. Each slab that is
// successfully removed or stored is moved from deltas to the read cache.
//
// numWorkers is the number of encoder goroutines to launch. It is capped at
// the number of modified slabs, and values below 1 are treated as 1 so that a
// zero or negative argument cannot leave the result loop waiting forever or
// make sync.WaitGroup panic. When fewer than two slabs are modified, no
// goroutines are started and the work is delegated to commit.
//
// The first error encountered is returned. Slabs processed before the error
// have already been moved out of deltas; the remaining ones stay in deltas.
func (s *PersistentSlabStorage) NonderterministicFastCommit(numWorkers int) error {
	// No changes
	if len(s.deltas) == 0 {
		return nil
	}

	type slabToBeEncoded struct {
		slabID SlabID
		slab   Slab
	}

	type encodedSlab struct {
		slabID SlabID
		data   []byte
		err    error
	}

	// Define encoder (worker) to encode slabs in parallel
	encoder := func(
		wg *sync.WaitGroup,
		done <-chan struct{},
		jobs <-chan slabToBeEncoded,
		results chan<- encodedSlab,
	) {
		defer wg.Done()

		for job := range jobs {
			// Check if goroutine is signaled to stop before proceeding.
			select {
			case <-done:
				return
			default:
			}

			id := job.slabID
			slab := job.slab

			if slab == nil {
				results <- encodedSlab{
					slabID: id,
					data:   nil,
					err:    nil,
				}
				continue
			}

			// Serialize
			data, err := EncodeSlab(slab, s.cborEncMode)
			results <- encodedSlab{
				slabID: id,
				data:   data,
				err:    err,
			}
		}
	}

	// Modified slabs need to be encoded (in parallel) and stored in underlying storage.
	modifiedSlabCount := 0
	// Deleted slabs need to be removed from underlying storage.
	deletedSlabCount := 0
	for k, v := range s.deltas {
		// Ignore slabs not owned by accounts
		if k.address == AddressUndefined {
			continue
		}
		if v == nil {
			deletedSlabCount++
		} else {
			modifiedSlabCount++
		}
	}

	if modifiedSlabCount == 0 && deletedSlabCount == 0 {
		return nil
	}

	if modifiedSlabCount < 2 {
		// Avoid goroutine overhead
		ids := make([]SlabID, 0, modifiedSlabCount+deletedSlabCount)
		for k := range s.deltas {
			// Ignore slabs not owned by accounts
			if k.address == AddressUndefined {
				continue
			}
			ids = append(ids, k)
		}

		return s.commit(ids)
	}

	if numWorkers > modifiedSlabCount {
		numWorkers = modifiedSlabCount
	}

	// At least one worker is required: with zero workers nothing would ever
	// be sent to the result queue, and a negative count makes wg.Add panic.
	if numWorkers < 1 {
		numWorkers = 1
	}

	var wg sync.WaitGroup

	// Create done signal channel
	done := make(chan struct{})

	// Create job queue.
	// The buffer holds every job, so sending below never blocks.
	jobs := make(chan slabToBeEncoded, modifiedSlabCount)

	// Create result queue.
	// The buffer holds every result, so workers never block on send,
	// even if this function returns early on error.
	results := make(chan encodedSlab, modifiedSlabCount)

	defer func() {
		// This ensures that all goroutines are stopped before output channel is closed.

		// Wait for all goroutines to finish
		wg.Wait()

		// Close output channel
		close(results)
	}()

	// Launch workers to encode slabs
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go encoder(&wg, done, jobs, results)
	}

	// Send jobs
	deletedSlabIDs := make([]SlabID, 0, deletedSlabCount)
	for k, v := range s.deltas {
		// ignore the ones that are not owned by accounts
		if k.address == AddressUndefined {
			continue
		}
		if v == nil {
			deletedSlabIDs = append(deletedSlabIDs, k)
		} else {
			jobs <- slabToBeEncoded{k, v}
		}
	}
	close(jobs)

	// Remove deleted slabs from underlying storage.
	for _, id := range deletedSlabIDs {

		err := s.baseStorage.Remove(id)
		if err != nil {
			// Closing done channel signals goroutines to stop.
			close(done)
			// Wrap err as external error (if needed) because err is returned by BaseStorage interface.
			return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to remove slab %s", id))
		}

		// Deleted slabs are removed from deltas and added to read cache so that:
		// 1. next read is from in-memory read cache
		// 2. deleted slabs are not re-committed in next commit
		s.cache[id] = nil
		delete(s.deltas, id)
	}

	// Process encoded slabs
	for i := 0; i < modifiedSlabCount; i++ {
		result := <-results

		if result.err != nil {
			// Closing done channel signals goroutines to stop.
			close(done)
			// result.err is already categorized by Encode().
			return result.err
		}

		id := result.slabID
		data := result.data

		if data == nil {
			// Closing done channel signals goroutines to stop.
			close(done)
			// This is unexpected because deleted slabs are processed separately.
			return NewEncodingErrorf("unexpectd encoded empty data")
		}

		// Store
		err := s.baseStorage.Store(id, data)
		if err != nil {
			// Closing done channel signals goroutines to stop.
			close(done)
			// Wrap err as external error (if needed) because err is returned by BaseStorage interface.
			return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to store slab %s", id))
		}

		s.cache[id] = s.deltas[id]
		// It's safe to remove slab from deltas because
		// iteration is on non-temp slabs and temp slabs
		// are still in deltas.
		delete(s.deltas, id)
	}

	// Do NOT reset deltas because slabs with empty address are not saved.

	return nil
}

// DropDeltas discards every entry currently held in deltas by replacing
// the map with a new, empty one.
func (s *PersistentSlabStorage) DropDeltas() {
	s.deltas = map[SlabID]Slab{}
}
Expand Down
134 changes: 134 additions & 0 deletions storage_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Atree - Scalable Arrays and Ordered Maps
*
* Copyright 2024 Dapper Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package atree

import (
"encoding/binary"
"math/rand"
"runtime"
"strconv"
"testing"

"github.com/fxamacker/cbor/v2"
"github.com/stretchr/testify/require"
)

// benchmarkFastCommit runs a sub-benchmark, named after numberOfSlabs, that
// times FastCommit with runtime.NumCPU() workers.
//
// The slabs are generated once up front from a random source seeded with
// seed. Each iteration builds a fresh in-memory storage and stores every slab
// with the timer stopped, so only the commit itself is measured.
func benchmarkFastCommit(b *testing.B, seed int64, numberOfSlabs int) {
	rng := rand.New(rand.NewSource(seed))

	encMode, err := cbor.EncOptions{}.EncMode()
	require.NoError(b, err)

	decMode, err := cbor.DecOptions{}.DecMode()
	require.NoError(b, err)

	// Build the input set: one slab per index, each under a random address.
	slabs := make([]Slab, 0, numberOfSlabs)
	for n := 0; n < numberOfSlabs; n++ {
		addr := generateRandomAddress(rng)

		var index SlabIndex
		binary.BigEndian.PutUint64(index[:], uint64(n))

		slabs = append(slabs, generateLargeSlab(SlabID{addr, index}))
	}

	b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) {
		for iter := 0; iter < b.N; iter++ {
			// Setup is excluded from the measurement.
			b.StopTimer()

			storage := NewPersistentSlabStorage(NewInMemBaseStorage(), encMode, decMode, nil, nil)

			for _, slab := range slabs {
				require.NoError(b, storage.Store(slab.SlabID(), slab))
			}

			b.StartTimer()

			require.NoError(b, storage.FastCommit(runtime.NumCPU()))
		}
	})
}

// benchmarkNondeterministicFastCommit runs a sub-benchmark, named after
// numberOfSlabs, that times NonderterministicFastCommit with
// runtime.NumCPU() workers.
//
// The slabs are generated once up front from a random source seeded with
// seed. Each iteration builds a fresh in-memory storage and stores every slab
// with the timer stopped, so only the commit itself is measured.
func benchmarkNondeterministicFastCommit(b *testing.B, seed int64, numberOfSlabs int) {
	rng := rand.New(rand.NewSource(seed))

	encMode, err := cbor.EncOptions{}.EncMode()
	require.NoError(b, err)

	decMode, err := cbor.DecOptions{}.DecMode()
	require.NoError(b, err)

	// Build the input set: one slab per index, each under a random address.
	slabs := make([]Slab, 0, numberOfSlabs)
	for n := 0; n < numberOfSlabs; n++ {
		addr := generateRandomAddress(rng)

		var index SlabIndex
		binary.BigEndian.PutUint64(index[:], uint64(n))

		slabs = append(slabs, generateLargeSlab(SlabID{addr, index}))
	}

	b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) {
		for iter := 0; iter < b.N; iter++ {
			// Setup is excluded from the measurement.
			b.StopTimer()

			storage := NewPersistentSlabStorage(NewInMemBaseStorage(), encMode, decMode, nil, nil)

			for _, slab := range slabs {
				require.NoError(b, storage.Store(slab.SlabID(), slab))
			}

			b.StartTimer()

			require.NoError(b, storage.NonderterministicFastCommit(runtime.NumCPU()))
		}
	})
}

// BenchmarkStorageFastCommit runs benchmarkFastCommit for slab counts from
// 10 up to 1,000,000, in increasing powers of ten.
func BenchmarkStorageFastCommit(b *testing.B) {
	// Intentionally a fixed constant rather than time, etc.
	const fixedSeed int64 = 1234567

	slabCounts := []int{10, 100, 1_000, 10_000, 100_000, 1_000_000}
	for _, count := range slabCounts {
		benchmarkFastCommit(b, fixedSeed, count)
	}
}

// BenchmarkStorageNondeterministicFastCommit runs
// benchmarkNondeterministicFastCommit for slab counts from 10 up to
// 1,000,000, in increasing powers of ten.
func BenchmarkStorageNondeterministicFastCommit(b *testing.B) {
	// Intentionally a fixed constant rather than time, etc.
	const fixedSeed int64 = 1234567

	slabCounts := []int{10, 100, 1_000, 10_000, 100_000, 1_000_000}
	for _, count := range slabCounts {
		benchmarkNondeterministicFastCommit(b, fixedSeed, count)
	}
}
Loading

0 comments on commit 7162eab

Please sign in to comment.