Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion packages/orchestrator/internal/template/build/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,32 @@ func (b *Builder) Build(ctx context.Context, template storage.TemplateFiles, cfg
return runBuild(ctx, logger, buildContext, b)
}

func (b *Builder) useNFSCache(ctx context.Context) (string, bool) {
if b.config.SharedChunkCacheDir == "" {
// can't enable cache if we don't have a cache path
return "", false
}

flag, err := b.featureFlags.BoolFlag(ctx, featureflags.BuildingFeatureFlagName)
if err != nil {
zap.L().Error("failed to get nfs cache feature flag", zap.Error(err))
}

return b.config.SharedChunkCacheDir, flag
}

func runBuild(
ctx context.Context,
userLogger *zap.Logger,
bc buildcontext.BuildContext,
builder *Builder,
) (*Result, error) {
index := cache.NewHashIndex(bc.CacheScope, builder.buildStorage, builder.templateStorage)
templateStorage := builder.templateStorage
if path, ok := builder.useNFSCache(ctx); ok {
templateStorage = storage.NewCachedProvider(path, templateStorage)
}

index := cache.NewHashIndex(bc.CacheScope, builder.buildStorage, templateStorage)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: NFS cache wrapping not applied to storage operations

The templateStorage variable is wrapped with NewCachedProvider at lines 229-231 to enable NFS caching, but the unwrapped builder.templateStorage is then passed to layerExecutor (line 241), baseBuilder (line 251), and postProcessingBuilder (line 293). Additionally, the unwrapped builder.templateStorage is used at line 327 for getRootfsSize. This means the cache wrapping is never actually used for any storage operations, defeating the entire purpose of the caching feature introduced in this PR.

Fix in Cursor Fix in Web


layerExecutor := layer.NewLayerExecutor(
bc,
Expand Down
1 change: 1 addition & 0 deletions packages/shared/pkg/feature-flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var (
MetricsReadFlagName = newBoolFlag("sandbox-metrics-read", env.IsDevelopment())
SnapshotFeatureFlagName = newBoolFlag("use-nfs-for-snapshots", env.IsDevelopment())
TemplateFeatureFlagName = newBoolFlag("use-nfs-for-templates", env.IsDevelopment())
BuildingFeatureFlagName = newBoolFlag("use-nfs-for-building-templates", env.IsDevelopment())
BestOfKPlacementAlgorithm = newBoolFlag("best-of-k-placement-algorithm", env.IsDevelopment())
BestOfKCanFit = newBoolFlag("best-of-k-can-fit", true)
BestOfKTooManyStarting = newBoolFlag("best-of-k-too-many-starting", false)
Expand Down
98 changes: 98 additions & 0 deletions packages/shared/pkg/storage/lock/atomic_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package lock

import (
"errors"
"fmt"
"io"
"os"
"sync"

"github.com/google/uuid"
"go.uber.org/zap"
)

type AtomicFile struct {
lockFile *os.File
tempFile *os.File
filename string

closeOnce sync.Once
}

func (f *AtomicFile) Write(p []byte) (n int, err error) {
return f.tempFile.Write(p)
}

var _ io.Writer = (*AtomicFile)(nil)

func OpenFile(filename string) (*AtomicFile, error) {
lockFile, err := TryAcquireLock(filename)
if err != nil {
return nil, err
}

tempFilename := fmt.Sprintf("%s.temp.%s", filename, uuid.NewString())
tempFile, err := os.OpenFile(tempFilename, os.O_WRONLY|os.O_CREATE, 0o600)
if err != nil {
cleanup("failed to close lock file", lockFile.Close)

return nil, fmt.Errorf("failed to open temp file: %w", err)
}

return &AtomicFile{
lockFile: lockFile,
tempFile: tempFile,
filename: filename,
}, nil
}

func (f *AtomicFile) Close() error {
var err error

f.closeOnce.Do(func() {
defer cleanup("failed to unlock file", func() error {
return ReleaseLock(f.lockFile)
})

if err = f.tempFile.Close(); err != nil {
err = fmt.Errorf("failed to close temp file: %w", err)

return
}

if err = moveWithoutReplace(f.tempFile.Name(), f.filename); err != nil {
err = fmt.Errorf("failed to commit file: %w", err)

return
}
})

return err
}

func cleanup(msg string, fn func() error) {
if err := fn(); err != nil {
zap.L().Warn(msg, zap.Error(err))
}
}

// moveWithoutReplace tries to rename a file but will not replace the target if it already exists.
// If the file already exists, the file will be deleted.
func moveWithoutReplace(oldPath, newPath string) error {
defer func() {
if err := os.Remove(oldPath); err != nil {
zap.L().Warn("failed to remove existing file", zap.Error(err))
}
}()

if err := os.Link(oldPath, newPath); err != nil {
if errors.Is(err, os.ErrExist) {
// Someone else created newPath first. Treat as success.
return nil
}

return err
}

return nil
}
74 changes: 74 additions & 0 deletions packages/shared/pkg/storage/lock/atomic_file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package lock

import (
"io/fs"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestOpenFile(t *testing.T) {
t.Run("happy path", func(t *testing.T) {
expected := []byte("hello")

tempDir := t.TempDir()
filename := filepath.Join(tempDir, "test.bin")

f, err := OpenFile(filename)
require.NoError(t, err)
require.NotNil(t, f)

count, err := f.Write(expected)
require.NoError(t, err)
assert.Equal(t, len(expected), count)

_, err = os.Stat("test.bin")
require.Error(t, err)
assert.True(t, os.IsNotExist(err))

err = f.Close()
require.NoError(t, err)

data, err := os.ReadFile(filename)
require.NoError(t, err)
assert.Equal(t, expected, data)
})

t.Run("two files cannot be opened at the same time", func(t *testing.T) {
tempDir := t.TempDir()
filename := filepath.Join(tempDir, "test.bin")

f1, err := OpenFile(filename)
require.NoError(t, err)
t.Cleanup(func() {
err := f1.Close()
assert.NoError(t, err)
})

f2, err := OpenFile(filename)
require.ErrorIs(t, err, ErrLockAlreadyHeld)
assert.Nil(t, f2)

err = f1.Close()
require.NoError(t, err)

f2, err = OpenFile(filename)
require.NoError(t, err)
t.Cleanup(func() {
err := f2.Close()
assert.NoError(t, err)
})
})

t.Run("missing directory returns error", func(t *testing.T) {
tempDir := t.TempDir()
filename := filepath.Join(tempDir, "a", "b", "test.bin")

_, err := OpenFile(filename)
require.Error(t, err)
assert.ErrorIs(t, err, fs.ErrNotExist)
})
}
8 changes: 6 additions & 2 deletions packages/shared/pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,18 @@ type WriterToCtx interface {
WriteTo(ctx context.Context, w io.Writer) (n int64, err error)
}

type WriteFromFSCtx interface {
WriteFromFileSystem(ctx context.Context, path string) error
}

type ReaderAtCtx interface {
ReadAt(ctx context.Context, p []byte, off int64) (n int, err error)
}

type ObjectProvider interface {
// write
WriterCtx
WriteFromFileSystem(ctx context.Context, path string) error
WriteFromFSCtx

// read
WriterToCtx
Expand All @@ -90,7 +94,7 @@ type ObjectProvider interface {

type SeekableObjectProvider interface {
// write
WriteFromFileSystem(ctx context.Context, path string) error
WriteFromFSCtx

// read
ReaderAtCtx
Expand Down
28 changes: 23 additions & 5 deletions packages/shared/pkg/storage/storage_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (
"path/filepath"
"time"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
Expand All @@ -34,10 +35,6 @@ var (
"Total bytes written to the cache",
"Total writes to the cache",
))
cacheHits = utils.Must(meter.Int64Counter("orchestrator.storage.cache.hits",
metric.WithDescription("total cache hits")))
cacheMisses = utils.Must(meter.Int64Counter("orchestrator.storage.cache.misses",
metric.WithDescription("total cache misses")))
)

type CachedProvider struct {
Expand All @@ -53,6 +50,8 @@ func NewCachedProvider(rootPath string, inner StorageProvider) *CachedProvider {
}

func (c CachedProvider) DeleteObjectsWithPrefix(ctx context.Context, prefix string) error {
go c.deleteObjectsWithPrefix(prefix)

return c.inner.DeleteObjectsWithPrefix(ctx, prefix)
}

Expand Down Expand Up @@ -93,6 +92,16 @@ func (c CachedProvider) GetDetails() string {
c.rootPath, c.inner.GetDetails())
}

func (c CachedProvider) deleteObjectsWithPrefix(prefix string) {
fullPrefix := filepath.Join(c.rootPath, prefix)
if err := os.RemoveAll(fullPrefix); err != nil {
zap.L().Error("failed to remove object with prefix",
zap.String("prefix", prefix),
zap.String("path", fullPrefix),
zap.Error(err))
}
}

func cleanup(msg string, fn func() error) {
if err := fn(); err != nil {
zap.L().Warn(msg, zap.Error(err))
Expand Down Expand Up @@ -127,3 +136,12 @@ func moveWithoutReplace(oldPath, newPath string) error {

return nil
}

func recordError(span trace.Span, err error) {
if err == nil {
return
}

span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
58 changes: 58 additions & 0 deletions packages/shared/pkg/storage/storage_cache_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package storage

import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/e2b-dev/infra/packages/shared/pkg/utils"
)

var (
cacheOpCounter = utils.Must(meter.Int64Counter("orchestrator.storage.cache.ops",
metric.WithDescription("total cache operations")))
cacheBytesCounter = utils.Must(meter.Int64Counter("orchestrator.storage.cache.bytes",
metric.WithDescription("total cache bytes processed"),
metric.WithUnit("byte")))
)

type cacheOp string

const (
cacheOpWriteTo cacheOp = "write_to"
cacheOpReadAt cacheOp = "read_at"
cacheOpSize cacheOp = "size"

cacheOpWrite cacheOp = "write"
cacheOpWriteFromFileSystem cacheOp = "write_from_filesystem"
)

func recordCacheRead(ctx context.Context, isHit bool, bytesRead int64, op cacheOp) {
cacheOpCounter.Add(ctx, 1, metric.WithAttributes(
attribute.Bool("cache_hit", isHit),
attribute.String("operation", string(op)),
))

cacheBytesCounter.Add(ctx, bytesRead, metric.WithAttributes(
attribute.Bool("cache_hit", isHit),
attribute.String("operation", string(op)),
))
}

func recordCacheWrite(ctx context.Context, bytesWritten int64, op cacheOp) {
cacheOpCounter.Add(ctx, 1, metric.WithAttributes(
attribute.String("operation", string(op)),
))

cacheBytesCounter.Add(ctx, bytesWritten, metric.WithAttributes(
attribute.String("operation", string(op)),
))
}

func recordCacheError[T ~string](ctx context.Context, op T, err error) {
cacheOpCounter.Add(ctx, 1, metric.WithAttributes(
attribute.String("error", err.Error()),
attribute.String("operation", string(op)),
))
}
Loading
Loading