Add BatchPreload to decode slabs in parallel and cache #404

Merged (2 commits) on May 14, 2024
storage.go: 148 additions, 0 deletions
@@ -1560,3 +1560,151 @@ func (s *PersistentSlabStorage) getAllChildReferences(slab Slab) (

    return references, brokenReferences, nil
}

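// BatchPreload retrieves the slabs for the given IDs from base storage, decodes
// them (in parallel, using up to numWorkers goroutines, when the batch is large
// enough), and stores the decoded slabs in the in-memory cache.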
func (s *PersistentSlabStorage) BatchPreload(ids []SlabID, numWorkers int) error {
    if len(ids) == 0 {
        return nil
    }

    minCountForBatchPreload := 11
    if len(ids) < minCountForBatchPreload {

        for _, id := range ids {
            // fetch from base storage last
            data, ok, err := s.baseStorage.Retrieve(id)
            if err != nil {
                // Wrap err as external error (if needed) because err is returned by BaseStorage interface.
                return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to retrieve slab %s", id))
            }
            if !ok {
                continue
            }

            slab, err := DecodeSlab(id, data, s.cborDecMode, s.DecodeStorable, s.DecodeTypeInfo)
            if err != nil {
                // err is already categorized by DecodeSlab().
                return err
            }

            // save decoded slab to cache
            s.cache[id] = slab
        }

        return nil
    }

    type slabToBeDecoded struct {
        slabID SlabID
        data   []byte
    }

    type decodedSlab struct {
        slabID SlabID
        slab   Slab
        err    error
    }

    // Define decoder (worker) to decode slabs in parallel
    decoder := func(wg *sync.WaitGroup, done <-chan struct{}, jobs <-chan slabToBeDecoded, results chan<- decodedSlab) {
        defer wg.Done()

        for slabData := range jobs {
            // Check if goroutine is signaled to stop before proceeding.
            select {
            case <-done:
                return
            default:
            }

            id := slabData.slabID
            data := slabData.data

            slab, err := DecodeSlab(id, data, s.cborDecMode, s.DecodeStorable, s.DecodeTypeInfo)
            // err is already categorized by DecodeSlab().
            results <- decodedSlab{
                slabID: id,
                slab:   slab,
                err:    err,
            }
        }
    }

    if numWorkers > len(ids) {
        numWorkers = len(ids)
    }

    var wg sync.WaitGroup

    // Construct done signal channel
    done := make(chan struct{})

    // Construct job queue
    jobs := make(chan slabToBeDecoded, len(ids))

    // Construct result queue
    results := make(chan decodedSlab, len(ids))

    defer func() {
        // This ensures that all goroutines are stopped before output channel is closed.

        // Wait for all goroutines to finish
        wg.Wait()

        // Close output channel
        close(results)
    }()

    // Preallocate cache map if empty
    if len(s.cache) == 0 {
        s.cache = make(map[SlabID]Slab, len(ids))
    }

    // Launch workers
    wg.Add(numWorkers)
    for i := 0; i < numWorkers; i++ {
        go decoder(&wg, done, jobs, results)
    }

    // Send jobs
    jobCount := 0
    {
        // Need to close input channel (jobs) here because
        // if there isn't any job in jobs channel,
        // done is never processed inside loop "for slabData := range jobs".
        defer close(jobs)

        for _, id := range ids {
            // fetch from base storage last
            data, ok, err := s.baseStorage.Retrieve(id)
            if err != nil {
                // Closing done channel signals goroutines to stop.
                close(done)
                // Wrap err as external error (if needed) because err is returned by BaseStorage interface.
                return wrapErrorfAsExternalErrorIfNeeded(err, fmt.Sprintf("failed to retrieve slab %s", id))
            }
            if !ok {
                continue
            }

            jobs <- slabToBeDecoded{id, data}
            jobCount++
        }
    }

    // Process results
    for i := 0; i < jobCount; i++ {
        result := <-results

        if result.err != nil {
            // Closing done channel signals goroutines to stop.
            close(done)
            // result.err is already categorized by DecodeSlab().
            return result.err
        }

        // save decoded slab to cache
        s.cache[result.slabID] = result.slab
    }

    return nil
}
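As a quick illustration of the intended call pattern, here is a hypothetical sketch (not part of this diff): the helper name preloadAndRead is made up, and it only uses BatchPreload, Retrieve, and runtime.NumCPU as they appear in this PR and its benchmark below.

// Hypothetical usage sketch: warm the cache for a known set of slab IDs,
// then read them back; Retrieve is served from the cache after preloading.
func preloadAndRead(storage *PersistentSlabStorage, ids []SlabID) error {
    // Decode all slabs in parallel and populate the in-memory cache.
    if err := storage.BatchPreload(ids, runtime.NumCPU()); err != nil {
        return err
    }

    for _, id := range ids {
        _, found, err := storage.Retrieve(id)
        if err != nil {
            return err
        }
        if !found {
            return fmt.Errorf("slab %s not found", id)
        }
    }
    return nil
}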
storage_bench_test.go: 119 additions, 0 deletions
@@ -132,3 +132,122 @@ func BenchmarkStorageNondeterministicFastCommit(b *testing.B) {
    benchmarkNondeterministicFastCommit(b, fixedSeed, 100_000)
    benchmarkNondeterministicFastCommit(b, fixedSeed, 1_000_000)
}

func benchmarkRetrieve(b *testing.B, seed int64, numberOfSlabs int) {

    r := rand.New(rand.NewSource(seed))

    encMode, err := cbor.EncOptions{}.EncMode()
    require.NoError(b, err)

    decMode, err := cbor.DecOptions{}.DecMode()
    require.NoError(b, err)

    encodedSlabs := make(map[SlabID][]byte)
    ids := make([]SlabID, 0, numberOfSlabs)
    for i := 0; i < numberOfSlabs; i++ {
        addr := generateRandomAddress(r)

        var index SlabIndex
        binary.BigEndian.PutUint64(index[:], uint64(i))

        id := SlabID{addr, index}

        slab := generateLargeSlab(id)

        data, err := EncodeSlab(slab, encMode)
        require.NoError(b, err)

        encodedSlabs[id] = data
        ids = append(ids, id)
    }

    b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) {
        for i := 0; i < b.N; i++ {
            b.StopTimer()

            baseStorage := NewInMemBaseStorageFromMap(encodedSlabs)
            storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, decodeStorable, decodeTypeInfo)

            b.StartTimer()

            for _, id := range ids {
                _, found, err := storage.Retrieve(id)
                require.True(b, found)
                require.NoError(b, err)
            }
        }
    })
}

func benchmarkBatchPreload(b *testing.B, seed int64, numberOfSlabs int) {

    r := rand.New(rand.NewSource(seed))

    encMode, err := cbor.EncOptions{}.EncMode()
    require.NoError(b, err)

    decMode, err := cbor.DecOptions{}.DecMode()
    require.NoError(b, err)

    encodedSlabs := make(map[SlabID][]byte)
    ids := make([]SlabID, 0, numberOfSlabs)
    for i := 0; i < numberOfSlabs; i++ {
        addr := generateRandomAddress(r)

        var index SlabIndex
        binary.BigEndian.PutUint64(index[:], uint64(i))

        id := SlabID{addr, index}

        slab := generateLargeSlab(id)

        data, err := EncodeSlab(slab, encMode)
        require.NoError(b, err)

        encodedSlabs[id] = data
        ids = append(ids, id)
    }

    b.Run(strconv.Itoa(numberOfSlabs), func(b *testing.B) {
        for i := 0; i < b.N; i++ {
            b.StopTimer()

            baseStorage := NewInMemBaseStorageFromMap(encodedSlabs)
            storage := NewPersistentSlabStorage(baseStorage, encMode, decMode, decodeStorable, decodeTypeInfo)

            b.StartTimer()

            err = storage.BatchPreload(ids, runtime.NumCPU())
            require.NoError(b, err)

            for _, id := range ids {
                _, found, err := storage.Retrieve(id)
                require.True(b, found)
                require.NoError(b, err)
            }
        }
    })
}

func BenchmarkStorageRetrieve(b *testing.B) {
    fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc.

    benchmarkRetrieve(b, fixedSeed, 10)
    benchmarkRetrieve(b, fixedSeed, 100)
    benchmarkRetrieve(b, fixedSeed, 1_000)
    benchmarkRetrieve(b, fixedSeed, 10_000)
    benchmarkRetrieve(b, fixedSeed, 100_000)
    benchmarkRetrieve(b, fixedSeed, 1_000_000)
}

func BenchmarkStorageBatchPreload(b *testing.B) {
    fixedSeed := int64(1234567) // intentionally use fixed constant rather than time, etc.

    benchmarkBatchPreload(b, fixedSeed, 10)
    benchmarkBatchPreload(b, fixedSeed, 100)
    benchmarkBatchPreload(b, fixedSeed, 1_000)
    benchmarkBatchPreload(b, fixedSeed, 10_000)
    benchmarkBatchPreload(b, fixedSeed, 100_000)
    benchmarkBatchPreload(b, fixedSeed, 1_000_000)
}
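For reference, the new benchmarks can be run on their own with the standard Go tooling, for example (from the package directory): go test -run=^$ -bench='BenchmarkStorageRetrieve|BenchmarkStorageBatchPreload'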