
Commit e54621e

increase download concurrency (#53)
* increase download concurrency
* cleanup
* rename

Co-authored-by: Luke Lombardi <[email protected]>
1 parent: 331af3b

8 files changed: +578 −467 lines

e2e/basic/main.go

Lines changed: 1 addition & 1 deletion
@@ -71,7 +71,7 @@ func main() {
 	log.Printf("Stored content with hash: %v", hash)
 	log.Printf("Computed hash: %v", computedHash)
 
-	exists, err := client.IsCachedNearby(hash)
+	exists, err := client.IsCachedNearby(hash, hash)
 	if err != nil {
 		log.Fatalf("Unable to check if content is cached nearby: %v", err)
 	}

pkg/config.default.yaml

Lines changed: 2 additions & 0 deletions
@@ -8,6 +8,8 @@ server:
   pageSizeBytes: 4000000 # 4MB
   maxCachePct: 60
   options: []
+  downloadConcurrency: 16
+  downloadChunkSize: 64000000 # 64MB
   metadata:
     redisAddr:
     redisPasswd:
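
With these defaults, the chunk count for an object follows from ceiling division: a 1 GB object at 64 MB per chunk splits into 16 chunks, exactly one wave at downloadConcurrency: 16. A minimal sketch of that arithmetic (the standalone numChunks helper is hypothetical; the formula matches the one used in DownloadIntoBuffer below):

package main

import "fmt"

// numChunks mirrors the ceiling division used in DownloadIntoBuffer:
// (size + chunkSize - 1) / chunkSize.
func numChunks(size, chunkSize int64) int64 {
	return (size + chunkSize - 1) / chunkSize
}

func main() {
	const chunkSize = 64_000_000 // downloadChunkSize default (64MB)
	// A 1 GB object yields 16 chunks, so the default concurrency of 16
	// can fetch every range in a single concurrent wave.
	fmt.Println(numChunks(1_000_000_000, chunkSize)) // 16
}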

pkg/s3_client.go

Lines changed: 81 additions & 34 deletions
@@ -5,20 +5,19 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"sync"
 
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/config"
 	"github.com/aws/aws-sdk-go-v2/credentials"
 	"github.com/aws/aws-sdk-go-v2/service/s3"
 )
 
-const (
-	downloadChunkSize = 64 * 1024 * 1024 // 64MB
-)
-
 type S3Client struct {
-	Client *s3.Client
-	Source S3SourceConfig
+	Client              *s3.Client
+	Source              S3SourceConfig
+	DownloadConcurrency int64
+	DownloadChunkSize   int64
 }
 
 type S3SourceConfig struct {
@@ -29,7 +28,7 @@ type S3SourceConfig struct {
 	SecretKey string
 }
 
-func NewS3Client(ctx context.Context, sourceConfig S3SourceConfig) (*S3Client, error) {
+func NewS3Client(ctx context.Context, sourceConfig S3SourceConfig, serverConfig BlobCacheServerConfig) (*S3Client, error) {
 	cfg, err := config.LoadDefaultConfig(ctx,
 		config.WithRegion(sourceConfig.Region),
 		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
@@ -48,8 +47,10 @@ func NewS3Client(ctx context.Context, sourceConfig S3SourceConfig) (*S3Client, e
 
 	s3Client := s3.NewFromConfig(cfg)
 	return &S3Client{
-		Client: s3Client,
-		Source: sourceConfig,
+		Client:              s3Client,
+		Source:              sourceConfig,
+		DownloadConcurrency: serverConfig.S3DownloadConcurrency,
+		DownloadChunkSize:   serverConfig.S3DownloadChunkSize,
 	}, nil
 }
 
@@ -79,33 +80,79 @@ func (c *S3Client) DownloadIntoBuffer(ctx context.Context, key string, buffer *b
 		return err
 	}
 	size := aws.ToInt64(head.ContentLength)
+	if size <= 0 {
+		return fmt.Errorf("invalid object size: %d", size)
+	}
+
+	numChunks := int((size + c.DownloadChunkSize - 1) / c.DownloadChunkSize)
+	chunks := make([][]byte, numChunks)
+	sem := make(chan struct{}, c.DownloadConcurrency)
+	var wg sync.WaitGroup
+
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	errCh := make(chan error, 1)
+
+	for i := 0; i < numChunks; i++ {
+		wg.Add(1)
+
+		go func(i int) {
+			sem <- struct{}{}
+			defer func() { <-sem }()
+			defer wg.Done()
+
+			if ctx.Err() != nil {
+				return
+			}
+
+			start := int64(i) * c.DownloadChunkSize
+			end := start + c.DownloadChunkSize - 1
+			if end >= size {
+				end = size - 1
+			}
+
+			rangeHeader := fmt.Sprintf("bytes=%d-%d", start, end)
+			resp, err := c.Client.GetObject(ctx, &s3.GetObjectInput{
+				Bucket: aws.String(c.Source.BucketName),
+				Key:    aws.String(key),
+				Range:  &rangeHeader,
+			})
+			if err != nil {
+				select {
+				case errCh <- fmt.Errorf("range request failed for %s: %w", rangeHeader, err):
+					cancel()
+				default:
+				}
+				return
+			}
+			defer resp.Body.Close()
+
+			part := make([]byte, end-start+1)
+			n, err := io.ReadFull(resp.Body, part)
+			if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
+				select {
+				case errCh <- fmt.Errorf("error reading range %s: %w", rangeHeader, err):
+					cancel()
+				default:
+				}
+				return
+			}
+
+			chunks[i] = part[:n]
+		}(i)
	}
+
+	wg.Wait()
+	close(errCh)
+
+	if err, ok := <-errCh; ok {
+		return err
+	}
 
 	buffer.Reset()
-	var start int64 = 0
-
-	for start < size {
-		end := start + downloadChunkSize - 1
-		if end >= size {
-			end = size - 1
-		}
-
-		rangeHeader := fmt.Sprintf("bytes=%d-%d", start, end)
-		resp, err := c.Client.GetObject(ctx, &s3.GetObjectInput{
-			Bucket: aws.String(c.Source.BucketName),
-			Key:    aws.String(key),
-			Range:  &rangeHeader,
-		})
-		if err != nil {
-			return err
-		}
-
-		n, err := io.Copy(buffer, resp.Body)
-		resp.Body.Close()
-		if err != nil {
-			return err
-		}
-
-		start += n
+	for _, chunk := range chunks {
+		buffer.Write(chunk)
 	}
 
 	return nil
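
For context, a caller-side sketch of the new signature: NewS3Client now threads the server config through so the concurrency and chunk-size knobs reach the downloader. This is a hypothetical usage example, assuming it lives in the same package as s3_client.go, that the truncated buffer parameter above is a *bytes.Buffer, and with placeholder bucket, region, key, and credential values (fetchObject itself is not part of this commit):

package blobcache // assumed package name for pkg/

import (
	"bytes"
	"context"
)

// fetchObject sketches the new call path: config knobs in, whole object out.
func fetchObject(ctx context.Context) (*bytes.Buffer, error) {
	client, err := NewS3Client(ctx, S3SourceConfig{
		BucketName: "example-bucket", // placeholder
		Region:     "us-east-1",      // placeholder
		AccessKey:  "EXAMPLE_ACCESS", // placeholder
		SecretKey:  "EXAMPLE_SECRET", // placeholder
	}, BlobCacheServerConfig{
		S3DownloadConcurrency: 16,         // parallel ranged GetObject calls
		S3DownloadChunkSize:   64_000_000, // 64MB per range
	})
	if err != nil {
		return nil, err
	}

	var buf bytes.Buffer
	if err := client.DownloadIntoBuffer(ctx, "some/object/key", &buf); err != nil {
		return nil, err
	}
	return &buf, nil
}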

pkg/server.go

Lines changed: 1 addition & 1 deletion
@@ -471,7 +471,7 @@ func (cs *CacheService) cacheSourceFromS3(source *proto.CacheSource, buffer *byt
 		EndpointURL: source.EndpointUrl,
 		AccessKey:   source.AccessKey,
 		SecretKey:   source.SecretKey,
-	})
+	}, cs.serverConfig)
 	if err != nil {
 		return err
 	}

pkg/types.go

Lines changed: 34 additions & 28 deletions
@@ -54,32 +54,36 @@ const (
 )
 
 type BlobCacheServerConfig struct {
-	Mode                 BlobCacheServerMode `key:"mode" json:"mode"`
-	DiskCacheDir         string              `key:"diskCacheDir" json:"disk_cache_dir"`
-	DiskCacheMaxUsagePct float64             `key:"diskCacheMaxUsagePct" json:"disk_cache_max_usage_pct"`
-	Token                string              `key:"token" json:"token"`
-	PrettyLogs           bool                `key:"prettyLogs" json:"pretty_logs"`
-	ObjectTtlS           int                 `key:"objectTtlS" json:"object_ttl_s"`
-	MaxCachePct          int64               `key:"maxCachePct" json:"max_cache_pct"`
-	PageSizeBytes        int64               `key:"pageSizeBytes" json:"page_size_bytes"`
-	Metadata             MetadataConfig      `key:"metadata" json:"metadata"`
-	Sources              []SourceConfig      `key:"sources" json:"sources"`
+	Mode                  BlobCacheServerMode `key:"mode" json:"mode"`
+	DiskCacheDir          string              `key:"diskCacheDir" json:"disk_cache_dir"`
+	DiskCacheMaxUsagePct  float64             `key:"diskCacheMaxUsagePct" json:"disk_cache_max_usage_pct"`
+	Token                 string              `key:"token" json:"token"`
+	PrettyLogs            bool                `key:"prettyLogs" json:"pretty_logs"`
+	ObjectTtlS            int                 `key:"objectTtlS" json:"object_ttl_s"`
+	MaxCachePct           int64               `key:"maxCachePct" json:"max_cache_pct"`
+	PageSizeBytes         int64               `key:"pageSizeBytes" json:"page_size_bytes"`
+	Metadata              MetadataConfig      `key:"metadata" json:"metadata"`
+	Sources               []SourceConfig      `key:"sources" json:"sources"`
+	S3DownloadConcurrency int64               `key:"s3DownloadConcurrency" json:"s3_download_concurrency"`
+	S3DownloadChunkSize   int64               `key:"s3DownloadChunkSize" json:"s3_download_chunk_size"`
 
 	// Allows a coordinator to override a slave server's config for a specific locality/region
 	Regions map[string]RegionConfig `key:"regions" json:"regions"`
 }
 
 func (c *BlobCacheServerConfig) ToProto() *proto.BlobCacheServerConfig {
 	protoConfig := &proto.BlobCacheServerConfig{
-		Mode:                 string(c.Mode),
-		DiskCacheDir:         c.DiskCacheDir,
-		DiskCacheMaxUsagePct: float32(c.DiskCacheMaxUsagePct),
-		MaxCachePct:          c.MaxCachePct,
-		PageSizeBytes:        c.PageSizeBytes,
-		ObjectTtlS:           int64(c.ObjectTtlS),
-		PrettyLogs:           c.PrettyLogs,
-		Token:                c.Token,
-		Sources:              make([]*proto.SourceConfig, 0),
+		Mode:                  string(c.Mode),
+		DiskCacheDir:          c.DiskCacheDir,
+		DiskCacheMaxUsagePct:  float32(c.DiskCacheMaxUsagePct),
+		MaxCachePct:           c.MaxCachePct,
+		PageSizeBytes:         c.PageSizeBytes,
+		ObjectTtlS:            int64(c.ObjectTtlS),
+		PrettyLogs:            c.PrettyLogs,
+		Token:                 c.Token,
+		Sources:               make([]*proto.SourceConfig, 0),
+		S3DownloadConcurrency: c.S3DownloadConcurrency,
+		S3DownloadChunkSize:   c.S3DownloadChunkSize,
 	}
 
 	for _, source := range c.Sources {
@@ -118,15 +122,17 @@ func (c *BlobCacheServerConfig) ToProto() *proto.BlobCacheServerConfig {
 
 func BlobCacheServerConfigFromProto(protoConfig *proto.BlobCacheServerConfig) BlobCacheServerConfig {
 	cfg := BlobCacheServerConfig{
-		Mode:                 BlobCacheServerMode(protoConfig.Mode),
-		DiskCacheDir:         protoConfig.DiskCacheDir,
-		DiskCacheMaxUsagePct: float64(protoConfig.DiskCacheMaxUsagePct),
-		MaxCachePct:          protoConfig.MaxCachePct,
-		PageSizeBytes:        protoConfig.PageSizeBytes,
-		ObjectTtlS:           int(protoConfig.ObjectTtlS),
-		PrettyLogs:           protoConfig.PrettyLogs,
-		Token:                protoConfig.Token,
-		Sources:              make([]SourceConfig, len(protoConfig.Sources)),
+		Mode:                  BlobCacheServerMode(protoConfig.Mode),
+		DiskCacheDir:          protoConfig.DiskCacheDir,
+		DiskCacheMaxUsagePct:  float64(protoConfig.DiskCacheMaxUsagePct),
+		MaxCachePct:           protoConfig.MaxCachePct,
+		PageSizeBytes:         protoConfig.PageSizeBytes,
+		ObjectTtlS:            int(protoConfig.ObjectTtlS),
+		PrettyLogs:            protoConfig.PrettyLogs,
+		Token:                 protoConfig.Token,
+		Sources:               make([]SourceConfig, len(protoConfig.Sources)),
+		S3DownloadConcurrency: protoConfig.S3DownloadConcurrency,
+		S3DownloadChunkSize:   protoConfig.S3DownloadChunkSize,
 	}
 
 	for i, protoSource := range protoConfig.Sources {
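
Because these knobs are forwarded from a coordinator to servers via the proto config, a quick round-trip sketch (a hypothetical test, assuming only the package's existing names shown above) confirms the two new fields survive ToProto/BlobCacheServerConfigFromProto:

package blobcache // assumed package name for pkg/

import "testing"

// TestS3DownloadFieldsRoundTrip checks that the new fields survive a
// config -> proto -> config round trip.
func TestS3DownloadFieldsRoundTrip(t *testing.T) {
	in := BlobCacheServerConfig{
		S3DownloadConcurrency: 16,
		S3DownloadChunkSize:   64_000_000,
	}
	out := BlobCacheServerConfigFromProto(in.ToProto())
	if out.S3DownloadConcurrency != in.S3DownloadConcurrency ||
		out.S3DownloadChunkSize != in.S3DownloadChunkSize {
		t.Fatalf("round-trip lost values: got %+v", out)
	}
}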
