Skip to content

Commit b6a786c

Browse files
committed
Merge remote-tracking branch 'origin/main' into tmp-5
# Conflicts: # packages/orchestrator/internal/template/build/builder.go
2 parents b67e6de + c6600ba commit b6a786c

File tree

15 files changed

+110
-46
lines changed

15 files changed

+110
-46
lines changed

packages/orchestrator/benchmark_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ import (
3636
featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags"
3737
"github.com/e2b-dev/infra/packages/shared/pkg/limit"
3838
sbxlogger "github.com/e2b-dev/infra/packages/shared/pkg/logger/sandbox"
39-
"github.com/e2b-dev/infra/packages/shared/pkg/smap"
4039
"github.com/e2b-dev/infra/packages/shared/pkg/storage"
4140
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
4241
"github.com/e2b-dev/infra/packages/shared/pkg/utils"
@@ -210,7 +209,7 @@ func BenchmarkBaseImageLaunch(b *testing.B) {
210209

211210
var proxyPort uint16 = 5007
212211

213-
sandboxes := smap.New[*sandbox.Sandbox]()
212+
sandboxes := sandbox.NewSandboxesMap()
214213

215214
sandboxProxy, err := proxy.NewSandboxProxy(noop.MeterProvider{}, proxyPort, sandboxes)
216215
require.NoError(b, err)

packages/orchestrator/cmd/build-template/main.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import (
2828
featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags"
2929
l "github.com/e2b-dev/infra/packages/shared/pkg/logger"
3030
sbxlogger "github.com/e2b-dev/infra/packages/shared/pkg/logger/sandbox"
31-
"github.com/e2b-dev/infra/packages/shared/pkg/smap"
3231
"github.com/e2b-dev/infra/packages/shared/pkg/storage"
3332
)
3433

@@ -87,7 +86,7 @@ func buildTemplate(
8786

8887
// The sandbox map is shared between the server and the proxy
8988
// to propagate information about sandbox routing.
90-
sandboxes := smap.New[*sandbox.Sandbox]()
89+
sandboxes := sandbox.NewSandboxesMap()
9190

9291
sandboxProxy, err := proxy.NewSandboxProxy(noop.MeterProvider{}, proxyPort, sandboxes)
9392
if err != nil {

packages/orchestrator/internal/hyperloopserver/handlers/store.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,19 @@ import (
1212

1313
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox"
1414
api "github.com/e2b-dev/infra/packages/shared/pkg/http/hyperloop"
15-
"github.com/e2b-dev/infra/packages/shared/pkg/smap"
1615
)
1716

1817
const CollectorExporterTimeout = 10 * time.Second
1918

2019
type APIStore struct {
2120
logger *zap.Logger
22-
sandboxes *smap.Map[*sandbox.Sandbox]
21+
sandboxes *sandbox.Map
2322

2423
collectorClient http.Client
2524
collectorAddr string
2625
}
2726

28-
func NewHyperloopStore(logger *zap.Logger, sandboxes *smap.Map[*sandbox.Sandbox], sandboxCollectorAddr string) *APIStore {
27+
func NewHyperloopStore(logger *zap.Logger, sandboxes *sandbox.Map, sandboxCollectorAddr string) *APIStore {
2928
return &APIStore{
3029
logger: logger,
3130
sandboxes: sandboxes,

packages/orchestrator/internal/hyperloopserver/server.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@ import (
1515
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox"
1616
"github.com/e2b-dev/infra/packages/shared/pkg/env"
1717
api "github.com/e2b-dev/infra/packages/shared/pkg/http/hyperloop"
18-
"github.com/e2b-dev/infra/packages/shared/pkg/smap"
1918
)
2019

2120
const maxUploadLimit = 1 << 28 // 256 MiB
2221

23-
func NewHyperloopServer(ctx context.Context, port uint16, logger *zap.Logger, sandboxes *smap.Map[*sandbox.Sandbox]) (*http.Server, error) {
22+
func NewHyperloopServer(ctx context.Context, port uint16, logger *zap.Logger, sandboxes *sandbox.Map) (*http.Server, error) {
2423
sandboxCollectorAddr := env.LogsCollectorAddress()
2524
store := handlers.NewHyperloopStore(logger, sandboxes, sandboxCollectorAddr)
2625
swagger, err := api.GetSwagger()

packages/orchestrator/internal/metrics/sandboxes.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox"
2020
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
2121
sbxlogger "github.com/e2b-dev/infra/packages/shared/pkg/logger/sandbox"
22-
"github.com/e2b-dev/infra/packages/shared/pkg/smap"
2322
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
2423
"github.com/e2b-dev/infra/packages/shared/pkg/utils"
2524
)
@@ -51,7 +50,7 @@ type SandboxObserver struct {
5150
registration metric.Registration
5251
exportInterval time.Duration
5352

54-
sandboxes *smap.Map[*sandbox.Sandbox]
53+
sandboxes *sandbox.Map
5554

5655
meter metric.Meter
5756
cpuTotal metric.Int64ObservableGauge
@@ -62,7 +61,7 @@ type SandboxObserver struct {
6261
diskUsed metric.Int64ObservableGauge
6362
}
6463

65-
func NewSandboxObserver(ctx context.Context, nodeID, serviceName, serviceCommit, serviceVersion, serviceInstanceID string, sandboxes *smap.Map[*sandbox.Sandbox]) (*SandboxObserver, error) {
64+
func NewSandboxObserver(ctx context.Context, nodeID, serviceName, serviceCommit, serviceVersion, serviceInstanceID string, sandboxes *sandbox.Map) (*SandboxObserver, error) {
6665
deltaTemporality := otlpmetricgrpc.WithTemporalitySelector(func(kind sdkmetric.InstrumentKind) metricdata.Temporality {
6766
// Use delta temporality for gauges and cumulative for all other instrument kinds.
6867
// This is used to prevent reporting sandbox metrics indefinitely.

packages/orchestrator/internal/proxy/proxy.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
1515
reverseproxy "github.com/e2b-dev/infra/packages/shared/pkg/proxy"
1616
"github.com/e2b-dev/infra/packages/shared/pkg/proxy/pool"
17-
"github.com/e2b-dev/infra/packages/shared/pkg/smap"
1817
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
1918
)
2019

@@ -29,7 +28,7 @@ type SandboxProxy struct {
2928
proxy *reverseproxy.Proxy
3029
}
3130

32-
func NewSandboxProxy(meterProvider metric.MeterProvider, port uint16, sandboxes *smap.Map[*sandbox.Sandbox]) (*SandboxProxy, error) {
31+
func NewSandboxProxy(meterProvider metric.MeterProvider, port uint16, sandboxes *sandbox.Map) (*SandboxProxy, error) {
3332
proxy := reverseproxy.New(
3433
port,
3534
idleTimeout,
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package sandbox
2+
3+
import (
4+
"sync"
5+
6+
"github.com/e2b-dev/infra/packages/shared/pkg/smap"
7+
)
8+
9+
type MapSubscriber interface {
10+
OnInsert(sandbox *Sandbox)
11+
OnRemove(sandboxID string)
12+
}
13+
14+
type Map struct {
15+
sandboxes *smap.Map[*Sandbox]
16+
17+
subs []MapSubscriber
18+
subsLock sync.RWMutex
19+
}
20+
21+
func (m *Map) Subscribe(subscriber MapSubscriber) {
22+
m.subsLock.Lock()
23+
defer m.subsLock.Unlock()
24+
25+
m.subs = append(m.subs, subscriber)
26+
}
27+
28+
func (m *Map) trigger(fn func(MapSubscriber)) {
29+
m.subsLock.RLock()
30+
defer m.subsLock.RUnlock()
31+
32+
for _, subscriber := range m.subs {
33+
fn(subscriber)
34+
}
35+
}
36+
37+
func (m *Map) Items() map[string]*Sandbox {
38+
return m.sandboxes.Items()
39+
}
40+
41+
func (m *Map) Count() int {
42+
return m.sandboxes.Count()
43+
}
44+
45+
func (m *Map) Get(sandboxID string) (*Sandbox, bool) {
46+
return m.sandboxes.Get(sandboxID)
47+
}
48+
49+
func (m *Map) Insert(sbx *Sandbox) {
50+
m.sandboxes.Insert(sbx.Runtime.SandboxID, sbx)
51+
52+
go m.trigger(func(s MapSubscriber) {
53+
s.OnInsert(sbx)
54+
})
55+
}
56+
57+
func (m *Map) Remove(sandboxID string) {
58+
m.sandboxes.Remove(sandboxID)
59+
60+
go m.trigger(func(s MapSubscriber) {
61+
s.OnRemove(sandboxID)
62+
})
63+
}
64+
65+
func (m *Map) RemoveByExecutionID(sandboxID, executionID string) {
66+
removed := m.sandboxes.RemoveCb(sandboxID, func(_ string, v *Sandbox, exists bool) bool {
67+
if !exists {
68+
return false
69+
}
70+
71+
if v == nil {
72+
return false
73+
}
74+
75+
return v.Runtime.ExecutionID == executionID
76+
})
77+
78+
if removed {
79+
go m.trigger(func(s MapSubscriber) {
80+
s.OnRemove(sandboxID)
81+
})
82+
}
83+
}
84+
85+
func NewSandboxesMap() *Map {
86+
return &Map{sandboxes: smap.New[*Sandbox]()}
87+
}

packages/orchestrator/internal/server/main.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"github.com/e2b-dev/infra/packages/shared/pkg/events/event"
2020
featureflags "github.com/e2b-dev/infra/packages/shared/pkg/feature-flags"
2121
"github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator"
22-
"github.com/e2b-dev/infra/packages/shared/pkg/smap"
2322
"github.com/e2b-dev/infra/packages/shared/pkg/storage"
2423
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
2524
)
@@ -29,7 +28,7 @@ type server struct {
2928

3029
sandboxFactory *sandbox.Factory
3130
info *service.ServiceInfo
32-
sandboxes *smap.Map[*sandbox.Sandbox]
31+
sandboxes *sandbox.Map
3332
proxy *proxy.SandboxProxy
3433
networkPool *network.Pool
3534
templateCache *template.Cache
@@ -58,7 +57,7 @@ type ServiceConfig struct {
5857
Info *service.ServiceInfo
5958
Proxy *proxy.SandboxProxy
6059
SandboxFactory *sandbox.Factory
61-
Sandboxes *smap.Map[*sandbox.Sandbox]
60+
Sandboxes *sandbox.Map
6261
Persistence storage.StorageProvider
6362
FeatureFlags *featureflags.Client
6463
SbxEventsService events.EventsService[event.SandboxEvent]

packages/orchestrator/internal/server/sandboxes.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func (s *server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ
134134
return nil, status.Errorf(codes.Internal, "failed to create sandbox: %s", err)
135135
}
136136

137-
s.sandboxes.Insert(req.GetSandbox().GetSandboxId(), sbx)
137+
s.sandboxes.Insert(sbx)
138138
go func() {
139139
ctx, childSpan := tracer.Start(context.WithoutCancel(ctx), "sandbox-create-stop", trace.WithNewRoot())
140140
defer childSpan.End()
@@ -152,17 +152,7 @@ func (s *server) Create(ctx context.Context, req *orchestrator.SandboxCreateRequ
152152
// Remove the sandbox from cache only if the cleanup IDs match.
153153
// This prevents us from accidentally removing started sandbox (via resume) from the cache if cleanup is taking longer than the request timeout.
154154
// This could have caused the "invisible" sandboxes that are not in orchestrator or API, but are still on client.
155-
s.sandboxes.RemoveCb(req.GetSandbox().GetSandboxId(), func(_ string, v *sandbox.Sandbox, exists bool) bool {
156-
if !exists {
157-
return false
158-
}
159-
160-
if v == nil {
161-
return false
162-
}
163-
164-
return sbx.Runtime.ExecutionID == v.Runtime.ExecutionID
165-
})
155+
s.sandboxes.RemoveByExecutionID(req.GetSandbox().GetSandboxId(), sbx.Runtime.ExecutionID)
166156

167157
// Remove the proxies assigned to the sandbox from the pool to prevent them from being reused.
168158
s.proxy.RemoveFromPool(sbx.Runtime.ExecutionID)

packages/orchestrator/internal/server/sandboxes_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"github.com/e2b-dev/infra/packages/orchestrator/internal/service"
1313
"github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator"
1414
"github.com/e2b-dev/infra/packages/shared/pkg/id"
15-
"github.com/e2b-dev/infra/packages/shared/pkg/smap"
1615
)
1716

1817
var (
@@ -68,11 +67,11 @@ func Test_server_List(t *testing.T) {
6867
for _, tt := range tests {
6968
t.Run(tt.name, func(t *testing.T) {
7069
s := &server{
71-
sandboxes: smap.New[*sandbox.Sandbox](),
70+
sandboxes: sandbox.NewSandboxesMap(),
7271
info: &service.ServiceInfo{},
7372
}
7473
for _, sbx := range tt.data {
75-
s.sandboxes.Insert(sbx.Runtime.SandboxID, sbx)
74+
s.sandboxes.Insert(sbx)
7675
}
7776
got, err := s.List(t.Context(), tt.args.in1)
7877
if (err != nil) != tt.wantErr {

0 commit comments

Comments
 (0)