Skip to content

Commit

Permalink
Applied comments (ipenv, signals, gracefull shutdown)
Browse files Browse the repository at this point in the history
Applied comments

appsec: stop storing span tags, directly call span.SetTag (#3044)

Signed-off-by: Eliott Bouhana <[email protected]>

ddtrace/tracer: Tracing as transport-only mode (APPSEC_STANDALONE) (#3033)

Signed-off-by: Eliott Bouhana <[email protected]>

fix: improving test logic for TestStreamSendsErrorCode to avoid flakiness (#3049)

vuln: upgrade golang.org/x/{crypto,net} to non-vulnerable versions (#3050)

contrib/miekg/dns: resolve flaky test in TestExchange* (#3045)

ddtrace/tracer: report datadog.tracer.api.errors health metric (#3024)

build(deps): bump google.golang.org/grpc from 1.64.0 to 1.64.1 (#3001)

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Rodrigo Argüello <[email protected]>

ddtrace/tracer: Report datadog.tracer.queue.enqueued.traces as health metric (#3019)

ddtrace/tracer: Tracing as transport-only mode (APPSEC_STANDALONE) (#3033)

Signed-off-by: Eliott Bouhana <[email protected]>

fix: improving test logic for TestStreamSendsErrorCode to avoid flakiness (#3049)

vuln: upgrade golang.org/x/{crypto,net} to non-vulnerable versions (#3050)

contrib/miekg/dns: resolve flaky test in TestExchange* (#3045)

ddtrace/tracer: report datadog.tracer.api.errors health metric (#3024)

build(deps): bump google.golang.org/grpc from 1.64.0 to 1.64.1 (#3001)

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Rodrigo Argüello <[email protected]>

ddtrace/tracer: Report datadog.tracer.queue.enqueued.traces as health metric (#3019)
  • Loading branch information
e-n-0 committed Dec 23, 2024
1 parent 87b5b57 commit 206b21a
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 68 deletions.
124 changes: 67 additions & 57 deletions contrib/envoyproxy/go-control-plane/cmd/serviceextensions/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,30 @@
package main

import (
"context"
"crypto/tls"
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"errors"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"

"golang.org/x/sync/errgroup"

gocontrolplane "gopkg.in/DataDog/dd-trace-go.v1/contrib/envoyproxy/go-control-plane"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/internal/version"

extproc "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"github.com/gorilla/mux"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/reflection"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// AppsecCalloutExtensionService defines the struct that follows the ExternalProcessorServer interface.
Expand All @@ -38,54 +45,20 @@ type serviceExtensionConfig struct {

func loadConfig() serviceExtensionConfig {
extensionPortInt := internal.IntEnv("DD_SERVICE_EXTENSION_PORT", 443)
if extensionPortInt < 1 || extensionPortInt > 65535 {
log.Error("service_extension: invalid port number: %d\n", extensionPortInt)
os.Exit(1)
}

healthcheckPortInt := internal.IntEnv("DD_SERVICE_EXTENSION_HEALTHCHECK_PORT", 80)
if healthcheckPortInt < 1 || healthcheckPortInt > 65535 {
log.Error("service_extension: invalid port number: %d\n", healthcheckPortInt)
os.Exit(1)
}
extensionHostStr := internal.IpEnv("DD_SERVICE_EXTENSION_HOST", net.IP{0, 0, 0, 0}).String()

extensionHost := internal.IpEnv("DD_SERVICE_EXTENSION_HOST", "0.0.0.0")
extensionPortStr := strconv.FormatInt(int64(extensionPortInt), 10)
healthcheckPortStr := strconv.FormatInt(int64(healthcheckPortInt), 10)

// check if the ports are free
l, err := net.Listen("tcp", extensionHost+":"+extensionPortStr)
if err != nil {
log.Error("service_extension: failed to listen on extension %s:%s: %v\n", extensionHost, extensionPortStr, err)
os.Exit(1)
}
err = l.Close()
if err != nil {
log.Error("service_extension: failed to close listener on %s:%s: %v\n", extensionHost, extensionPortStr, err)
os.Exit(1)
}

l, err = net.Listen("tcp", extensionHost+":"+healthcheckPortStr)
if err != nil {
log.Error("service_extension: failed to listen on health check %s:%s: %v\n", extensionHost, healthcheckPortStr, err)
os.Exit(1)
}
err = l.Close()
if err != nil {
log.Error("service_extension: failed to close listener on %s:%s: %v\n", extensionHost, healthcheckPortStr, err)
os.Exit(1)
}

return serviceExtensionConfig{
extensionPort: extensionPortStr,
extensionHost: extensionHost,
extensionHost: extensionHostStr,
healthcheckPort: healthcheckPortStr,
}
}

func main() {
var extensionService AppsecCalloutExtensionService

// Set the DD_VERSION to the current tracer version if not set
if os.Getenv("DD_VERSION") == "" {
if err := os.Setenv("DD_VERSION", version.Tag); err != nil {
Expand All @@ -95,19 +68,42 @@ func main() {

config := loadConfig()

if err := startService(config); err != nil {
log.Error("service_extension: %v\n", err)
log.Flush()
os.Exit(1)
}

log.Info("service_extension: shutting down\n")
}

func startService(config serviceExtensionConfig) error {
var extensionService AppsecCalloutExtensionService

tracer.Start(tracer.WithAppSecEnabled(true))
defer tracer.Stop()
// TODO: Enable ASM standalone mode when it is developed (should be done for Q4 2024)

go StartGPRCSsl(&extensionService, config)
log.Info("service_extension: callout gRPC server started on %s:%s\n", config.extensionHost, config.extensionPort)
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()
g, ctx := errgroup.WithContext(ctx)

go startHealthCheck(config)
log.Info("service_extension: health check server started on %s:%s\n", config.extensionHost, config.healthcheckPort)
g.Go(func() error {
return startGPRCSsl(ctx, &extensionService, config)
})

g.Go(func() error {
return startHealthCheck(ctx, config)
})

select {}
if err := g.Wait(); err != nil {
return err
}

return nil
}

func startHealthCheck(config serviceExtensionConfig) {
func startHealthCheck(ctx context.Context, config serviceExtensionConfig) error {
muxServer := mux.NewRouter()
muxServer.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
Expand All @@ -120,35 +116,49 @@ func startHealthCheck(config serviceExtensionConfig) {
Handler: muxServer,
}

if err := server.ListenAndServe(); err != nil {
log.Error("service_extension: error starting health check http server: %v\n", err)
go func() {
<-ctx.Done()
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := server.Shutdown(shutdownCtx); err != nil {
log.Error("service_extension: health check server shutdown: %v\n", err)
}
}()

log.Info("service_extension: health check server started on %s:%s\n", config.extensionHost, config.healthcheckPort)
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("health check http server: %v", err)
}

return nil
}

func StartGPRCSsl(service extproc.ExternalProcessorServer, config serviceExtensionConfig) {
func startGPRCSsl(ctx context.Context, service extproc.ExternalProcessorServer, config serviceExtensionConfig) error {
cert, err := tls.LoadX509KeyPair("localhost.crt", "localhost.key")
if err != nil {
log.Error("service_extension: failed to load key pair: %v\n", err)
os.Exit(1)
return
return fmt.Errorf("failed to load key pair: %v", err)
}

lis, err := net.Listen("tcp", config.extensionHost+":"+config.extensionPort)
if err != nil {
log.Error("service_extension: gRPC server failed to listen: %v\n", err)
os.Exit(1)
return
return fmt.Errorf("gRPC server: %v", err)
}

grpcCredentials := credentials.NewServerTLSFromCert(&cert)
grpcServer := grpc.NewServer(grpc.Creds(grpcCredentials))

appsecEnvoyExternalProcessorServer := gocontrolplane.AppsecEnvoyExternalProcessorServer(service)

go func() {
<-ctx.Done()
grpcServer.GracefulStop()
}()

extproc.RegisterExternalProcessorServer(grpcServer, appsecEnvoyExternalProcessorServer)
reflection.Register(grpcServer)
log.Info("service_extension: callout gRPC server started on %s:%s\n", config.extensionHost, config.extensionPort)
if err := grpcServer.Serve(lis); err != nil {
log.Error("service_extension: error starting gRPC server: %v\n", err)
os.Exit(1)
return fmt.Errorf("error starting gRPC server: %v", err)
}

return nil
}
17 changes: 17 additions & 0 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,23 @@ func newConfig(opts ...StartOption) *config {
// This allows persisting the initial value of globalTags for future resets and updates.
globalTagsOrigin := c.globalTags.cfgOrigin
c.initGlobalTags(c.globalTags.get(), globalTagsOrigin)

// TODO: change the name once APM Platform RFC is approved
if internal.BoolEnv("DD_EXPERIMENTAL_APPSEC_STANDALONE_ENABLED", false) {
// Enable tracing as transport layer mode
// This means to stop sending trace metrics, send one trace per minute and those force-kept by other products
// using the tracer as transport layer for their data. And finally adding the _dd.apm.enabled=0 tag to all traces
// to let the backend know that it needs to keep APM UI disabled.
c.globalSampleRate = 1.0
c.traceRateLimitPerSecond = 1.0 / 60
c.tracingAsTransport = true
WithGlobalTag("_dd.apm.enabled", 0)(c)
// Disable runtime metrics. In `tracingAsTransport` mode, we'll still
// tell the agent we computed them, so it doesn't do it either.
c.runtimeMetrics = false
c.runtimeMetricsV2 = false
}

return c
}

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ require (
go.uber.org/goleak v1.3.0
golang.org/x/mod v0.20.0
golang.org/x/oauth2 v0.18.0
golang.org/x/sys v0.24.0
golang.org/x/sys v0.28.0
golang.org/x/time v0.6.0
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028
google.golang.org/api v0.169.0
google.golang.org/grpc v1.64.0
google.golang.org/grpc v1.64.1
google.golang.org/protobuf v1.34.2
gopkg.in/jinzhu/gorm.v1 v1.9.2
gopkg.in/olivere/elastic.v3 v3.0.75
Expand Down
4 changes: 1 addition & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3061,10 +3061,8 @@ google.golang.org/grpc v1.50.1/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCD
google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww=
google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw=
google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g=
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA=
google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0=
google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
Expand Down
12 changes: 6 additions & 6 deletions internal/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,20 @@ func DurationEnv(key string, def time.Duration) time.Duration {
return v
}

// IpEnv returns the valid IP string value of an environment variable, or
// def otherwise.
func IpEnv(key string, def string) string {
// IpEnv returns the valid IP value of an environment variable, or def otherwise.
func IpEnv(key string, def net.IP) net.IP {
vv, ok := os.LookupEnv(key)
if !ok {
return def
}

if net.ParseIP(vv) == nil {
log.Warn("Non-IP value for env var %s, defaulting to %s", key, def)
ip := net.ParseIP(vv)
if ip == nil {
log.Warn("Non-IP value for env var %s, defaulting to %s", key, def.String())
return def
}

return vv
return ip
}

// ForEachStringTag runs fn on every key val pair encountered in str.
Expand Down

0 comments on commit 206b21a

Please sign in to comment.