diff --git a/iac/provider-gcp/nomad/jobs/orchestrator.hcl b/iac/provider-gcp/nomad/jobs/orchestrator.hcl index 35e4a9995f..b3d516df58 100644 --- a/iac/provider-gcp/nomad/jobs/orchestrator.hcl +++ b/iac/provider-gcp/nomad/jobs/orchestrator.hcl @@ -70,10 +70,12 @@ EOT TEMPLATE_BUCKET_NAME = "${template_bucket_name}" OTEL_COLLECTOR_GRPC_ENDPOINT = "${otel_collector_grpc_endpoint}" ALLOW_SANDBOX_INTERNET = "${allow_sandbox_internet}" - SHARED_CHUNK_CACHE_PATH = "${shared_chunk_cache_path}" + SHARED_CHUNK_CACHE_PATH = "${shared_chunk_cache_path}" CLICKHOUSE_CONNECTION_STRING = "${clickhouse_connection_string}" REDIS_URL = "${redis_url}" REDIS_CLUSTER_URL = "${redis_cluster_url}" + GRPC_PORT = "${port}" + PROXY_PORT = "${proxy_port}" %{ if launch_darkly_api_key != "" } LAUNCH_DARKLY_API_KEY = "${launch_darkly_api_key}" @@ -82,7 +84,7 @@ EOT config { command = "/bin/bash" - args = ["-c", " chmod +x local/orchestrator && local/orchestrator --port ${port} --proxy-port ${proxy_port}"] + args = ["-c", " chmod +x local/orchestrator && local/orchestrator"] } artifact { diff --git a/iac/provider-gcp/nomad/jobs/template-manager.hcl b/iac/provider-gcp/nomad/jobs/template-manager.hcl index ea26be7123..978c4d30c8 100644 --- a/iac/provider-gcp/nomad/jobs/template-manager.hcl +++ b/iac/provider-gcp/nomad/jobs/template-manager.hcl @@ -77,6 +77,7 @@ job "template-manager-system" { SHARED_CHUNK_CACHE_PATH = "${shared_chunk_cache_path}" CLICKHOUSE_CONNECTION_STRING = "${clickhouse_connection_string}" DOCKERHUB_REMOTE_REPOSITORY_URL = "${dockerhub_remote_repository_url}" + GRPC_PORT = "${port}" %{ if !update_stanza } FORCE_STOP = "true" %{ endif } @@ -87,7 +88,7 @@ job "template-manager-system" { config { command = "/bin/bash" - args = ["-c", " chmod +x local/template-manager && local/template-manager --port ${port}"] + args = ["-c", " chmod +x local/template-manager && local/template-manager"] } artifact { diff --git a/packages/client-proxy/internal/proxy/proxy.go 
b/packages/client-proxy/internal/proxy/proxy.go index 8eb5ef2810..84ea005b77 100644 --- a/packages/client-proxy/internal/proxy/proxy.go +++ b/packages/client-proxy/internal/proxy/proxy.go @@ -96,7 +96,13 @@ func catalogResolution(ctx context.Context, sandboxId string, c catalog.Sandboxe return s.OrchestratorIP, nil } -func NewClientProxy(meterProvider metric.MeterProvider, serviceName string, port uint, catalog catalog.SandboxesCatalog, useCatalogResolution bool, useDnsResolution bool) (*reverseproxy.Proxy, error) { +func NewClientProxy( + meterProvider metric.MeterProvider, + serviceName string, + port uint16, + catalog catalog.SandboxesCatalog, + useCatalogResolution, useDnsResolution bool, +) (*reverseproxy.Proxy, error) { if !useCatalogResolution && !useDnsResolution { return nil, errors.New("catalog resolution and DNS resolution are both disabled, at least one must be enabled") } diff --git a/packages/client-proxy/main.go b/packages/client-proxy/main.go index c1572cfc33..c8ab634567 100644 --- a/packages/client-proxy/main.go +++ b/packages/client-proxy/main.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "log" + "math" "net" "net/http" "os" @@ -104,6 +105,11 @@ func run() int { zap.ReplaceGlobals(logger) proxyPort := internal.GetProxyServicePort() + if proxyPort <= 0 || proxyPort > int(math.MaxUint16) { + logger.Error("Proxy port is outside the valid uint16 range", zap.Int("value", proxyPort)) + return 1 + } + edgePort := internal.GetEdgeServicePort() edgeSecret := internal.GetEdgeServiceSecret() orchestratorPort := internal.GetOrchestratorServicePort() @@ -155,7 +161,7 @@ func run() int { } // Proxy sandbox http traffic to orchestrator nodes - trafficProxy, err := e2bproxy.NewClientProxy(tel.MeterProvider, serviceName, uint(proxyPort), catalog, useProxyCatalogResolution, useDnsResolution) + trafficProxy, err := e2bproxy.NewClientProxy(tel.MeterProvider, serviceName, uint16(proxyPort), catalog, useProxyCatalogResolution, useDnsResolution) if err != nil { 
logger.Error("Failed to create client proxy", zap.Error(err)) return 1 diff --git a/packages/orchestrator/benchmark_test.go b/packages/orchestrator/benchmark_test.go index 23011e0713..6e1dbef2f8 100644 --- a/packages/orchestrator/benchmark_test.go +++ b/packages/orchestrator/benchmark_test.go @@ -109,6 +109,11 @@ func BenchmarkBaseImageLaunch(b *testing.B) { b.Setenv("SNAPSHOT_CACHE_DIR", abs(filepath.Join(tempDir, "snapshot-cache"))) b.Setenv("LOCAL_TEMPLATE_STORAGE_BASE_PATH", abs(filepath.Join(persistenceDir, "templates"))) + networkConfig, err := network.ParseConfig() + if err != nil { + b.Fatalf("error parsing config: %v", err) + } + // prep directories for _, subdir := range []string{"build", "build-templates" /*"fc-vm",*/, "sandbox", "snapshot-cache", "template"} { fullDirName := filepath.Join(tempDir, subdir) @@ -123,7 +128,7 @@ func BenchmarkBaseImageLaunch(b *testing.B) { // sbxlogger.SetSandboxLoggerExternal(logger) networkPool, err := network.NewPool( - b.Context(), noop.MeterProvider{}, 8, 8, clientID, + b.Context(), noop.MeterProvider{}, 8, 8, clientID, networkConfig, ) require.NoError(b, err) defer func() { @@ -197,7 +202,7 @@ func BenchmarkBaseImageLaunch(b *testing.B) { persistenceBuild, err := storage.GetBuildCacheStorageProvider(b.Context(), nil) require.NoError(b, err) - var proxyPort uint = 5007 + var proxyPort uint16 = 5007 sandboxes := smap.New[*sandbox.Sandbox]() diff --git a/packages/orchestrator/cmd/build-template/main.go b/packages/orchestrator/cmd/build-template/main.go index 72eb5feed9..26d7fe7287 100644 --- a/packages/orchestrator/cmd/build-template/main.go +++ b/packages/orchestrator/cmd/build-template/main.go @@ -38,8 +38,7 @@ const ( ) func main() { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := context.Background() templateID := flag.String("template", "", "template id") buildID := flag.String("build", "", "build id") @@ -47,9 +46,14 @@ func main() { fcVersion := flag.String("firecracker", "", 
"firecracker version") flag.Parse() - err := buildTemplate(ctx, *kernelVersion, *fcVersion, *templateID, *buildID) + networkConfig, err := network.ParseConfig() + if err != nil { + log.Fatalf("error parsing config: %v", err) + } + + err = buildTemplate(ctx, *kernelVersion, *fcVersion, *templateID, *buildID, networkConfig) if err != nil { - log.Fatalf("error building template: %v", err) //nolint:gocritic // probably fine to bail if we're done? + log.Fatalf("error building template: %v", err) } } @@ -59,6 +63,7 @@ func buildTemplate( fcVersion, templateID, buildID string, + networkConfig network.Config, ) error { ctx, cancel := context.WithTimeout(parentCtx, time.Minute*5) defer cancel() @@ -121,7 +126,7 @@ func buildTemplate( } }() - networkPool, err := network.NewPool(ctx, noop.MeterProvider{}, 8, 8, clientID) + networkPool, err := network.NewPool(ctx, noop.MeterProvider{}, 8, 8, clientID, networkConfig) if err != nil { return fmt.Errorf("could not create network pool: %w", err) } diff --git a/packages/orchestrator/go.mod b/packages/orchestrator/go.mod index 7a070cb1ab..616f3d68bc 100644 --- a/packages/orchestrator/go.mod +++ b/packages/orchestrator/go.mod @@ -18,6 +18,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/ecr v1.44.0 github.com/bits-and-blooms/bitset v1.22.0 github.com/bmatcuk/doublestar/v4 v4.9.1 + github.com/caarlos0/env/v11 v11.3.1 github.com/containernetworking/plugins v1.6.0 github.com/containers/storage v1.58.0 github.com/coreos/go-iptables v0.8.0 diff --git a/packages/orchestrator/go.sum b/packages/orchestrator/go.sum index 8598a67f09..2bd38097b5 100644 --- a/packages/orchestrator/go.sum +++ b/packages/orchestrator/go.sum @@ -196,6 +196,8 @@ github.com/bytedance/sonic v1.13.2/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1 github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= github.com/bytedance/sonic/loader v0.2.4 h1:ZWCw4stuXUsn1/+zQDqeE7JKP+QO47tz7QCNan80NzY= github.com/bytedance/sonic/loader 
v0.2.4/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= +github.com/caarlos0/env/v11 v11.3.1 h1:cArPWC15hWmEt+gWk7YBi7lEXTXCvpaSdCiZE2X5mCA= +github.com/caarlos0/env/v11 v11.3.1/go.mod h1:qupehSf/Y0TUTsxKywqRt/vJjN5nz6vauiYEUUr8P4U= github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/packages/orchestrator/internal/cfg/model.go b/packages/orchestrator/internal/cfg/model.go new file mode 100644 index 0000000000..a3a00a74c9 --- /dev/null +++ b/packages/orchestrator/internal/cfg/model.go @@ -0,0 +1,29 @@ +package cfg + +import ( + "github.com/caarlos0/env/v11" + + "github.com/e2b-dev/infra/packages/orchestrator/internal/sandbox/network" +) + +type Config struct { + AllowSandboxInternet bool `env:"ALLOW_SANDBOX_INTERNET" envDefault:"true"` + ClickhouseConnectionString string `env:"CLICKHOUSE_CONNECTION_STRING"` + ForceStop bool `env:"FORCE_STOP"` + GRPCPort uint16 `env:"GRPC_PORT" envDefault:"5008"` + LaunchDarklyAPIKey string `env:"LAUNCH_DARKLY_API_KEY"` + OrchestratorBasePath string `env:"ORCHESTRATOR_BASE_PATH" envDefault:"/orchestrator"` + OrchestratorLockPath string `env:"ORCHESTRATOR_LOCK_PATH" envDefault:"/orchestrator.lock"` + ProxyPort uint16 `env:"PROXY_PORT" envDefault:"5007"` + RedisClusterURL string `env:"REDIS_CLUSTER_URL"` + RedisURL string `env:"REDIS_URL"` + Services []string `env:"ORCHESTRATOR_SERVICES" envDefault:"orchestrator"` + + NetworkConfig network.Config +} + +func Parse() (Config, error) { + var model Config + err := env.Parse(&model) + return model, err +} diff --git a/packages/orchestrator/internal/cfg/model_test.go b/packages/orchestrator/internal/cfg/model_test.go new file mode 100644 index 0000000000..bf8cbc8093 --- /dev/null +++ b/packages/orchestrator/internal/cfg/model_test.go @@ 
-0,0 +1,35 @@ +package cfg + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParse(t *testing.T) { + t.Run("network config local flag defaults to false", func(t *testing.T) { + config, err := Parse() + require.NoError(t, err) + + assert.False(t, config.NetworkConfig.UseLocalNamespaceStorage) + }) + + t.Run("network config is parsed correctly", func(t *testing.T) { + t.Setenv("USE_LOCAL_NAMESPACE_STORAGE", "true") + + config, err := Parse() + require.NoError(t, err) + + assert.True(t, config.NetworkConfig.UseLocalNamespaceStorage) + }) + + t.Run("multiple services parses correctly", func(t *testing.T) { + t.Setenv("ORCHESTRATOR_SERVICES", "service1,service2") + + config, err := Parse() + require.NoError(t, err) + + assert.Equal(t, []string{"service1", "service2"}, config.Services) + }) +} diff --git a/packages/orchestrator/internal/consts.go b/packages/orchestrator/internal/consts.go deleted file mode 100644 index 2b0681369c..0000000000 --- a/packages/orchestrator/internal/consts.go +++ /dev/null @@ -1,20 +0,0 @@ -package internal - -import ( - "github.com/e2b-dev/infra/packages/shared/pkg/env" -) - -const ( - // Using reserver IPv4 in range that is used for experiments and documentation - // https://en.wikipedia.org/wiki/Reserved_IP_addresses - defaultHyperloopIP = "192.0.2.1" - defaultHyperloopProxyPort = "5010" -) - -func GetHyperloopIP() string { - return env.GetEnv("SANDBOX_HYPERLOOP_IP", defaultHyperloopIP) -} - -func GetHyperloopProxyPort() string { - return env.GetEnv("SANDBOX_HYPERLOOP_PROXY_PORT", defaultHyperloopProxyPort) -} diff --git a/packages/orchestrator/internal/grpcserver/server.go b/packages/orchestrator/internal/grpcserver/server.go index f3185db0ea..53ddcb3d44 100644 --- a/packages/orchestrator/internal/grpcserver/server.go +++ b/packages/orchestrator/internal/grpcserver/server.go @@ -99,7 +99,7 @@ func (g *GRPCServer) GRPCServer() *grpc.Server { } // Start launches -func (g 
*GRPCServer) Start(ctx context.Context, port uint) error { +func (g *GRPCServer) Start(ctx context.Context, port uint16) error { var lisCfg net.ListenConfig lis, err := lisCfg.Listen(ctx, "tcp", fmt.Sprintf(":%d", port)) if err != nil { @@ -118,7 +118,7 @@ func (g *GRPCServer) Start(ctx context.Context, port uint) error { // Match gRPC requests. grpcL := m.Match(cmux.Any()) - zap.L().Info("Starting GRPC server", zap.Uint("port", port)) + zap.L().Info("Starting GRPC server", zap.Uint16("port", port)) go func() { if err := g.grpc.Serve(grpcL); err != nil { diff --git a/packages/orchestrator/internal/hyperloopserver/server.go b/packages/orchestrator/internal/hyperloopserver/server.go index ef9e76841e..602ee3e724 100644 --- a/packages/orchestrator/internal/hyperloopserver/server.go +++ b/packages/orchestrator/internal/hyperloopserver/server.go @@ -20,7 +20,7 @@ import ( const maxUploadLimit = 1 << 28 // 256 MiB -func NewHyperloopServer(ctx context.Context, port uint, logger *zap.Logger, sandboxes *smap.Map[*sandbox.Sandbox]) (*http.Server, error) { +func NewHyperloopServer(ctx context.Context, port uint16, logger *zap.Logger, sandboxes *smap.Map[*sandbox.Sandbox]) (*http.Server, error) { sandboxCollectorAddr := env.LogsCollectorAddress() store := handlers.NewHyperloopStore(logger, sandboxes, sandboxCollectorAddr) swagger, err := api.GetSwagger() diff --git a/packages/orchestrator/internal/proxy/proxy.go b/packages/orchestrator/internal/proxy/proxy.go index 3009ffdb67..5a2f431513 100644 --- a/packages/orchestrator/internal/proxy/proxy.go +++ b/packages/orchestrator/internal/proxy/proxy.go @@ -29,7 +29,7 @@ type SandboxProxy struct { proxy *reverseproxy.Proxy } -func NewSandboxProxy(meterProvider metric.MeterProvider, port uint, sandboxes *smap.Map[*sandbox.Sandbox]) (*SandboxProxy, error) { +func NewSandboxProxy(meterProvider metric.MeterProvider, port uint16, sandboxes *smap.Map[*sandbox.Sandbox]) (*SandboxProxy, error) { proxy := reverseproxy.New( port, idleTimeout, 
diff --git a/packages/orchestrator/internal/sandbox/network/network.go b/packages/orchestrator/internal/sandbox/network/network.go index 9bdf75a717..b17c982175 100644 --- a/packages/orchestrator/internal/sandbox/network/network.go +++ b/packages/orchestrator/internal/sandbox/network/network.go @@ -10,8 +10,6 @@ import ( "github.com/vishvananda/netlink" "github.com/vishvananda/netns" "go.uber.org/zap" - - consts "github.com/e2b-dev/infra/packages/orchestrator/internal" ) func (s *Slot) CreateNetwork() error { @@ -220,7 +218,7 @@ func (s *Slot) CreateNetwork() error { err = tables.Append( "nat", "PREROUTING", "-i", s.VethName(), "-p", "tcp", "-d", s.HyperloopIPString(), "--dport", "80", - "-j", "REDIRECT", "--to-port", consts.GetHyperloopProxyPort(), + "-j", "REDIRECT", "--to-port", s.hyperloopPort, ) if err != nil { return fmt.Errorf("error creating HTTP redirect rule to sandbox hyperloop proxy server: %w", err) @@ -262,7 +260,7 @@ func (s *Slot) RemoveNetwork() error { err = tables.Delete( "nat", "PREROUTING", "-i", s.VethName(), "-p", "tcp", "-d", s.HyperloopIPString(), "--dport", "80", - "-j", "REDIRECT", "--to-port", consts.GetHyperloopProxyPort(), + "-j", "REDIRECT", "--to-port", s.hyperloopPort, ) if err != nil { errs = append(errs, fmt.Errorf("error deleting sandbox hyperloop proxy redirect rule: %w", err)) diff --git a/packages/orchestrator/internal/sandbox/network/pool.go b/packages/orchestrator/internal/sandbox/network/pool.go index 1aa2b3db7f..7d2211ea94 100644 --- a/packages/orchestrator/internal/sandbox/network/pool.go +++ b/packages/orchestrator/internal/sandbox/network/pool.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" + "github.com/caarlos0/env/v11" "go.opentelemetry.io/otel/metric" "go.uber.org/zap" @@ -16,7 +17,21 @@ const ( ReusedSlotsPoolSize = 100 ) +type Config struct { + // Using a reserved IPv4 address from a range that is set aside for experiments and documentation + // https://en.wikipedia.org/wiki/Reserved_IP_addresses + HyperloopIPAddress string 
`env:"SANDBOX_HYPERLOOP_IP" envDefault:"192.0.2.1"` + HyperloopProxyPort uint16 `env:"SANDBOX_HYPERLOOP_PROXY_PORT" envDefault:"5010"` + UseLocalNamespaceStorage bool `env:"USE_LOCAL_NAMESPACE_STORAGE"` +} + +func ParseConfig() (Config, error) { + return env.ParseAs[Config]() +} + type Pool struct { + config Config + cancel context.CancelFunc newSlots chan *Slot @@ -27,7 +42,7 @@ type Pool struct { slotStorage Storage } -func NewPool(ctx context.Context, meterProvider metric.MeterProvider, newSlotsPoolSize, reusedSlotsPoolSize int, nodeID string) (*Pool, error) { +func NewPool(ctx context.Context, meterProvider metric.MeterProvider, newSlotsPoolSize, reusedSlotsPoolSize int, nodeID string, config Config) (*Pool, error) { newSlots := make(chan *Slot, newSlotsPoolSize-1) reusedSlots := make(chan *Slot, reusedSlotsPoolSize) @@ -43,13 +58,14 @@ func NewPool(ctx context.Context, meterProvider metric.MeterProvider, newSlotsPo return nil, fmt.Errorf("failed to create reused slot counter: %w", err) } - slotStorage, err := NewStorage(vrtSlotsSize, nodeID) + slotStorage, err := NewStorage(vrtSlotsSize, nodeID, config) if err != nil { return nil, fmt.Errorf("failed to create slot storage: %w", err) } ctx, cancel := context.WithCancel(ctx) pool := &Pool{ + config: config, newSlots: newSlots, reusedSlots: reusedSlots, newSlotCounter: newSlotCounter, diff --git a/packages/orchestrator/internal/sandbox/network/slot.go b/packages/orchestrator/internal/sandbox/network/slot.go index ce45e75999..14a81709a2 100644 --- a/packages/orchestrator/internal/sandbox/network/slot.go +++ b/packages/orchestrator/internal/sandbox/network/slot.go @@ -6,6 +6,7 @@ import ( "log" "net" "path/filepath" + "strconv" "sync/atomic" "github.com/containernetworking/plugins/pkg/ns" @@ -14,7 +15,6 @@ import ( "go.opentelemetry.io/otel/trace" netutils "k8s.io/utils/net" - "github.com/e2b-dev/infra/packages/orchestrator/internal" "github.com/e2b-dev/infra/packages/shared/pkg/env" ) @@ -76,10 +76,10 @@ type Slot 
struct { hostNet *net.IPNet hostCIDR string - hyperloopIP string + hyperloopIP, hyperloopPort string } -func NewSlot(key string, idx int) (*Slot, error) { +func NewSlot(key string, idx int, config Config) (*Slot, error) { if idx < 1 || idx > vrtSlotsSize { return nil, fmt.Errorf("slot index %d is out of range [1, %d)", idx, vrtSlotsSize) } @@ -132,7 +132,8 @@ func NewSlot(key string, idx int) (*Slot, error) { hostNet: hostNet, hostCIDR: hostCIDR, - hyperloopIP: internal.GetHyperloopIP(), + hyperloopIP: config.HyperloopIPAddress, + hyperloopPort: strconv.FormatUint(uint64(config.HyperloopProxyPort), 10), } return slot, nil diff --git a/packages/orchestrator/internal/sandbox/network/storage.go b/packages/orchestrator/internal/sandbox/network/storage.go index 72e055473c..2080e3e2a1 100644 --- a/packages/orchestrator/internal/sandbox/network/storage.go +++ b/packages/orchestrator/internal/sandbox/network/storage.go @@ -2,7 +2,6 @@ package network import ( "context" - "os" "github.com/e2b-dev/infra/packages/shared/pkg/env" ) @@ -13,10 +12,10 @@ type Storage interface { } // NewStorage creates a new slot storage based on the environment, we are ok with using a memory storage for local -func NewStorage(slotsSize int, nodeID string) (Storage, error) { - if env.IsDevelopment() || os.Getenv("USE_LOCAL_NAMESPACE_STORAGE") == "true" { - return NewStorageLocal(slotsSize) +func NewStorage(slotsSize int, nodeID string, config Config) (Storage, error) { + if env.IsDevelopment() || config.UseLocalNamespaceStorage { + return NewStorageLocal(slotsSize, config) } - return NewStorageKV(slotsSize, nodeID) + return NewStorageKV(slotsSize, nodeID, config) } diff --git a/packages/orchestrator/internal/sandbox/network/storage_kv.go b/packages/orchestrator/internal/sandbox/network/storage_kv.go index 47f8ae35d4..c009cf4fa7 100644 --- a/packages/orchestrator/internal/sandbox/network/storage_kv.go +++ b/packages/orchestrator/internal/sandbox/network/storage_kv.go @@ -12,6 +12,7 @@ import ( ) 
type StorageKV struct { + config Config slotsSize int consulClient *consulApi.Client nodeID string @@ -21,7 +22,7 @@ func (s *StorageKV) getKVKey(slotIdx int) string { return fmt.Sprintf("%s/%d", s.nodeID, slotIdx) } -func NewStorageKV(slotsSize int, nodeID string) (*StorageKV, error) { +func NewStorageKV(slotsSize int, nodeID string, config Config) (*StorageKV, error) { consulToken := utils.RequiredEnv("CONSUL_TOKEN", "Consul token for authenticating requests to the Consul API") consulClient, err := newConsulClient(consulToken) @@ -30,6 +31,7 @@ func NewStorageKV(slotsSize int, nodeID string) (*StorageKV, error) { } return &StorageKV{ + config: config, slotsSize: slotsSize, consulClient: consulClient, nodeID: nodeID, @@ -63,7 +65,7 @@ func (s *StorageKV) Acquire(_ context.Context) (*Slot, error) { } if status { - return NewSlot(key, slotIdx) + return NewSlot(key, slotIdx, s.config) } return nil, nil diff --git a/packages/orchestrator/internal/sandbox/network/storage_local.go b/packages/orchestrator/internal/sandbox/network/storage_local.go index ea40bd5f48..69cb010708 100644 --- a/packages/orchestrator/internal/sandbox/network/storage_local.go +++ b/packages/orchestrator/internal/sandbox/network/storage_local.go @@ -13,6 +13,7 @@ import ( ) type StorageLocal struct { + config Config slotsSize int foreignNs map[string]struct{} acquiredNs map[string]struct{} @@ -21,7 +22,7 @@ type StorageLocal struct { const netNamespacesDir = "/var/run/netns" -func NewStorageLocal(slotsSize int) (*StorageLocal, error) { +func NewStorageLocal(slotsSize int, config Config) (*StorageLocal, error) { // get namespaces that we want to always skip foreignNs, err := getForeignNamespaces() if err != nil { @@ -35,6 +36,7 @@ func NewStorageLocal(slotsSize int) (*StorageLocal, error) { } return &StorageLocal{ + config: config, foreignNs: foreignNsMap, slotsSize: slotsSize, acquiredNs: make(map[string]struct{}, slotsSize), @@ -92,7 +94,7 @@ func (s *StorageLocal) Acquire(ctx context.Context) 
(*Slot, error) { s.acquiredNs[slotName] = struct{}{} slotKey := getLocalKey(slotIdx) - return NewSlot(slotKey, slotIdx) + return NewSlot(slotKey, slotIdx, s.config) } } } diff --git a/packages/orchestrator/internal/sandbox/network/storage_memory.go b/packages/orchestrator/internal/sandbox/network/storage_memory.go index a06adc2466..831354c5a4 100644 --- a/packages/orchestrator/internal/sandbox/network/storage_memory.go +++ b/packages/orchestrator/internal/sandbox/network/storage_memory.go @@ -8,13 +8,15 @@ import ( ) type StorageMemory struct { + config Config slotsSize int freeSlots []bool freeSlotsMu sync.Mutex } -func NewStorageMemory(slotsSize int) (*StorageMemory, error) { +func NewStorageMemory(slotsSize int, config Config) (*StorageMemory, error) { return &StorageMemory{ + config: config, slotsSize: slotsSize, freeSlots: make([]bool, slotsSize), freeSlotsMu: sync.Mutex{}, @@ -31,7 +33,7 @@ func (s *StorageMemory) Acquire(_ context.Context) (*Slot, error) { key := getMemoryKey(slotIdx) if !s.freeSlots[slotIdx] { s.freeSlots[slotIdx] = true - return NewSlot(key, slotIdx) + return NewSlot(key, slotIdx, s.config) } } diff --git a/packages/orchestrator/internal/service/info.go b/packages/orchestrator/internal/service/info.go index 31dca691e7..23ef31a851 100644 --- a/packages/orchestrator/internal/service/info.go +++ b/packages/orchestrator/internal/service/info.go @@ -6,6 +6,7 @@ import ( "go.uber.org/zap" + "github.com/e2b-dev/infra/packages/orchestrator/internal/cfg" orchestratorinfo "github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator-info" ) @@ -45,8 +46,8 @@ func (s *ServiceInfo) SetStatus(status orchestratorinfo.ServiceInfoStatus) { } } -func NewInfoContainer(clientId string, version string, commit string, instanceID string) *ServiceInfo { - services := GetServices() +func NewInfoContainer(clientId string, version string, commit string, instanceID string, config cfg.Config) *ServiceInfo { + services := GetServices(config) serviceRoles := 
make([]orchestratorinfo.ServiceInfoRole, 0) for _, service := range services { diff --git a/packages/orchestrator/internal/service/service.go b/packages/orchestrator/internal/service/service.go index 8314067f1f..06060936a7 100644 --- a/packages/orchestrator/internal/service/service.go +++ b/packages/orchestrator/internal/service/service.go @@ -3,7 +3,7 @@ package service import ( "strings" - "github.com/e2b-dev/infra/packages/shared/pkg/env" + "github.com/e2b-dev/infra/packages/orchestrator/internal/cfg" ) type ServiceType string @@ -29,9 +29,8 @@ func ParseServiceType(s string) ServiceType { // GetServices parses the ORCHESTRATOR_SERVICES environment variable // and returns a slice of known ServiceTypes. -func GetServices() []ServiceType { - servicesEnv := env.GetEnv("ORCHESTRATOR_SERVICES", string(Orchestrator)) - rawServiceNames := strings.Split(servicesEnv, ",") +func GetServices(config cfg.Config) []ServiceType { + rawServiceNames := config.Services var services []ServiceType for _, name := range rawServiceNames { diff --git a/packages/orchestrator/main.go b/packages/orchestrator/main.go index 387d4f4b8d..7825ec5598 100644 --- a/packages/orchestrator/main.go +++ b/packages/orchestrator/main.go @@ -3,15 +3,12 @@ package main import ( "context" "errors" - "flag" "fmt" "log" - "math" "net/http" "os" "os/signal" "slices" - "strconv" "syscall" "time" @@ -23,7 +20,7 @@ import ( clickhouse "github.com/e2b-dev/infra/packages/clickhouse/pkg" "github.com/e2b-dev/infra/packages/clickhouse/pkg/batcher" - "github.com/e2b-dev/infra/packages/orchestrator/internal" + "github.com/e2b-dev/infra/packages/orchestrator/internal/cfg" "github.com/e2b-dev/infra/packages/orchestrator/internal/events" "github.com/e2b-dev/infra/packages/orchestrator/internal/grpcserver" "github.com/e2b-dev/infra/packages/orchestrator/internal/hyperloopserver" @@ -56,45 +53,17 @@ type Closeable interface { Close(ctx context.Context) error } -const ( - defaultPort = 5008 - defaultProxyPort = 5007 +const 
version = "0.1.0" - version = "0.1.0" -) - -var ( - forceStop = env.GetEnv("FORCE_STOP", "false") == "true" - commitSHA string - fileLockName = "/orchestrator.lock" -) - -func init() { - if value := os.Getenv("ORCHESTRATOR_LOCK_PATH"); value != "" { - fileLockName = value - } -} +var commitSHA string func main() { - port := flag.Uint("port", defaultPort, "orchestrator server port") - proxyPort := flag.Uint("proxy-port", defaultProxyPort, "orchestrator proxy port") - - flag.Parse() - - if *port > math.MaxUint16 { - log.Fatalf("%d is larger than maximum possible port %d", port, math.MaxInt16) - } - - if *proxyPort > math.MaxUint16 { - log.Fatalf("%d is larger than maximum possible proxy port %d", proxyPort, math.MaxInt16) - } - - hyperloopPort, err := strconv.ParseInt(internal.GetHyperloopProxyPort(), 10, 32) + config, err := cfg.Parse() if err != nil { - log.Fatalf("failed to get hyperloop proxy port: %v", err) + log.Fatalf("failed to parse config: %v", err) } - success := run(*port, *proxyPort, uint(hyperloopPort)) + success := run(config) log.Println("Stopping orchestrator, success:", success) @@ -103,15 +72,16 @@ func main() { } } -func run(port, proxyPort, hyperloopPort uint) (success bool) { +func run(config cfg.Config) (success bool) { success = true - services := service.GetServices() + services := service.GetServices(config) // Check if the orchestrator crashed and restarted // Skip this check in development mode // We don't want to lock if the service is running with force stop; the subsequent start would fail. 
- if !env.IsDevelopment() && !forceStop && slices.Contains(services, service.Orchestrator) { + if !env.IsDevelopment() && !config.ForceStop && slices.Contains(services, service.Orchestrator) { + fileLockName := config.OrchestratorLockPath info, err := os.Stat(fileLockName) if err == nil { log.Fatalf("Orchestrator was already started at %s, exiting", info.ModTime()) @@ -145,7 +115,7 @@ func run(port, proxyPort, hyperloopPort uint) (success bool) { nodeID := env.GetNodeID() serviceName := service.GetServiceName(services) serviceInstanceID := uuid.NewString() - serviceInfo := service.NewInfoContainer(nodeID, version, commitSHA, serviceInstanceID) + serviceInfo := service.NewInfoContainer(nodeID, version, commitSHA, serviceInstanceID, config) serviceError := make(chan error) defer close(serviceError) @@ -238,12 +208,12 @@ func run(port, proxyPort, hyperloopPort uint) (success bool) { // to propagate information about sandbox routing. sandboxes := smap.New[*sandbox.Sandbox]() - sandboxProxy, err := proxy.NewSandboxProxy(tel.MeterProvider, proxyPort, sandboxes) + sandboxProxy, err := proxy.NewSandboxProxy(tel.MeterProvider, config.ProxyPort, sandboxes) if err != nil { zap.L().Fatal("failed to create sandbox proxy", zap.Error(err)) } - networkPool, err := network.NewPool(ctx, tel.MeterProvider, network.NewSlotsPoolSize, network.ReusedSlotsPoolSize, nodeID) + networkPool, err := network.NewPool(ctx, tel.MeterProvider, network.NewSlotsPoolSize, network.ReusedSlotsPoolSize, nodeID, config.NetworkConfig) if err != nil { zap.L().Fatal("failed to create network pool", zap.Error(err)) } @@ -282,7 +252,7 @@ func run(port, proxyPort, hyperloopPort uint) (success bool) { var sandboxEventBatcher batcher.ClickhouseInsertBatcher[clickhouse.SandboxEvent] - clickhouseConnectionString := os.Getenv("CLICKHOUSE_CONNECTION_STRING") + clickhouseConnectionString := config.ClickhouseConnectionString if clickhouseConnectionString == "" { sandboxEventBatcher = 
batcher.NewNoopBatcher[clickhouse.SandboxEvent]() } else { @@ -321,7 +291,7 @@ func run(port, proxyPort, hyperloopPort uint) (success bool) { } var redisClient redis.UniversalClient - if redisClusterUrl := os.Getenv("REDIS_CLUSTER_URL"); redisClusterUrl != "" { + if redisClusterUrl := config.RedisClusterURL; redisClusterUrl != "" { // For managed Redis Cluster in GCP we should use Cluster Client, because // > Redis node endpoints can change and can be recycled as nodes are added and removed over time. // https://cloud.google.com/memorystore/docs/cluster/cluster-node-specification#cluster_endpoints @@ -330,13 +300,13 @@ func run(port, proxyPort, hyperloopPort uint) (success bool) { Addrs: []string{redisClusterUrl}, MinIdleConns: 1, }) - } else if rurl := os.Getenv("REDIS_URL"); rurl != "" { + } else if rurl := config.RedisURL; rurl != "" { redisClient = redis.NewClient(&redis.Options{ Addr: rurl, MinIdleConns: 1, }) } else { - zap.L().Warn("REDIS_URL not set, using no-op pubsub") + zap.L().Warn("REDIS_URL|REDIS_CLUSTER_URL not set, using no-op pubsub") } if redisClient != nil { @@ -361,7 +331,7 @@ func run(port, proxyPort, hyperloopPort uint) (success bool) { zap.L().Fatal("failed to create sandbox observer", zap.Error(err)) } - defaultAllowSandboxInternet := env.GetEnv("ALLOW_SANDBOX_INTERNET", "true") != "false" + defaultAllowSandboxInternet := config.AllowSandboxInternet sandboxFactory := sandbox.NewFactory(networkPool, devicePool, featureFlags, defaultAllowSandboxInternet) @@ -397,7 +367,7 @@ func run(port, proxyPort, hyperloopPort uint) (success bool) { } }(tmplSbxLoggerExternal) - hyperloopSrv, err := hyperloopserver.NewHyperloopServer(ctx, hyperloopPort, globalLogger, sandboxes) + hyperloopSrv, err := hyperloopserver.NewHyperloopServer(ctx, config.NetworkConfig.HyperloopProxyPort, globalLogger, sandboxes) if err != nil { zap.L().Fatal("failed to create hyperloop server", zap.Error(err)) } @@ -483,7 +453,7 @@ func run(port, proxyPort, hyperloopPort uint) 
(success bool) { g.Go(func() (err error) { // this sets the error declared above so the function // in the defer can check it. - grpcErr := grpcSrv.Start(ctx, port) + grpcErr := grpcSrv.Start(ctx, config.GRPCPort) if grpcErr != nil { grpcErr = fmt.Errorf("grpc server: %w", grpcErr) zap.L().Error("grpc server error", zap.Error(grpcErr)) @@ -511,7 +481,7 @@ func run(port, proxyPort, hyperloopPort uint) (success bool) { closeCtx, cancelCloseCtx := context.WithCancel(context.Background()) defer cancelCloseCtx() - if forceStop { + if config.ForceStop { cancelCloseCtx() } @@ -522,7 +492,7 @@ func run(port, proxyPort, hyperloopPort uint) (success bool) { } for _, c := range closers { - zap.L().Info(fmt.Sprintf("Closing %T, forced: %v", c, forceStop)) + zap.L().Info(fmt.Sprintf("Closing %T, forced: %v", c, config.ForceStop)) if err := c.Close(closeCtx); err != nil { zap.L().Error("error during shutdown", zap.Error(err)) success = false diff --git a/packages/shared/pkg/proxy/proxy.go b/packages/shared/pkg/proxy/proxy.go index 6e39fc8913..65dc807959 100644 --- a/packages/shared/pkg/proxy/proxy.go +++ b/packages/shared/pkg/proxy/proxy.go @@ -25,7 +25,7 @@ type Proxy struct { } func New( - port uint, + port uint16, idleTimeout time.Duration, getDestination func(r *http.Request) (*pool.Destination, error), ) *Proxy { diff --git a/packages/shared/pkg/proxy/proxy_test.go b/packages/shared/pkg/proxy/proxy_test.go index 4750191399..a82ea0c1f5 100644 --- a/packages/shared/pkg/proxy/proxy_test.go +++ b/packages/shared/pkg/proxy/proxy_test.go @@ -120,7 +120,7 @@ func newTestProxy(t *testing.T, getDestination func(r *http.Request) (*pool.Dest // Set up the proxy server proxy := New( - uint(port), + uint16(port), 20*time.Second, // Short idle timeout getDestination, )