Skip to content

Commit 02fb3ca

Browse files
authored
Use only the latest request to envd (#1293)
1 parent e9ff03f commit 02fb3ca

File tree

4 files changed

+153
-29
lines changed

4 files changed

+153
-29
lines changed

packages/envd/internal/api/init.go

Lines changed: 53 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,16 @@ import (
99
"net/http"
1010
"time"
1111

12+
"github.com/rs/zerolog"
1213
"github.com/txn2/txeh"
1314
"golang.org/x/sys/unix"
1415

1516
"github.com/e2b-dev/infra/packages/envd/internal/host"
1617
"github.com/e2b-dev/infra/packages/envd/internal/logs"
1718
)
1819

20+
var ErrAccessTokenAlreadySet = errors.New("access token is already set")
21+
1922
func (a *API) PostInit(w http.ResponseWriter, r *http.Request) {
2023
defer r.Body.Close()
2124

@@ -33,37 +36,23 @@ func (a *API) PostInit(w http.ResponseWriter, r *http.Request) {
3336
return
3437
}
3538

36-
if initRequest.Timestamp != nil {
37-
logger.Debug().Msgf("Setting sandbox start time to: %v", *initRequest.Timestamp)
38-
ts := unix.NsecToTimespec(initRequest.Timestamp.UnixNano())
39-
err = unix.ClockSettime(unix.CLOCK_REALTIME, &ts)
40-
if err != nil {
41-
logger.Error().Msgf("Failed to set system time: %v", err)
42-
}
43-
}
44-
45-
if initRequest.EnvVars != nil {
46-
logger.Debug().Msg(fmt.Sprintf("Setting %d env vars", len(*initRequest.EnvVars)))
47-
48-
for key, value := range *initRequest.EnvVars {
49-
logger.Debug().Msgf("Setting env var for %s", key)
50-
a.envVars.Store(key, value)
51-
}
52-
}
39+
a.initLock.Lock()
40+
defer a.initLock.Unlock()
5341

54-
if initRequest.AccessToken != nil {
55-
if a.accessToken != nil && *initRequest.AccessToken != *a.accessToken {
56-
logger.Error().Msg("Access token is already set and cannot be changed")
57-
w.WriteHeader(http.StatusConflict)
42+
// Update data only if the request is newer or if there's no timestamp at all
43+
if initRequest.Timestamp == nil || a.lastSetTime.SetToGreater(initRequest.Timestamp.UnixNano()) {
44+
err = a.SetData(logger, initRequest)
45+
if err != nil {
46+
switch {
47+
case errors.Is(err, ErrAccessTokenAlreadySet):
48+
w.WriteHeader(http.StatusConflict)
49+
default:
50+
logger.Error().Msgf("Failed to set data: %v", err)
51+
w.WriteHeader(http.StatusBadRequest)
52+
}
53+
w.Write([]byte(err.Error()))
5854
return
5955
}
60-
61-
logger.Debug().Msg("Setting access token")
62-
a.accessToken = initRequest.AccessToken
63-
}
64-
65-
if initRequest.HyperloopIP != nil {
66-
go a.SetupHyperloop(*initRequest.HyperloopIP)
6756
}
6857
}
6958

@@ -79,6 +68,42 @@ func (a *API) PostInit(w http.ResponseWriter, r *http.Request) {
7968
w.WriteHeader(http.StatusNoContent)
8069
}
8170

71+
func (a *API) SetData(logger zerolog.Logger, data PostInitJSONBody) error {
72+
if data.Timestamp != nil {
73+
logger.Debug().Msgf("Setting sandbox start time to: %v", *data.Timestamp)
74+
ts := unix.NsecToTimespec(data.Timestamp.UnixNano())
75+
err := unix.ClockSettime(unix.CLOCK_REALTIME, &ts)
76+
if err != nil {
77+
logger.Error().Msgf("Failed to set system time: %v", err)
78+
}
79+
}
80+
81+
if data.EnvVars != nil {
82+
logger.Debug().Msg(fmt.Sprintf("Setting %d env vars", len(*data.EnvVars)))
83+
84+
for key, value := range *data.EnvVars {
85+
logger.Debug().Msgf("Setting env var for %s", key)
86+
a.envVars.Store(key, value)
87+
}
88+
}
89+
90+
if data.AccessToken != nil {
91+
if a.accessToken != nil && *data.AccessToken != *a.accessToken {
92+
logger.Error().Msg("Access token is already set and cannot be changed")
93+
return ErrAccessTokenAlreadySet
94+
}
95+
96+
logger.Debug().Msg("Setting access token")
97+
a.accessToken = data.AccessToken
98+
}
99+
100+
if data.HyperloopIP != nil {
101+
go a.SetupHyperloop(*data.HyperloopIP)
102+
}
103+
104+
return nil
105+
}
106+
82107
func (a *API) SetupHyperloop(address string) {
83108
a.hyperloopLock.Lock()
84109
defer a.hyperloopLock.Unlock()

packages/envd/internal/api/store.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@ type API struct {
1818
envVars *utils.Map[string, string]
1919
mmdsChan chan *host.MMDSOpts
2020
hyperloopLock sync.Mutex
21+
22+
lastSetTime *utils.AtomicMax
23+
initLock sync.Mutex
2124
}
2225

2326
func New(l *zerolog.Logger, envVars *utils.Map[string, string], mmdsChan chan *host.MMDSOpts, isNotFC bool) *API {
24-
return &API{logger: l, envVars: envVars, mmdsChan: mmdsChan, isNotFC: isNotFC}
27+
return &API{logger: l, envVars: envVars, mmdsChan: mmdsChan, isNotFC: isNotFC, lastSetTime: utils.NewAtomicMax()}
2528
}
2629

2730
func (a *API) GetHealth(w http.ResponseWriter, r *http.Request) {
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package utils
2+
3+
import (
4+
"sync"
5+
)
6+
7+
type AtomicMax struct {
8+
val int64
9+
mu sync.Mutex
10+
}
11+
12+
func NewAtomicMax() *AtomicMax {
13+
return &AtomicMax{}
14+
}
15+
16+
func (a *AtomicMax) SetToGreater(newValue int64) bool {
17+
a.mu.Lock()
18+
defer a.mu.Unlock()
19+
20+
if a.val > newValue {
21+
return false
22+
}
23+
24+
a.val = newValue
25+
return true
26+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package utils
2+
3+
import (
4+
"sync"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestAtomicMax_NewAtomicMax(t *testing.T) {
12+
am := NewAtomicMax()
13+
require.NotNil(t, am)
14+
require.Equal(t, int64(0), am.val)
15+
}
16+
17+
func TestAtomicMax_SetToGreater_InitialValue(t *testing.T) {
18+
am := NewAtomicMax()
19+
20+
// Should succeed when newValue > current
21+
assert.True(t, am.SetToGreater(10))
22+
assert.Equal(t, int64(10), am.val)
23+
}
24+
25+
func TestAtomicMax_SetToGreater_EqualValue(t *testing.T) {
26+
am := NewAtomicMax()
27+
am.val = 10
28+
29+
// Should succeed when newValue > current
30+
assert.True(t, am.SetToGreater(20))
31+
assert.Equal(t, int64(20), am.val)
32+
}
33+
34+
func TestAtomicMax_SetToGreater_GreaterValue(t *testing.T) {
35+
am := NewAtomicMax()
36+
am.val = 10
37+
38+
// Should fail when newValue < current, keeping the max value
39+
assert.False(t, am.SetToGreater(5))
40+
assert.Equal(t, int64(10), am.val)
41+
}
42+
43+
func TestAtomicMax_SetToGreater_NegativeValues(t *testing.T) {
44+
am := NewAtomicMax()
45+
am.val = -5
46+
47+
assert.True(t, am.SetToGreater(-2))
48+
assert.Equal(t, int64(-2), am.val)
49+
}
50+
51+
func TestAtomicMax_SetToGreater_Concurrent(t *testing.T) {
52+
am := NewAtomicMax()
53+
var wg sync.WaitGroup
54+
55+
// Run 100 goroutines trying to update the value concurrently
56+
numGoroutines := 100
57+
wg.Add(numGoroutines)
58+
59+
for i := range numGoroutines {
60+
go func(val int64) {
61+
defer wg.Done()
62+
am.SetToGreater(val)
63+
}(int64(i))
64+
}
65+
66+
wg.Wait()
67+
68+
// The final value should be 99 (the maximum value)
69+
assert.Equal(t, int64(99), am.val)
70+
}

0 commit comments

Comments
 (0)