Skip to content

Commit

Permalink
Add BatchPreload to decode slabs in parallel
Browse files Browse the repository at this point in the history
The intended use for BatchPreload is to speedup migrations.

BatchPreload decodes slabs in parallel and stores decoded
slabs in cache for later retrieval.  This is useful for
migration program when most or all slabs are expected to
be migrated.

                           │  before.txt  │              after.txt               │
                           │    sec/op    │    sec/op     vs base                │
StorageRetrieve/10-12         36.23µ ± 3%   35.33µ ±  4%        ~ (p=0.075 n=10)
StorageRetrieve/100-12        469.6µ ± 8%   124.3µ ±  0%  -73.52% (p=0.000 n=10)
StorageRetrieve/1000-12       6.678m ± 7%   2.303m ± 20%  -65.51% (p=0.000 n=10)
StorageRetrieve/10000-12      29.81m ± 2%   12.26m ±  5%  -58.86% (p=0.000 n=10)
StorageRetrieve/100000-12    303.33m ± 1%   88.40m ±  1%  -70.86% (p=0.000 n=10)
StorageRetrieve/1000000-12     3.442 ± 1%    1.137 ±  3%  -66.96% (p=0.000 n=10)
geomean                       12.34m        4.816m        -60.98%

                           │  before.txt  │              after.txt              │
                           │     B/op     │     B/op      vs base               │
StorageRetrieve/10-12        21.59Ki ± 0%   21.59Ki ± 0%       ~ (p=1.000 n=10)
StorageRetrieve/100-12       219.8Ki ± 0%   224.7Ki ± 0%  +2.24% (p=0.000 n=10)
StorageRetrieve/1000-12      2.266Mi ± 0%   2.272Mi ± 0%  +0.27% (p=0.000 n=10)
StorageRetrieve/10000-12     21.94Mi ± 0%   22.14Mi ± 0%  +0.91% (p=0.000 n=10)
StorageRetrieve/100000-12    215.3Mi ± 0%   218.5Mi ± 0%  +1.50% (p=0.000 n=10)
StorageRetrieve/1000000-12   2.211Gi ± 0%   2.212Gi ± 0%  +0.05% (p=0.000 n=10)
geomean                      6.919Mi        6.976Mi       +0.82%

                           │ before.txt  │              after.txt               │
                           │  allocs/op  │  allocs/op   vs base                 │
StorageRetrieve/10-12         76.00 ± 0%    76.00 ± 0%       ~ (p=1.000 n=10) ¹
StorageRetrieve/100-12        745.0 ± 0%    759.0 ± 0%  +1.88% (p=0.000 n=10)
StorageRetrieve/1000-12      7.161k ± 0%   7.153k ± 0%  -0.11% (p=0.000 n=10)
StorageRetrieve/10000-12     70.77k ± 0%   70.58k ± 0%  -0.27% (p=0.000 n=10)
StorageRetrieve/100000-12    711.9k ± 0%   709.7k ± 0%  -0.31% (p=0.000 n=10)
StorageRetrieve/1000000-12   7.115M ± 0%   7.077M ± 0%  -0.54% (p=0.000 n=10)
geomean                      22.93k        22.95k       +0.11%
  • Loading branch information
fxamacker committed May 9, 2024
1 parent cfb364c commit 0901d53
Show file tree
Hide file tree
Showing 3 changed files with 423 additions and 0 deletions.
148 changes: 148 additions & 0 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -1558,3 +1558,151 @@ func (s *PersistentSlabStorage) getAllChildReferences(slab Slab) (

return references, brokenReferences, nil
}

func (s *PersistentSlabStorage) BatchPreload(ids []SlabID, numWorkers int) error {
if len(ids) == 0 {
return nil
}

minCountForBatchPreload := 10
if len(ids) == minCountForBatchPreload {

for _, id := range ids {
// fetch from base storage last
data, ok, err := s.baseStorage.Retrieve(id)
if err != nil {
// Wrap err as external error (if needed) because err is returned by BaseStorage interface.
return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to retrieve slab %s", id))
}
if !ok {
return NewSlabNotFoundError(id, fmt.Errorf("failed to retrieve slab %s", id))
}

slab, err := DecodeSlab(id, data, s.cborDecMode, s.DecodeStorable, s.DecodeTypeInfo)
if err != nil {
// err is already categorized by DecodeSlab().
return err
}

// save decoded slab to cache
s.cache[id] = slab
}

return nil
}

type slabToBeDecoded struct {
slabID SlabID
data []byte
}

type decodedSlab struct {
slabID SlabID
slab Slab
err error
}

// Define decoder (worker) to decode slabs in parallel
decoder := func(wg *sync.WaitGroup, done <-chan struct{}, jobs <-chan slabToBeDecoded, results chan<- decodedSlab) {
defer wg.Done()

for slabData := range jobs {
// Check if goroutine is signaled to stop before proceeding.
select {
case <-done:
return
default:
}

id := slabData.slabID
data := slabData.data

slab, err := DecodeSlab(id, data, s.cborDecMode, s.DecodeStorable, s.DecodeTypeInfo)
// err is already categorized by DecodeSlab().
results <- decodedSlab{
slabID: id,
slab: slab,
err: err,
}
}
}

if numWorkers > len(ids) {
numWorkers = len(ids)
}

var wg sync.WaitGroup

// Construct done signal channel
done := make(chan struct{})

// Construct job queue
jobs := make(chan slabToBeDecoded, len(ids))

// Construct result queue
results := make(chan decodedSlab, len(ids))

defer func() {
// This ensures that all goroutines are stopped before output channel is closed.

// Wait for all goroutines to finish
wg.Wait()

// Close output channel
close(results)
}()

// Preallocate cache map if empty
if len(s.cache) == 0 {
s.cache = make(map[SlabID]Slab, len(ids))
}

// Launch workers
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go decoder(&wg, done, jobs, results)
}

