Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
c8cc3a1
upgrade otel, merge 'require' blocks
djeebus Sep 12, 2025
ca8e323
more tracing
djeebus Sep 11, 2025
d5cdd57
context stuff
djeebus Sep 12, 2025
9d83d0b
this isn't the right place to split traces
djeebus Sep 12, 2025
9670c08
a little more clean up
djeebus Sep 12, 2025
80112c3
trace the envd requests
djeebus Sep 12, 2025
4a711fc
more context passing
djeebus Sep 13, 2025
43131f1
even more context passing!
djeebus Sep 13, 2025
c7e6e41
so much tracing!
djeebus Sep 13, 2025
2e88710
trace the init, health, and metrics requests
djeebus Sep 15, 2025
7de1009
Merge branch 'main' into straighten-out-tracing
djeebus Sep 17, 2025
8215330
Merge branch 'main' into straighten-out-tracing
djeebus Sep 19, 2025
6808ef4
clean up
djeebus Sep 19, 2025
e764a29
Merge branch 'main' into straighten-out-tracing
djeebus Sep 22, 2025
fcd8214
remove some very chatty spans
djeebus Sep 22, 2025
7edb7f2
clean up trace names
djeebus Sep 22, 2025
f0a69c4
Merge remote-tracking branch 'origin/main' into straighten-out-tracing
djeebus Sep 23, 2025
00d052d
Merge branch 'main' into straighten-out-tracing
djeebus Oct 1, 2025
34b978e
Merge remote-tracking branch 'origin/main' into straighten-out-tracing
djeebus Oct 2, 2025
5004763
revert changes to this file
djeebus Oct 2, 2025
41416f2
clean up more tracing
djeebus Oct 2, 2025
283e7d7
add some tracing to the network pool
djeebus Oct 3, 2025
fec88fa
Merge remote-tracking branch 'origin/main' into straighten-out-tracing
djeebus Oct 6, 2025
2dd0a10
lint
djeebus Oct 6, 2025
d836857
Merge remote-tracking branch 'origin/main' into straighten-out-tracing
djeebus Oct 6, 2025
d9b689d
Merge remote-tracking branch 'origin/main' into straighten-out-tracing
djeebus Oct 7, 2025
7d2536b
remove some chatty spans
djeebus Oct 7, 2025
47e3f7e
add some useful attributes
djeebus Oct 7, 2025
a6d3b9d
linting
djeebus Oct 7, 2025
08ea413
put the write command where it should be, add attrs
djeebus Oct 7, 2025
4e8688a
store more trace attributes
djeebus Oct 7, 2025
bd25570
linting
djeebus Oct 7, 2025
945c12b
Merge branch 'main' into straighten-out-tracing
djeebus Oct 7, 2025
88f27d2
Merge remote-tracking branch 'origin/main' into straighten-out-tracing
djeebus Oct 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/orchestrator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ require (
github.com/vishvananda/netlink v1.3.1-0.20240922070040-084abd93d350
github.com/vishvananda/netns v0.0.5
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.63.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0
go.opentelemetry.io/otel v1.38.0
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.14.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.38.0
Expand Down Expand Up @@ -232,6 +234,7 @@ require (
go.opentelemetry.io/auto/sdk v1.2.0 // indirect
go.opentelemetry.io/contrib/bridges/otelzap v0.13.0 // indirect
go.opentelemetry.io/contrib/detectors/gcp v1.38.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.14.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions packages/orchestrator/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion packages/orchestrator/internal/sandbox/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ import (
"io"

"github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.uber.org/zap"

blockmetrics "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/block/metrics"
"github.com/e2b-dev/infra/packages/shared/pkg/storage"
"github.com/e2b-dev/infra/packages/shared/pkg/storage/header"
)

// tracer is the package-level OpenTelemetry tracer for the sandbox/build
// package. It is obtained from the global otel provider under the
// instrumentation name below, which matches this package's import path.
var tracer = otel.Tracer("github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/build")

type File struct {
header *header.Header
store *DiffStore
Expand Down Expand Up @@ -81,7 +84,8 @@ func (b *File) ReadAt(ctx context.Context, p []byte, off int64) (n int, err erro
return 0, fmt.Errorf("failed to get build: %w", err)
}

buildN, err := mappedBuild.ReadAt(ctx,
buildN, err := mappedBuild.ReadAt(
ctx,
p[n:int64(n)+readLength],
mappedOffset,
)
Expand Down
73 changes: 8 additions & 65 deletions packages/orchestrator/internal/sandbox/build/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package build
// causing a race when closing the cancel channel.

import (
"context"
"fmt"
"sync"
"testing"
Expand Down Expand Up @@ -69,16 +68,8 @@ func newDiffWithAsserts(t *testing.T, cachePath, buildId string, diffType DiffTy

func TestNewDiffStore(t *testing.T) {
cachePath := t.TempDir()
ctx, cancel := context.WithCancel(t.Context())
t.Cleanup(cancel)

store, err := NewDiffStore(
ctx,
cachePath,
25*time.Hour,
60*time.Second,
90.0,
)
store, err := NewDiffStore(t.Context(), cachePath, 25*time.Hour, 60*time.Second, 90.0)
t.Cleanup(store.Close)

require.NoError(t, err)
Expand All @@ -87,18 +78,10 @@ func TestNewDiffStore(t *testing.T) {

func TestDiffStoreTTLEviction(t *testing.T) {
cachePath := t.TempDir()
ctx, cancel := context.WithCancel(t.Context())
t.Cleanup(cancel)

ttl := 1 * time.Second
delay := 60 * time.Second
store, err := NewDiffStore(
ctx,
cachePath,
ttl,
delay,
100.0,
)
store, err := NewDiffStore(t.Context(), cachePath, ttl, delay, 100.0)
t.Cleanup(store.Close)
require.NoError(t, err)

Expand All @@ -117,18 +100,10 @@ func TestDiffStoreTTLEviction(t *testing.T) {

func TestDiffStoreRefreshTTLEviction(t *testing.T) {
cachePath := t.TempDir()
ctx, cancel := context.WithCancel(t.Context())
t.Cleanup(cancel)

ttl := 1 * time.Second
delay := 60 * time.Second
store, err := NewDiffStore(
ctx,
cachePath,
ttl,
delay,
100.0,
)
store, err := NewDiffStore(t.Context(), cachePath, ttl, delay, 100.0)
t.Cleanup(store.Close)
require.NoError(t, err)

Expand All @@ -153,18 +128,10 @@ func TestDiffStoreRefreshTTLEviction(t *testing.T) {

func TestDiffStoreDelayEviction(t *testing.T) {
cachePath := t.TempDir()
ctx, cancel := context.WithCancel(t.Context())
t.Cleanup(cancel)

ttl := 60 * time.Second
delay := 4 * time.Second
store, err := NewDiffStore(
ctx,
cachePath,
ttl,
delay,
0.0,
)
store, err := NewDiffStore(t.Context(), cachePath, ttl, delay, 0.0)
t.Cleanup(store.Close)
require.NoError(t, err)

Expand Down Expand Up @@ -194,18 +161,10 @@ func TestDiffStoreDelayEviction(t *testing.T) {

func TestDiffStoreDelayEvictionAbort(t *testing.T) {
cachePath := t.TempDir()
ctx, cancel := context.WithCancel(t.Context())
t.Cleanup(cancel)

ttl := 60 * time.Second
delay := 4 * time.Second
store, err := NewDiffStore(
ctx,
cachePath,
ttl,
delay,
0.0,
)
store, err := NewDiffStore(t.Context(), cachePath, ttl, delay, 0.0)
t.Cleanup(store.Close)
require.NoError(t, err)

Expand Down Expand Up @@ -305,19 +264,11 @@ func TestDiffStoreOldestFromCache(t *testing.T) {
// detector enabled: go test -race
func TestDiffStoreConcurrentEvictionRace(t *testing.T) {
cachePath := t.TempDir()
ctx, cancel := context.WithCancel(t.Context())
t.Cleanup(cancel)

// Use very short TTL and delay to trigger rapid evictions
ttl := 10 * time.Millisecond
delay := 50 * time.Millisecond
store, err := NewDiffStore(
ctx,
cachePath,
ttl,
delay,
0.0, // Set to 0% to trigger disk space evictions
)
store, err := NewDiffStore(t.Context(), cachePath, ttl, delay, 0.0)
t.Cleanup(store.Close)
require.NoError(t, err)

Expand Down Expand Up @@ -369,7 +320,7 @@ func TestDiffStoreConcurrentEvictionRace(t *testing.T) {
go func() {
defer wg.Done()
for range numIterations * 2 {
_, err = store.deleteOldestFromCache(t.Context())
_, err := store.deleteOldestFromCache(t.Context())
assert.NoError(t, err)
time.Sleep(time.Microsecond * 50)
}
Expand All @@ -389,19 +340,11 @@ func TestDiffStoreConcurrentEvictionRace(t *testing.T) {
// race condition by simulating the exact scenario from the race report
func TestDiffStoreResetDeleteRace(t *testing.T) {
cachePath := t.TempDir()
ctx, cancel := context.WithCancel(t.Context())
t.Cleanup(cancel)

// Very short TTL to trigger evictions quickly
ttl := 5 * time.Millisecond
delay := 100 * time.Millisecond
store, err := NewDiffStore(
ctx,
cachePath,
ttl,
delay,
100.0,
)
store, err := NewDiffStore(t.Context(), cachePath, ttl, delay, 100.0)
t.Cleanup(store.Close)
require.NoError(t, err)

Expand Down
10 changes: 10 additions & 0 deletions packages/orchestrator/internal/sandbox/build/storage_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"io"
"path/filepath"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/block"
blockmetrics "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/block/metrics"
"github.com/e2b-dev/infra/packages/shared/pkg/id"
Expand Down Expand Up @@ -59,6 +62,13 @@ func (b *StorageDiff) CacheKey() DiffStoreKey {
}

func (b *StorageDiff) Init(ctx context.Context) error {
ctx, span := tracer.Start(ctx, "init StorageDiff", trace.WithAttributes(
attribute.String("storage_path", b.storagePath),
attribute.String("cache_path", b.cachePath),
attribute.String("cache_key", string(b.cacheKey)),
))
defer span.End()

obj, err := b.persistence.OpenObject(ctx, b.storagePath)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion packages/orchestrator/internal/sandbox/envd.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type PostInitJSONBody struct {
}

func (s *Sandbox) initEnvd(ctx context.Context) error {
ctx, span := tracer.Start(ctx, "envd-init", trace.WithAttributes(telemetry.WithEnvdVersion(s.Config.Envd.Version)))
ctx, span := tracer.Start(ctx, "init envd", trace.WithAttributes(telemetry.WithEnvdVersion(s.Config.Envd.Version)))
defer span.End()

attributes := []attribute.KeyValue{telemetry.WithEnvdVersion(s.Config.Envd.Version), attribute.Int64("timeout_ms", s.internalConfig.EnvdInitRequestTimeout.Milliseconds())}
Expand Down
46 changes: 33 additions & 13 deletions packages/orchestrator/internal/sandbox/nbd/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"io"
"sync"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/e2b-dev/infra/packages/shared/pkg/storage"
Expand Down Expand Up @@ -110,11 +113,11 @@ func (d *Dispatch) writeResponse(respError uint32, respHandle uint64, chunk []by
return nil
}

/**
* This dispatches incoming NBD requests sequentially to the provider.
*
*/
// Handle dispatches incoming NBD requests sequentially to the provider.
func (d *Dispatch) Handle(ctx context.Context) error {
ctx, span := tracer.Start(ctx, "handle dispatch-requests")
defer span.End()

buffer := make([]byte, dispatchBufferSize)
wp := 0

Expand Down Expand Up @@ -181,7 +184,7 @@ func (d *Dispatch) Handle(ctx context.Context) error {
}
case NBDCmdTrim:
rp += 28
err := d.cmdTrim(request.Handle, request.From, request.Length)
err := d.cmdTrim(ctx, request.Handle, request.From, request.Length)
if err != nil {
return err
}
Expand Down Expand Up @@ -210,7 +213,7 @@ func (d *Dispatch) cmdRead(ctx context.Context, cmdHandle uint64, cmdFrom uint64
}
d.shuttingDownLock.Unlock()

performRead := func(handle uint64, from uint64, length uint32) error {
performRead := func(ctx context.Context, handle uint64, from uint64, length uint32) (bool, error) {
// buffered to avoid goroutine leak
errchan := make(chan error, 1)
data := make([]byte, length)
Expand All @@ -223,26 +226,35 @@ func (d *Dispatch) cmdRead(ctx context.Context, cmdHandle uint64, cmdFrom uint64
// Wait until either the ReadAt completed, or our context is cancelled...
select {
case <-ctx.Done():
return d.writeResponse(1, handle, []byte{})
return false, d.writeResponse(1, handle, []byte{})
case err := <-errchan:
if err != nil {
return d.writeResponse(1, handle, []byte{})
return false, d.writeResponse(1, handle, []byte{})
}
}

// read was successful
return d.writeResponse(0, handle, data)
return true, d.writeResponse(0, handle, data)
}

go func() {
err := performRead(cmdHandle, cmdFrom, cmdLength)
ctx, span := tracer.Start(ctx, "perform read command", trace.WithAttributes(
attribute.Int64("length", int64(cmdLength)),
attribute.Int64("offset", int64(cmdFrom)),
))
defer span.End()

ok, err := performRead(ctx, cmdHandle, cmdFrom, cmdLength)
if err != nil {
select {
case d.fatal <- err:
default:
zap.L().Error("nbd error cmd read", zap.Error(err))
}
}
if !ok {
span.SetStatus(codes.Error, "read failed")
}
d.pendingResponses.Done()
}()

Expand All @@ -259,7 +271,7 @@ func (d *Dispatch) cmdWrite(ctx context.Context, cmdHandle uint64, cmdFrom uint6
}
d.shuttingDownLock.Unlock()

performWrite := func(handle uint64, from uint64, data []byte) error {
performWrite := func(ctx context.Context, handle uint64, from uint64, data []byte) error {
// buffered to avoid goroutine leak
errchan := make(chan error, 1)
go func() {
Expand All @@ -282,7 +294,12 @@ func (d *Dispatch) cmdWrite(ctx context.Context, cmdHandle uint64, cmdFrom uint6
}

go func() {
err := performWrite(cmdHandle, cmdFrom, cmdData)
ctx, span := tracer.Start(ctx, "perform write command", trace.WithAttributes(
attribute.Int64("length", int64(len(cmdData))),
attribute.Int64("offset", int64(cmdFrom))))
defer span.End()

err := performWrite(ctx, cmdHandle, cmdFrom, cmdData)
if err != nil {
select {
case d.fatal <- err:
Expand All @@ -299,7 +316,10 @@ func (d *Dispatch) cmdWrite(ctx context.Context, cmdHandle uint64, cmdFrom uint6
* cmdTrim
*
*/
func (d *Dispatch) cmdTrim(handle uint64, _ uint64, _ uint32) error {
func (d *Dispatch) cmdTrim(ctx context.Context, handle uint64, _ uint64, _ uint32) error {
_, span := tracer.Start(ctx, "dispatch trim-command")
defer span.End()

// TODO: Ask the provider
/*
e := d.prov.Trim(from, length)
Expand Down
3 changes: 3 additions & 0 deletions packages/orchestrator/internal/sandbox/nbd/path_direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func NewDirectPathMount(b block.Device, devicePool *DevicePool) *DirectPathMount
}

func (d *DirectPathMount) Open(ctx context.Context) (retDeviceIndex uint32, err error) {
ctx, span := tracer.Start(ctx, "open direct-path-mount")
defer span.End()

ctx, d.cancelfn = context.WithCancel(ctx)

defer func() {
Expand Down
Loading
Loading