Skip to content

Commit

Permalink
refactor(chunker): reduce overall memory use (#649)
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero authored Aug 21, 2024
1 parent aa27cd2 commit 3cd3857
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ The following emojis are used to highlight certain changes:

### Changed

- `chunker` refactored to reduce overall memory use by reducing heap fragmentation [#649](https://github.com/ipfs/boxo/pull/649)

### Removed

### Fixed
Expand Down
36 changes: 36 additions & 0 deletions chunker/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,39 @@ func benchmarkChunkerSize(b *testing.B, ns newSplitter, size int) {
}
Res = Res + res
}

// benchmarkFilesAlloc measures allocation behavior when chunking many small
// files of varying size, simulating how unixfs drives a splitter. Each
// benchmark iteration chunks fileCount simulated files whose sizes are drawn
// deterministically (fixed seed) from [minDataSize, maxDataSize).
func benchmarkFilesAlloc(b *testing.B, ns newSplitter) {
	const (
		chunkSize   = 4096
		minDataSize = 20000
		maxDataSize = 60000
		fileCount   = 10000
	)
	rng := rand.New(rand.NewSource(1))
	data := make([]byte, maxDataSize)
	rng.Read(data)

	b.SetBytes(maxDataSize)
	b.ReportAllocs()
	b.ResetTimer()

	var total uint64
	for i := 0; i < b.N; i++ {
		for f := 0; f < fileCount; f++ {
			fileSize := minDataSize + rng.Intn(maxDataSize-minDataSize)
			sp := ns(bytes.NewReader(data[:fileSize]))
			for {
				chunk, err := sp.NextBytes()
				if err == io.EOF {
					break
				}
				if err != nil {
					b.Fatal(err)
				}
				total += uint64(len(chunk))
			}
		}
	}
	// Sink the accumulated count so the compiler cannot elide the work.
	Res = Res + total
}
48 changes: 38 additions & 10 deletions chunker/splitting.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,20 @@
package chunk

import (
"errors"
"io"
"math/bits"

logging "github.com/ipfs/go-log/v2"
pool "github.com/libp2p/go-buffer-pool"
)

var log = logging.Logger("chunk")

// maxOverAllocBytes is the maximum unused space a chunk can have without being
// reallocated to a smaller size to fit the data.
const maxOverAllocBytes = 1024

// A Splitter reads bytes from a Reader and creates "chunks" (byte slices)
// that can be used to build DAG nodes.
type Splitter interface {
Expand Down Expand Up @@ -81,19 +87,41 @@ func (ss *sizeSplitterv2) NextBytes() ([]byte, error) {

full := pool.Get(int(ss.size))
n, err := io.ReadFull(ss.r, full)
switch err {
case io.ErrUnexpectedEOF:
ss.err = io.EOF
small := make([]byte, n)
copy(small, full)
pool.Put(full)
return small, nil
case nil:
return full, nil
default:
if err != nil {
if errors.Is(err, io.ErrUnexpectedEOF) {
ss.err = io.EOF
return reallocChunk(full, n), nil
}
pool.Put(full)
return nil, err
}
return full, nil
}

// reallocChunk returns a buffer containing the first n bytes of full,
// releasing full back to the pool whenever a smaller replacement is
// allocated. It returns nil for n == 0, and returns full itself (truncated
// to n) when the unused capacity is small enough that reallocating would
// not be worthwhile.
func reallocChunk(full []byte, n int) []byte {
	if n == 0 {
		// Nothing was read; recycle the buffer and return no chunk.
		pool.Put(full)
		return nil
	}

	if cap(full)-n <= maxOverAllocBytes {
		// Close enough to fully used; keep the pooled buffer as-is.
		return full[:n]
	}

	// Shrink: prefer rounding the capacity up to the nearest power of two
	// when that still keeps unused space within bounds, otherwise allocate
	// exactly n bytes.
	capacity := 1 << bits.Len32(uint32(n-1))
	if capacity-n > maxOverAllocBytes {
		capacity = n
	}
	small := make([]byte, n, capacity)
	copy(small, full)
	pool.Put(full)
	return small
}

// Reader returns the io.Reader associated to this Splitter.
Expand Down
61 changes: 59 additions & 2 deletions chunker/splitting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chunk

import (
"bytes"
"errors"
"io"
"testing"

Expand Down Expand Up @@ -33,7 +34,7 @@ func TestSizeSplitterOverAllocate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if cap(chunk) > len(chunk) {
if cap(chunk)-len(chunk) > maxOverAllocBytes {
t.Fatal("chunk capacity too large")
}
}
Expand Down Expand Up @@ -89,7 +90,6 @@ func TestSizeSplitterFillsChunks(t *testing.T) {
sofar := 0
whole := make([]byte, max)
for chunk := range c {

bc := b[sofar : sofar+len(chunk)]
if !bytes.Equal(bc, chunk) {
t.Fatalf("chunk not correct: (sofar: %d) %d != %d, %v != %v", sofar, len(bc), len(chunk), bc[:100], chunk[:100])
Expand Down Expand Up @@ -127,3 +127,60 @@ func BenchmarkDefault(b *testing.B) {
return DefaultSplitter(r)
})
}

// BenchmarkFilesAllocPool benchmarks splitter that uses go-buffer-pool,
// simulating use in unixfs with many small files.
func BenchmarkFilesAllocPool(b *testing.B) {
const fileBlockSize = 4096

benchmarkFilesAlloc(b, func(r io.Reader) Splitter {
return NewSizeSplitter(r, fileBlockSize)
})
}

// BenchmarkFilesAllocNoPool benchmarks a splitter that does not use
// go-buffer-pool, simulating use in unixfs with many small files. It serves
// as the baseline for comparison against BenchmarkFilesAllocPool.
func BenchmarkFilesAllocNoPool(b *testing.B) {
	const fileBlockSize = 4096

	benchmarkFilesAlloc(b, func(r io.Reader) Splitter {
		return &sizeSplitterNoPool{
			r:    r,
			size: uint32(fileBlockSize),
		}
	})
}

// sizeSplitterNoPool implements Splitter that allocates without pool. Provided
// for benchmarking against implementation with pool.
type sizeSplitterNoPool struct {
	r    io.Reader // source of the bytes to be chunked
	size uint32    // target chunk size in bytes
	err  error     // sticky error returned by subsequent NextBytes calls
}

// NextBytes reads the next chunk of up to ss.size bytes from the underlying
// reader, allocating a fresh buffer for every chunk. When the reader is
// exhausted mid-chunk, the final short chunk is returned with a nil error
// and later calls return io.EOF.
func (ss *sizeSplitterNoPool) NextBytes() ([]byte, error) {
	if ss.err != nil {
		return nil, ss.err
	}

	buf := make([]byte, ss.size)
	n, err := io.ReadFull(ss.r, buf)
	switch {
	case err == nil:
		return buf, nil
	case errors.Is(err, io.ErrUnexpectedEOF):
		// Short final read: remember EOF for the next call and hand back
		// only the bytes actually read.
		ss.err = io.EOF
		if n == 0 {
			return nil, nil
		}
		chunk := make([]byte, n)
		copy(chunk, buf)
		return chunk, nil
	default:
		return nil, err
	}
}

// Reader returns the io.Reader associated to this Splitter.
func (ss *sizeSplitterNoPool) Reader() io.Reader {
	return ss.r
}

0 comments on commit 3cd3857

Please sign in to comment.