Skip to content

Commit 68ece42

Browse files
committed
clean up implementation of WriteFromFileSystem
- no need to wait for the cache write
- one cache block write failure does not cancel all writes
- wrap the reader so we can use io.CopyN instead of writing our own copy loop
1 parent c87f7d4 commit 68ece42

File tree

2 files changed

+81
-71
lines changed

2 files changed

+81
-71
lines changed

packages/shared/pkg/storage/providers/storage_cache.go

Lines changed: 57 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ import (
88
"io"
99
"os"
1010
"path/filepath"
11+
"sync"
1112
"time"
1213

1314
"github.com/google/uuid"
1415
"go.opentelemetry.io/otel"
1516
"go.opentelemetry.io/otel/attribute"
1617
"go.opentelemetry.io/otel/trace"
1718
"go.uber.org/zap"
18-
"golang.org/x/sync/errgroup"
1919

2020
"github.com/e2b-dev/infra/packages/shared/pkg/env"
2121
"github.com/e2b-dev/infra/packages/shared/pkg/storage"
@@ -25,8 +25,9 @@ import (
2525
var tracer = otel.Tracer("github.com/e2b-dev/infra/packages/shared/pkg/storage/providers")
2626

2727
const (
	// cacheFilePermissions is the mode for cached chunk files
	// (owner read/write only).
	cacheFilePermissions = 0o600
	// cacheDirPermissions is the mode for cache directories
	// (owner access only).
	cacheDirPermissions = 0o700
	// maxCacheWriterConcurrency bounds how many cache chunk writes
	// may run concurrently per file.
	maxCacheWriterConcurrency = 10
)
3132

3233
var cacheRootPath = env.GetEnv("SHARED_CHUNK_CACHE_PATH", "")
@@ -185,17 +186,13 @@ func (c *CachedFileObjectProvider) WriteFromFileSystem(ctx context.Context, path
185186
// write the file to the disk and the remote system at the same time.
186187
// this opens the file twice, but the API makes it difficult to use a MultiWriter
187188

188-
var eg errgroup.Group
189+
go c.createCacheBlocksFromFile(ctx, path)
189190

190-
eg.Go(func() error {
191-
return c.createCacheBlocksFromFile(ctx, path)
192-
})
193-
194-
eg.Go(func() error {
195-
return c.inner.WriteFromFileSystem(ctx, path)
196-
})
191+
if err := c.inner.WriteFromFileSystem(ctx, path); err != nil {
192+
return fmt.Errorf("failed to write to remote storage: %w", err)
193+
}
197194

198-
return eg.Wait()
195+
return nil
199196
}
200197

201198
func (c *CachedFileObjectProvider) Write(ctx context.Context, src []byte) (int, error) {
@@ -332,46 +329,68 @@ func (c *CachedFileObjectProvider) Delete(ctx context.Context) error {
332329
return c.inner.Delete(ctx)
333330
}
334331

335-
func (c *CachedFileObjectProvider) createCacheBlocksFromFile(ctx context.Context, inputPath string) error {
336-
var err error
332+
func (c *CachedFileObjectProvider) createCacheBlocksFromFile(ctx context.Context, inputPath string) {
337333
ctx, span := tracer.Start(ctx, "CachedFileObjectProvider.createCacheBlocksFromFile")
338334
defer span.End()
339335

340336
input, err := os.Open(inputPath)
341337
if err != nil {
342-
return fmt.Errorf("failed to open file: %w", err)
338+
zap.L().Error("failed to open input file", zap.String("path", inputPath), zap.Error(err))
339+
return
343340
}
344341
defer cleanup("failed to close file", input)
345342

346343
stat, err := input.Stat()
347344
if err != nil {
348-
return fmt.Errorf("failed to get file size: %w", err)
345+
zap.L().Error("failed to stat input file", zap.String("path", inputPath), zap.Error(err))
346+
return
349347
}
350348

351349
totalSize := stat.Size()
352-
errs, ctx := errgroup.WithContext(ctx)
353-
errs.SetLimit(10)
350+
var wg sync.WaitGroup
351+
workers := make(chan struct{}, maxCacheWriterConcurrency)
354352
for offset := int64(0); offset < totalSize; offset += c.chunkSize {
355-
func(offset, totalSize int64) {
356-
errs.Go(func() error {
357-
if fname, err := c.writeChunkFromFile(ctx, offset, totalSize, input); err != nil {
358-
safelyRemoveFile(fname)
359-
return err
360-
}
361-
return nil
362-
})
363-
}(offset, totalSize)
353+
wg.Add(1)
354+
go func(offset int64) {
355+
defer wg.Done()
356+
357+
// limit concurrency
358+
workers <- struct{}{}
359+
defer func() { <-workers }()
360+
361+
if err := c.writeChunkFromFile(ctx, offset, input); err != nil {
362+
zap.L().Error("failed to write chunk file",
363+
zap.String("path", inputPath),
364+
zap.Int64("offset", offset),
365+
zap.Error(err))
366+
}
367+
}(offset)
364368
}
365-
return errs.Wait()
369+
wg.Wait()
370+
}
371+
372+
type offsetReader struct {
373+
wrapped io.ReaderAt
374+
offset int64
375+
}
376+
377+
var _ io.Reader = (*offsetReader)(nil)
378+
379+
func (r *offsetReader) Read(p []byte) (n int, err error) {
380+
n, err = r.wrapped.ReadAt(p, r.offset)
381+
r.offset += int64(n)
382+
return
383+
}
384+
385+
func newOffsetReader(file *os.File, offset int64) *offsetReader {
386+
return &offsetReader{file, offset}
366387
}
367388

368389
// writeChunkFromFile writes a piece of a local file. It does not need to worry about race conditions, as it will only
369390
// be called when building templates, and templates cannot be built on multiple machines at the same time.x
370-
func (c *CachedFileObjectProvider) writeChunkFromFile(ctx context.Context, offset int64, totalSize int64, input *os.File) (string, error) {
371-
var err error
372-
ctx, span := tracer.Start(ctx, "CachedFileObjectProvider.writeChunkFromFile", trace.WithAttributes(
391+
func (c *CachedFileObjectProvider) writeChunkFromFile(ctx context.Context, offset int64, input *os.File) error {
392+
_, span := tracer.Start(ctx, "write chunk-from-file", trace.WithAttributes(
373393
attribute.Int64("offset", offset),
374-
attribute.Int64("total_size", totalSize),
375394
))
376395
defer span.End()
377396

@@ -380,40 +399,17 @@ func (c *CachedFileObjectProvider) writeChunkFromFile(ctx context.Context, offse
380399

381400
output, err := os.OpenFile(chunkPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, cacheFilePermissions)
382401
if err != nil {
383-
return chunkPath, fmt.Errorf("failed to open file %s: %w", chunkPath, err)
402+
return fmt.Errorf("failed to open file %s: %w", chunkPath, err)
384403
}
385404
defer cleanup("failed to close file", output)
386405

387-
expectedRead := min(c.chunkSize, totalSize-offset)
388-
totalRead := int64(0)
389-
buffer := make([]byte, min(32*1024, expectedRead))
390-
for totalRead < expectedRead {
391-
select {
392-
case <-ctx.Done():
393-
return chunkPath, ctx.Err()
394-
default:
395-
}
396-
397-
read, err := input.ReadAt(buffer, offset+totalRead)
398-
if ignoreEOF(err) != nil {
399-
return chunkPath, fmt.Errorf("failed to write to %q [%d bytes @ %d]: %w",
400-
chunkPath, c.chunkSize, offset, err)
401-
} else if read == 0 {
402-
return chunkPath, fmt.Errorf("empty read at %d+%d", offset, totalRead)
403-
}
404-
405-
if _, err = output.Write(buffer[:read]); err != nil {
406-
return chunkPath, fmt.Errorf("failed to write to %q [%d bytes @ %d]: %w",
407-
chunkPath, c.chunkSize, offset, err)
408-
}
409-
totalRead += int64(read)
410-
411-
if errors.Is(err, io.EOF) {
412-
return chunkPath, nil
413-
}
406+
offsetReader := newOffsetReader(input, offset)
407+
if _, err := io.CopyN(output, offsetReader, c.chunkSize); ignoreEOF(err) != nil {
408+
safelyRemoveFile(chunkPath)
409+
return fmt.Errorf("failed to copy chunk: %w", err)
414410
}
415411

416-
return chunkPath, nil
412+
return err // in case err == io.EOF
417413
}
418414

419415
func ignoreFileMissingError(err error) error {

packages/shared/pkg/storage/providers/storage_cache_test.go

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,11 @@ func TestCachedFileObjectProvider_ReadAt(t *testing.T) {
164164
})
165165

166166
t.Run("WriteFromFileSystem should write to cache", func(t *testing.T) {
167-
fakeData := generateBytes(t, 11*1024*1024) // 11 MB
167+
const megabyte = 1024 * 1024
168+
const fileSize = 11 * megabyte
169+
const chunkSize = 2 * megabyte
170+
171+
fakeData := generateBytes(t, fileSize)
168172

169173
fakeStorageObjectProvider := storagemocks.NewMockStorageObjectProvider(t)
170174
fakeStorageObjectProvider.
@@ -175,7 +179,7 @@ func TestCachedFileObjectProvider_ReadAt(t *testing.T) {
175179
tempDir := t.TempDir()
176180
c := CachedFileObjectProvider{
177181
path: tempDir,
178-
chunkSize: 2 * 1024 * 1024, // 2 MB
182+
chunkSize: chunkSize,
179183
inner: fakeStorageObjectProvider,
180184
}
181185

@@ -188,22 +192,32 @@ func TestCachedFileObjectProvider_ReadAt(t *testing.T) {
188192
err = c.WriteFromFileSystem(t.Context(), inputFile)
189193
require.NoError(t, err)
190194

195+
time.Sleep(time.Millisecond * 20)
196+
191197
// ensure remote is not called
192198
c.inner = nil
193199

194200
// read bytes 4-6 MB
195-
buffer := make([]byte, 2*1024*1024) // 2 MB buffer
196-
read, err := c.ReadAt(t.Context(), buffer, 4*1024*1024) // read 4-6 MB
201+
buffer := make([]byte, chunkSize)
202+
read, err := c.ReadAt(t.Context(), buffer, 4*megabyte) // read 4-6 MB
197203
require.NoError(t, err)
198-
assert.Equal(t, fakeData[4*1024*1024:6*1024*1024], buffer)
204+
assert.Equal(t, fakeData[4*megabyte:6*megabyte], buffer)
199205
assert.Equal(t, len(buffer), read)
200206

201207
// read bytes 10-11 MB
202-
buffer = make([]byte, 1*1024*1024) // 2 MB buffer
203-
read, err = c.ReadAt(t.Context(), buffer, 10*1024*1024) // read 4-6 MB
204-
require.NoError(t, err)
205-
assert.Equal(t, fakeData[10*1024*1024:], buffer)
206-
assert.Equal(t, len(buffer), read)
208+
buffer = make([]byte, chunkSize)
209+
read, err = c.ReadAt(t.Context(), buffer, 10*megabyte) // read 10-11 MB
210+
require.ErrorIs(t, err, io.EOF)
211+
assert.Equal(t, megabyte, read) // short read
212+
assert.Equal(t, fakeData[10*megabyte:], buffer[:read])
213+
214+
// verify all chunk files are len(file) == chunkSize
215+
for offset := int64(0); offset < fileSize; offset += chunkSize {
216+
fname := c.makeChunkFilename(offset)
217+
info, err := os.Stat(fname)
218+
require.NoError(t, err)
219+
assert.Equal(t, min(chunkSize, fileSize-offset), info.Size())
220+
}
207221
})
208222

209223
t.Run("ReadFrom should read from cache", func(t *testing.T) {

0 commit comments

Comments
 (0)