// Send jobs
{
// Need to close input channel (jobs) here because
// if there isn't any job in jobs channel,
// done is never processed inside loop "for slabData := range jobs".
defer close(jobs)

for _, id := range ids {
// fetch from base storage last
data, ok, err := s.baseStorage.Retrieve(id)
if err != nil {
// Closing done channel signals goroutines to stop.
close(done)
// Wrap err as external error (if needed) because err is returned by BaseStorage interface.
return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to retrieve slab %s", id))
}
if !ok {
// Closing done channel signals goroutines to stop.
close(done)
return NewSlabNotFoundError(id, fmt.Errorf("failed to retrieve slab %s", id))
}

jobs <- slabToBeDecoded{id, data}
}
}

// Process results
for i := 0; i < len(ids); i++ {
result := <-results

if result.err != nil {
// Closing done channel signals goroutines to stop.
close(done)
// result.err is already categorized by DecodeSlab().
return result.err
}

// save decoded slab to cache
s.cache[result.slabID] = result.slab
}

return nil
}
119 changes: 119 additions & 0 deletions storage_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,122 @@ func BenchmarkStorageNondeterministicFastCommit(b *testing.B) {
benchmarkNondeterministicFastCommit(b, fixedSeed, 100_000)
benchmarkNondeterministicFastCommit(b, fixedSeed, 1_000_000)
}

func benchmarkRetrieve(b *testing.B, seed int64, numberOfSlabs int) {

r := rand.New(rand.NewSource(seed))

encMode, err := cbor.EncOptions{}.EncMode()
require.NoError(b, err)

decMode, err := cbor.DecOptions{}.DecMode()
require.NoError(b, err)

encodedSlabs := make(map[SlabID][]byte)
ids := make([]SlabID, 0, numberOfSlabs)
for i := 0; i < numberOfSlabs; i++ {
addr := generateRandomAddress(r)

var index SlabIndex
binary.BigEndian.PutUint64(index[:], uint64(i))

id := SlabID{addr, index}

slab := generateLargeSlab(id)

data, err := EncodeSlab(slab, encMode)
require.NoError(b, err)

encodedSlabs[id] = data
ids = append(ids, id)
}

b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()

baseStorage := NewInMemBaseStorageFromMap(encodedSlabs)
storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, decodeStorable, decodeTypeInfo)

b.StartTimer()

for _, id := range ids {
_, found, err := storage.Retrieve(id)
require.True(b, found)
require.NoError(b, err)
}
}
})
}

func benchmarkBatchPreload(b *testing.B, seed int64, numberOfSlabs int) {

r := rand.New(rand.NewSource(seed))

encMode, err := cbor.EncOptions{}.EncMode()
require.NoError(b, err)

decMode, err := cbor.DecOptions{}.DecMode()
require.NoError(b, err)

encodedSlabs := make(map[SlabID][]byte)
ids := make([]SlabID, 0, numberOfSlabs)
for i := 0; i < numberOfSlabs; i++ {
addr := generateRandomAddress(r)

var index SlabIndex
binary.BigEndian.PutUint64(index[:], uint64(i))

id := SlabID{addr, index}

slab := generateLargeSlab(id)

data, err := EncodeSlab(slab, encMode)
require.NoError(b, err)

encodedSlabs[id] = data
ids = append(ids, id)
}

b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()

baseStorage := NewInMemBaseStorageFromMap(encodedSlabs)
storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, decodeStorable, decodeTypeInfo)

b.StartTimer()

err = storage.BatchPreload(ids, runtime.NumCPU())
require.NoError(b, err)

for _, id := range ids {
_, found, err := storage.Retrieve(id)
require.True(b, found)
require.NoError(b, err)
}
}
})
}

func BenchmarkStorageRetrieve(b *testing.B) {
fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc.

benchmarkRetrieve(b, fixedSeed, 10)
benchmarkRetrieve(b, fixedSeed, 100)
benchmarkRetrieve(b, fixedSeed, 1_000)
benchmarkRetrieve(b, fixedSeed, 10_000)
benchmarkRetrieve(b, fixedSeed, 100_000)
benchmarkRetrieve(b, fixedSeed, 1_000_000)
}

func BenchmarkStorageBatchPreload(b *testing.B) {
fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc.

benchmarkBatchPreload(b, fixedSeed, 10)
benchmarkBatchPreload(b, fixedSeed, 100)
benchmarkBatchPreload(b, fixedSeed, 1_000)
benchmarkBatchPreload(b, fixedSeed, 10_000)
benchmarkBatchPreload(b, fixedSeed, 100_000)
benchmarkBatchPreload(b, fixedSeed, 1_000_000)
}
Loading

0 comments on commit 0901d53

Please sign in to comment.