Skip to content

Commit 55b6158

Browse files
remove extra copy and modify write buffer size to align with largest fuse read size (#22)
* try to remove extra copy * fix allocation * wip * wip * cleanup * use const * add more buffers * bigger buffer
1 parent 070c7e4 commit 55b6158

File tree

3 files changed

+53
-17
lines changed

3 files changed

+53
-17
lines changed

pkg/client.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ import (
2020
"tailscale.com/client/tailscale"
2121
)
2222

23-
const getContentRequestTimeout = 30 * time.Second
24-
const storeContentRequestTimeout = 300 * time.Second
25-
const closestHostTimeout = 30 * time.Second
26-
const localClientCacheCleanupInterval = 5 * time.Second
27-
const localClientCacheTTL = 300 * time.Second
23+
const (
24+
getContentRequestTimeout = 30 * time.Second
25+
storeContentRequestTimeout = 300 * time.Second
26+
closestHostTimeout = 30 * time.Second
27+
localClientCacheCleanupInterval = 5 * time.Second
28+
localClientCacheTTL = 300 * time.Second
29+
readBufferSizeBytes = 128 * 1024
30+
)
2831

2932
func AuthInterceptor(token string) grpc.UnaryClientInterceptor {
3033
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
@@ -160,6 +163,7 @@ func (c *BlobCacheClient) addHost(host *BlobCacheHost) error {
160163
grpc.MaxCallRecvMsgSize(maxMessageSize),
161164
grpc.MaxCallSendMsgSize(maxMessageSize),
162165
),
166+
grpc.WithReadBufferSize(readBufferSizeBytes),
163167
}
164168

165169
if c.cfg.Token != "" {

pkg/server.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"os/signal"
1313
"path/filepath"
1414
"runtime"
15+
"sync"
1516
"syscall"
1617
"time"
1718

@@ -23,6 +24,12 @@ import (
2324
"google.golang.org/grpc/status"
2425
)
2526

27+
const (
28+
writeBufferSizeBytes = 128 * 1024
29+
getContentBufferPoolSize = 128
30+
getContentBufferSize = 256 * 1024
31+
)
32+
2633
type CacheServiceOpts struct {
2734
Addr string
2835
}
@@ -149,6 +156,7 @@ func (cs *CacheService) StartServer(port uint) error {
149156
s := grpc.NewServer(
150157
grpc.MaxRecvMsgSize(maxMessageSize),
151158
grpc.MaxSendMsgSize(maxMessageSize),
159+
grpc.WriteBufferSize(writeBufferSizeBytes),
152160
)
153161
proto.RegisterBlobCacheServer(s, cs)
154162

@@ -170,15 +178,42 @@ func (cs *CacheService) StartServer(port uint) error {
170178
return nil
171179
}
172180

181+
var getContentBufferPool = sync.Pool{
182+
New: func() interface{} {
183+
b := make([]byte, getContentBufferSize)
184+
return &b
185+
},
186+
}
187+
188+
func init() {
189+
for i := 0; i < getContentBufferPoolSize; i++ {
190+
//lint:ignore SA6002 reason: pre-allocating buffers for performance
191+
getContentBufferPool.Put(make([]byte, getContentBufferSize))
192+
}
193+
}
194+
173195
func (cs *CacheService) GetContent(ctx context.Context, req *proto.GetContentRequest) (*proto.GetContentResponse, error) {
174-
content, err := cs.cas.Get(req.Hash, req.Offset, req.Length)
196+
bufPtr := getContentBufferPool.Get().(*[]byte)
197+
defer getContentBufferPool.Put(bufPtr)
198+
dst := *bufPtr
199+
200+
if cap(dst) < int(req.Length) {
201+
dst = make([]byte, req.Length)
202+
*bufPtr = dst
203+
}
204+
dst = dst[:req.Length]
205+
206+
resp := &proto.GetContentResponse{Content: dst}
207+
err := cs.cas.Get(req.Hash, req.Offset, req.Length, dst)
175208
if err != nil {
176209
Logger.Debugf("Get - [%s] - %v", req.Hash, err)
177210
return &proto.GetContentResponse{Content: nil, Ok: false}, nil
178211
}
179212

180213
Logger.Debugf("Get - [%s] (offset=%d, length=%d)", req.Hash, req.Offset, req.Length)
181-
return &proto.GetContentResponse{Content: content, Ok: true}, nil
214+
215+
resp.Ok = true
216+
return resp, nil
182217
}
183218

184219
func (cs *CacheService) store(ctx context.Context, buffer *bytes.Buffer, sourcePath string, sourceOffset int64) (string, error) {

pkg/storage.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package blobcache
22

33
import (
4-
"bytes"
54
"context"
65
"errors"
76
"fmt"
@@ -136,11 +135,10 @@ func (cas *ContentAddressableStorage) Add(ctx context.Context, hash string, cont
136135
return nil
137136
}
138137

139-
func (cas *ContentAddressableStorage) Get(hash string, offset, length int64) ([]byte, error) {
140-
buffer := bytes.NewBuffer(make([]byte, 0, length))
141-
138+
func (cas *ContentAddressableStorage) Get(hash string, offset, length int64, dst []byte) error {
142139
remainingLength := length
143140
o := offset
141+
dstOffset := int64(0)
144142

145143
cas.cache.ResetTTL(hash, time.Duration(cas.config.ObjectTtlS)*time.Second)
146144

@@ -151,7 +149,7 @@ func (cas *ContentAddressableStorage) Get(hash string, offset, length int64) ([]
151149
// Check cache for chunk
152150
value, found := cas.cache.Get(chunkKey)
153151
if !found {
154-
return nil, ErrContentNotFound
152+
return ErrContentNotFound
155153
}
156154

157155
v := value.(cacheValue)
@@ -167,18 +165,17 @@ func (cas *ContentAddressableStorage) Get(hash string, offset, length int64) ([]
167165
end := start + readLength
168166

169167
if start < 0 || end <= start || end > int64(len(chunkBytes)) {
170-
return nil, fmt.Errorf("invalid chunk boundaries: start %d, end %d, chunk size %d", start, end, len(chunkBytes))
168+
return fmt.Errorf("invalid chunk boundaries: start %d, end %d, chunk size %d", start, end, len(chunkBytes))
171169
}
172170

173-
if _, err := buffer.Write(chunkBytes[start:end]); err != nil {
174-
return nil, fmt.Errorf("failed to write to buffer: %v", err)
175-
}
171+
copy(dst[dstOffset:dstOffset+readLength], chunkBytes[start:end])
176172

177173
remainingLength -= readLength
178174
o += readLength
175+
dstOffset += readLength
179176
}
180177

181-
return buffer.Bytes(), nil
178+
return nil
182179
}
183180

184181
func (cas *ContentAddressableStorage) onEvict(item *ristretto.Item[interface{}]) {

0 commit comments

Comments
 (0)