From 206b21a547b8ed35b48b2b3f932ab09ce717442d Mon Sep 17 00:00:00 2001 From: Flavien Darche Date: Wed, 18 Dec 2024 15:13:45 +0100 Subject: [PATCH] Applied comments (ipenv, signals, gracefull shutdown) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Applied comments appsec: stop storing span tags, directly call span.SetTag (#3044) Signed-off-by: Eliott Bouhana ddtrace/tracer: Tracing as transport-only mode (APPSEC_STANDALONE) (#3033) Signed-off-by: Eliott Bouhana fix: improving test logic for TestStreamSendsErrorCode to avoid flakiness (#3049) vuln: upgrade golang.org/x/{crypto,net} to non-vulnerable versions (#3050) contrib/miekg/dns: resolve flaky test in TestExchange* (#3045) ddtrace/tracer: report datadog.tracer.api.errors health metric (#3024) build(deps): bump google.golang.org/grpc from 1.64.0 to 1.64.1 (#3001) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Rodrigo Argüello ddtrace/tracer: Report datadog.tracer.queue.enqueued.traces as health metric (#3019) ddtrace/tracer: Tracing as transport-only mode (APPSEC_STANDALONE) (#3033) Signed-off-by: Eliott Bouhana fix: improving test logic for TestStreamSendsErrorCode to avoid flakiness (#3049) vuln: upgrade golang.org/x/{crypto,net} to non-vulnerable versions (#3050) contrib/miekg/dns: resolve flaky test in TestExchange* (#3045) ddtrace/tracer: report datadog.tracer.api.errors health metric (#3024) build(deps): bump google.golang.org/grpc from 1.64.0 to 1.64.1 (#3001) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Rodrigo Argüello ddtrace/tracer: Report datadog.tracer.queue.enqueued.traces as health metric (#3019) --- .../cmd/serviceextensions/main.go | 124 ++++++++++-------- ddtrace/tracer/option.go | 17 +++ go.mod | 4 +- go.sum | 4 +- internal/env.go | 12 +- 5 files changed, 93 insertions(+), 68 deletions(-) diff --git a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main.go b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main.go index 856bd54666..3186ae76b4 100644 --- a/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main.go +++ b/contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main.go @@ -6,14 +6,23 @@ package main import ( + "context" "crypto/tls" - "gopkg.in/DataDog/dd-trace-go.v1/internal" + "errors" + "fmt" "net" "net/http" "os" + "os/signal" "strconv" + "syscall" + "time" + + "golang.org/x/sync/errgroup" gocontrolplane "gopkg.in/DataDog/dd-trace-go.v1/contrib/envoyproxy/go-control-plane" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" + "gopkg.in/DataDog/dd-trace-go.v1/internal" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" "gopkg.in/DataDog/dd-trace-go.v1/internal/version" @@ -21,8 +30,6 @@ import ( "github.com/gorilla/mux" "google.golang.org/grpc" "google.golang.org/grpc/credentials" - "google.golang.org/grpc/reflection" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) // AppsecCalloutExtensionService defines the struct that follows the ExternalProcessorServer interface. @@ -38,54 +45,20 @@ type serviceExtensionConfig struct { func loadConfig() serviceExtensionConfig { extensionPortInt := internal.IntEnv("DD_SERVICE_EXTENSION_PORT", 443) - if extensionPortInt < 1 || extensionPortInt > 65535 { - log.Error("service_extension: invalid port number: %d\n", extensionPortInt) - os.Exit(1) - } - healthcheckPortInt := internal.IntEnv("DD_SERVICE_EXTENSION_HEALTHCHECK_PORT", 80) - if healthcheckPortInt < 1 || healthcheckPortInt > 65535 { - log.Error("service_extension: invalid port number: %d\n", healthcheckPortInt) - os.Exit(1) - } + extensionHostStr := internal.IpEnv("DD_SERVICE_EXTENSION_HOST", net.IP{0, 0, 0, 0}).String() - extensionHost := internal.IpEnv("DD_SERVICE_EXTENSION_HOST", "0.0.0.0") extensionPortStr := strconv.FormatInt(int64(extensionPortInt), 10) healthcheckPortStr := strconv.FormatInt(int64(healthcheckPortInt), 10) - // check if the ports are free - l, err := net.Listen("tcp", extensionHost+":"+extensionPortStr) - if err != nil { - log.Error("service_extension: failed to listen on extension %s:%s: %v\n", extensionHost, extensionPortStr, err) - os.Exit(1) - } - err = l.Close() - if err != nil { - log.Error("service_extension: failed to close listener on %s:%s: %v\n", extensionHost, extensionPortStr, err) - os.Exit(1) - } - - l, err = net.Listen("tcp", extensionHost+":"+healthcheckPortStr) - if err != nil { - log.Error("service_extension: failed to listen on health check %s:%s: %v\n", extensionHost, healthcheckPortStr, err) - os.Exit(1) - } - err = l.Close() - if err != nil { - log.Error("service_extension: failed to close listener on %s:%s: %v\n", extensionHost, healthcheckPortStr, err) - os.Exit(1) - } - return serviceExtensionConfig{ extensionPort: extensionPortStr, - extensionHost: extensionHost, + extensionHost: extensionHostStr, healthcheckPort: healthcheckPortStr, } } func main() { - var extensionService AppsecCalloutExtensionService - // Set the DD_VERSION to the current tracer version if not set if os.Getenv("DD_VERSION") == "" { if err := os.Setenv("DD_VERSION", version.Tag); err != nil { @@ -95,19 +68,42 @@ func main() { config := loadConfig() + if err := startService(config); err != nil { + log.Error("service_extension: %v\n", err) + log.Flush() + os.Exit(1) + } + + log.Info("service_extension: shutting down\n") +} + +func startService(config serviceExtensionConfig) error { + var extensionService AppsecCalloutExtensionService + tracer.Start(tracer.WithAppSecEnabled(true)) + defer tracer.Stop() // TODO: Enable ASM standalone mode when it is developed (should be done for Q4 2024) - go StartGPRCSsl(&extensionService, config) - log.Info("service_extension: callout gRPC server started on %s:%s\n", config.extensionHost, config.extensionPort) + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + g, ctx := errgroup.WithContext(ctx) - go startHealthCheck(config) - log.Info("service_extension: health check server started on %s:%s\n", config.extensionHost, config.healthcheckPort) + g.Go(func() error { + return startGPRCSsl(ctx, &extensionService, config) + }) + + g.Go(func() error { + return startHealthCheck(ctx, config) + }) - select {} + if err := g.Wait(); err != nil { + return err + } + + return nil } -func startHealthCheck(config serviceExtensionConfig) { +func startHealthCheck(ctx context.Context, config serviceExtensionConfig) error { muxServer := mux.NewRouter() muxServer.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") @@ -120,24 +116,32 @@ func startHealthCheck(config serviceExtensionConfig) { Handler: muxServer, } - if err := server.ListenAndServe(); err != nil { - log.Error("service_extension: error starting health check http server: %v\n", err) + go func() { + <-ctx.Done() + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := server.Shutdown(shutdownCtx); err != nil { + log.Error("service_extension: health check server shutdown: %v\n", err) + } + }() + + log.Info("service_extension: health check server started on %s:%s\n", config.extensionHost, config.healthcheckPort) + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return fmt.Errorf("health check http server: %v", err) } + + return nil } -func StartGPRCSsl(service extproc.ExternalProcessorServer, config serviceExtensionConfig) { +func startGPRCSsl(ctx context.Context, service extproc.ExternalProcessorServer, config serviceExtensionConfig) error { cert, err := tls.LoadX509KeyPair("localhost.crt", "localhost.key") if err != nil { - log.Error("service_extension: failed to load key pair: %v\n", err) - os.Exit(1) - return + return fmt.Errorf("failed to load key pair: %v", err) } lis, err := net.Listen("tcp", config.extensionHost+":"+config.extensionPort) if err != nil { - log.Error("service_extension: gRPC server failed to listen: %v\n", err) - os.Exit(1) - return + return fmt.Errorf("gRPC server: %v", err) } grpcCredentials := credentials.NewServerTLSFromCert(&cert) @@ -145,10 +149,16 @@ func StartGPRCSsl(service extproc.ExternalProcessorServer, config serviceExtensi appsecEnvoyExternalProcessorServer := gocontrolplane.AppsecEnvoyExternalProcessorServer(service) + go func() { + <-ctx.Done() + grpcServer.GracefulStop() + }() + extproc.RegisterExternalProcessorServer(grpcServer, appsecEnvoyExternalProcessorServer) - reflection.Register(grpcServer) + log.Info("service_extension: callout gRPC server started on %s:%s\n", config.extensionHost, config.extensionPort) if err := grpcServer.Serve(lis); err != nil { - log.Error("service_extension: error starting gRPC server: %v\n", err) - os.Exit(1) + return fmt.Errorf("error starting gRPC server: %v", err) } + + return nil } diff --git a/ddtrace/tracer/option.go b/ddtrace/tracer/option.go index 6ea99d89eb..c1ad01edf9 100644 --- a/ddtrace/tracer/option.go +++ b/ddtrace/tracer/option.go @@ -587,6 +587,23 @@ func newConfig(opts ...StartOption) *config { // This allows persisting the initial value of globalTags for future resets and updates. globalTagsOrigin := c.globalTags.cfgOrigin c.initGlobalTags(c.globalTags.get(), globalTagsOrigin) + + // TODO: change the name once APM Platform RFC is approved + if internal.BoolEnv("DD_EXPERIMENTAL_APPSEC_STANDALONE_ENABLED", false) { + // Enable tracing as transport layer mode + // This means to stop sending trace metrics, send one trace per minute and those force-kept by other products + // using the tracer as transport layer for their data. And finally adding the _dd.apm.enabled=0 tag to all traces + // to let the backend know that it needs to keep APM UI disabled. + c.globalSampleRate = 1.0 + c.traceRateLimitPerSecond = 1.0 / 60 + c.tracingAsTransport = true + WithGlobalTag("_dd.apm.enabled", 0)(c) + // Disable runtime metrics. In `tracingAsTransport` mode, we'll still + // tell the agent we computed them, so it doesn't do it either. + c.runtimeMetrics = false + c.runtimeMetricsV2 = false + } + return c } diff --git a/go.mod b/go.mod index ed327de9e8..bbe71fe323 100644 --- a/go.mod +++ b/go.mod @@ -101,11 +101,11 @@ require ( go.uber.org/goleak v1.3.0 golang.org/x/mod v0.20.0 golang.org/x/oauth2 v0.18.0 - golang.org/x/sys v0.24.0 + golang.org/x/sys v0.28.0 golang.org/x/time v0.6.0 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 google.golang.org/api v0.169.0 - google.golang.org/grpc v1.64.0 + google.golang.org/grpc v1.64.1 google.golang.org/protobuf v1.34.2 gopkg.in/jinzhu/gorm.v1 v1.9.2 gopkg.in/olivere/elastic.v3 v3.0.75 diff --git a/go.sum b/go.sum index c351013d05..aed16d7b16 100644 --- a/go.sum +++ b/go.sum @@ -3061,10 +3061,8 @@ google.golang.org/grpc v1.50.1/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCD google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww= google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= -google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY= -google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg= +google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA= google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0= -google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= diff --git a/internal/env.go b/internal/env.go index e6a6bb0b19..2e760526ac 100644 --- a/internal/env.go +++ b/internal/env.go @@ -60,20 +60,20 @@ func DurationEnv(key string, def time.Duration) time.Duration { return v } -// IpEnv returns the valid IP string value of an environment variable, or -// def otherwise. -func IpEnv(key string, def string) string { +// IpEnv returns the valid IP value of an environment variable, or def otherwise. +func IpEnv(key string, def net.IP) net.IP { vv, ok := os.LookupEnv(key) if !ok { return def } - if net.ParseIP(vv) == nil { - log.Warn("Non-IP value for env var %s, defaulting to %s", key, def) + ip := net.ParseIP(vv) + if ip == nil { + log.Warn("Non-IP value for env var %s, defaulting to %s", key, def.String()) return def } - return vv + return ip } // ForEachStringTag runs fn on every key val pair encountered in str.