From 0bd962734063928cb8a2d7a2b416fdb7844df722 Mon Sep 17 00:00:00 2001 From: RAGOT David <35502263+Dav-14@users.noreply.github.com> Date: Thu, 6 Jun 2024 17:33:39 +0200 Subject: [PATCH] feat(agent): add otel grpc stats handler client side (#1535) Co-authored-by: David Ragot --- .editorconfig | 8 +- .gitignore | 5 +- ee/agent/.earthly/values.yaml | 8 +- ee/agent/Earthfile | 12 ++- ee/agent/cmd/root.go | 78 +++++++++---------- ee/agent/go.mod | 7 +- ee/agent/go.sum | 10 +-- ee/agent/helm/values.yaml | 4 + ee/agent/internal/membership_client.go | 32 ++++++-- ee/agent/internal/membership_listener.go | 45 ++++++----- ee/agent/internal/module.go | 4 +- helm/Earthfile | 32 +++++--- helm/libs/Earthfile | 19 +++++ helm/libs/core/Chart.yaml | 14 ++++ helm/libs/core/templates/_helpers.tpl | 45 +++++++++++ helm/libs/core/templates/_monitoring.tpl | 98 ++++++++++++++++++++++++ libs/go-libs/otlp/otlptraces/traces.go | 13 ++-- 17 files changed, 334 insertions(+), 100 deletions(-) create mode 100644 helm/libs/Earthfile create mode 100644 helm/libs/core/Chart.yaml create mode 100644 helm/libs/core/templates/_helpers.tpl create mode 100644 helm/libs/core/templates/_monitoring.tpl diff --git a/.editorconfig b/.editorconfig index 2facba1702..3e0ebe38d2 100644 --- a/.editorconfig +++ b/.editorconfig @@ -4,4 +4,10 @@ root = true end_of_line = lf charset = utf-8 indent_style = space -indent_size = 4 \ No newline at end of file +indent_size = 4 + +[*.tpl] +end_of_line = lf +charset = utf-8 +indent_style = space +indent_size = 2 \ No newline at end of file diff --git a/.gitignore b/.gitignore index 1fdbcc9fcc..c37041316e 100644 --- a/.gitignore +++ b/.gitignore @@ -37,4 +37,7 @@ openapi/node_modules .aws -.kube \ No newline at end of file +.kube + +# Helm +*.tgz \ No newline at end of file diff --git a/ee/agent/.earthly/values.yaml b/ee/agent/.earthly/values.yaml index 35bff6ef41..f87c84dc41 100644 --- a/ee/agent/.earthly/values.yaml +++ b/ee/agent/.earthly/values.yaml @@ -1,8 +1,14 @@ global: monitoring: + traces: + enabled: true + mode: grpc + exporter: otlp + endpoint: otel-collector-opentelemetry-collector.formance.svc.cluster.local + port: 4317 + insecure: true logs: format: "" - image: pullPolicy: Always diff --git a/ee/agent/Earthfile b/ee/agent/Earthfile index 117e760787..fe562cae94 100644 --- a/ee/agent/Earthfile +++ b/ee/agent/Earthfile @@ -3,6 +3,7 @@ VERSION 0.8 IMPORT github.com/formancehq/earthly:tags/v0.12.0 AS core IMPORT ../.. AS stack +IMPORT ../../helm/libs AS helm-libs IMPORT .. AS ee FROM core+base-image @@ -136,14 +137,21 @@ tests: helm-validate: FROM core+helm-base WORKDIR /src - COPY helm . + + COPY (helm-libs+sources/*) helm/libs/ + COPY --dir helm ee/agent/ + + WORKDIR /src/ee/agent/helm + RUN helm dependencies update DO --pass-args core+HELM_VALIDATE - SAVE ARTIFACT /src AS LOCAL helm + SAVE ARTIFACT /src/ee/agent/helm AS LOCAL helm helm-package: FROM +helm-validate RUN helm package . SAVE ARTIFACT /src + SAVE ARTIFACT . AS LOCAL helm + release: BUILD --pass-args stack+goreleaser --path=ee/agent \ No newline at end of file diff --git a/ee/agent/cmd/root.go b/ee/agent/cmd/root.go index 59f9207fa5..b4477b0b3d 100644 --- a/ee/agent/cmd/root.go +++ b/ee/agent/cmd/root.go @@ -14,7 +14,7 @@ import ( "github.com/formancehq/stack/components/agent/internal" "github.com/formancehq/stack/libs/go-libs/licence" sharedlogging "github.com/formancehq/stack/libs/go-libs/logging" - sharedotlptraces "github.com/formancehq/stack/libs/go-libs/otlp/otlptraces" + "github.com/formancehq/stack/libs/go-libs/otlp/otlptraces" "github.com/formancehq/stack/libs/go-libs/service" "github.com/pkg/errors" "github.com/spf13/cobra" @@ -50,12 +50,6 @@ const ( resyncPeriod = "resync-period" ) -func init() { - if err := v1beta1.AddToScheme(scheme.Scheme); err != nil { - panic(err) - } -} - var rootCmd = &cobra.Command{ SilenceUsage: true, PersistentPreRunE: func(cmd *cobra.Command, args []string) error { @@ -64,9 +58,41 @@ var rootCmd = &cobra.Command{ RunE: runAgent, } -func exitWithCode(code int, v ...any) { - fmt.Fprintln(os.Stdout, v...) - os.Exit(code) +func init() { + if err := v1beta1.AddToScheme(scheme.Scheme); err != nil { + panic(err) + } + + var kubeConfigFilePath string + if home := homedir.HomeDir(); home != "" { + kubeConfigFilePath = filepath.Join(home, ".kube", "config") + } + + service.BindFlags(rootCmd) + otlptraces.InitOTLPTracesFlags(rootCmd.PersistentFlags()) + licence.InitCLIFlags(rootCmd) + + rootCmd.Flags().String(kubeConfigFlag, kubeConfigFilePath, "") + rootCmd.Flags().String(serverAddressFlag, "localhost:8081", "") + rootCmd.Flags().Bool(tlsEnabledFlag, false, "") + rootCmd.Flags().Bool(tlsInsecureSkipVerifyFlag, false, "") + rootCmd.Flags().String(tlsCACertificateFlag, "", "") + rootCmd.Flags().String(idFlag, "", "") + rootCmd.Flags().String(authenticationModeFlag, "", "") + rootCmd.Flags().String(authenticationTokenFlag, "", "") + rootCmd.Flags().String(authenticationClientSecretFlag, "", "") + rootCmd.Flags().String(authenticationIssuerFlag, "", "") + rootCmd.Flags().String(baseUrlFlag, "", "") + rootCmd.Flags().Bool(productionFlag, false, "Is a production agent") + rootCmd.Flags().Duration(resyncPeriod, 5*time.Minute, "Resync period of K8S resources") + rootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") +} + +func Execute() { + if err := rootCmd.Execute(); err != nil { + fmt.Fprintln(os.Stdout, err) + os.Exit(1) + } } func runAgent(cmd *cobra.Command, args []string) error { @@ -117,7 +143,7 @@ func runAgent(cmd *cobra.Command, args []string) error { Production: viper.GetBool(productionFlag), Version: Version, }, viper.GetDuration(resyncPeriod), dialOptions...), - sharedotlptraces.CLITracesModule(), + otlptraces.CLITracesModule(), licence.CLIModule(ServiceName), } @@ -175,33 +201,3 @@ func createGRPCTransportCredentials(ctx context.Context) (credentials.TransportC } return credential, nil } - -func Execute() { - if err := rootCmd.Execute(); err != nil { - exitWithCode(1, err) - } -} - -func init() { - var kubeConfigFilePath string - if home := homedir.HomeDir(); home != "" { - kubeConfigFilePath = filepath.Join(home, ".kube", "config") - } - - service.BindFlags(rootCmd) - licence.InitCLIFlags(rootCmd) - rootCmd.Flags().String(kubeConfigFlag, kubeConfigFilePath, "") - rootCmd.Flags().String(serverAddressFlag, "localhost:8081", "") - rootCmd.Flags().Bool(tlsEnabledFlag, false, "") - rootCmd.Flags().Bool(tlsInsecureSkipVerifyFlag, false, "") - rootCmd.Flags().String(tlsCACertificateFlag, "", "") - rootCmd.Flags().String(idFlag, "", "") - rootCmd.Flags().String(authenticationModeFlag, "", "") - rootCmd.Flags().String(authenticationTokenFlag, "", "") - rootCmd.Flags().String(authenticationClientSecretFlag, "", "") - rootCmd.Flags().String(authenticationIssuerFlag, "", "") - rootCmd.Flags().String(baseUrlFlag, "", "") - rootCmd.Flags().Bool(productionFlag, false, "Is a production agent") - rootCmd.Flags().Duration(resyncPeriod, 5*time.Minute, "Resync period of K8S resources") - rootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") -} diff --git a/ee/agent/go.mod b/ee/agent/go.mod index 6c9eff9512..2081e454e6 100644 --- a/ee/agent/go.mod +++ b/ee/agent/go.mod @@ -17,7 +17,9 @@ require ( github.com/spf13/viper v1.18.2 github.com/stretchr/testify v1.9.0 github.com/zitadel/oidc/v2 v2.12.0 - go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.45.0 + go.opentelemetry.io/otel v1.24.0 + go.opentelemetry.io/otel/trace v1.24.0 go.uber.org/fx v1.20.1 go.uber.org/mock v0.4.0 golang.org/x/oauth2 v0.16.0 @@ -30,6 +32,7 @@ require ( ) require ( + cloud.google.com/go/compute v1.23.4 // indirect github.com/ThreeDotsLabs/watermill v1.3.5 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -91,14 +94,12 @@ require ( github.com/uptrace/opentelemetry-go-extra/otelutil v0.2.3 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect go.opentelemetry.io/contrib/propagators/b3 v1.22.0 // indirect - go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.22.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.22.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.22.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.22.0 // indirect go.opentelemetry.io/otel/metric v1.24.0 // indirect go.opentelemetry.io/otel/sdk v1.24.0 // indirect - go.opentelemetry.io/otel/trace v1.24.0 // indirect go.opentelemetry.io/proto/otlp v1.0.0 // indirect go.uber.org/dig v1.17.1 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/ee/agent/go.sum b/ee/agent/go.sum index 8319857d4d..b95d8a842b 100644 --- a/ee/agent/go.sum +++ b/ee/agent/go.sum @@ -1,6 +1,6 @@ -cloud.google.com/go v0.110.10 h1:LXy9GEO+timppncPIAZoOj3l58LIU9k+kn48AN7IO3Y= -cloud.google.com/go/compute v1.23.3 h1:6sVlXXBmbd7jNX0Ipq0trII3e4n1/MsADLK6a+aiVlk= -cloud.google.com/go/compute v1.23.3/go.mod h1:VCgBUoMnIVIR0CscqQiPJLAG25E3ZRZMzcFZeQ+h8CI= +cloud.google.com/go v0.112.0 h1:tpFCD7hpHFlQ8yPwT3x+QeXqc2T6+n6T+hmABHfDUSM= +cloud.google.com/go/compute v1.23.4 h1:EBT9Nw4q3zyE7G45Wvv3MzolIrCJEuHys5muLY0wvAw= +cloud.google.com/go/compute v1.23.4/go.mod h1:/EJMj55asU6kAFnuZET8zqgwgJ9FvXWXOkkfQZa4ioI= cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= github.com/ThreeDotsLabs/watermill v1.3.5 h1:50JEPEhMGZQMh08ct0tfO1PsgMOAOhV3zxK2WofkbXg= @@ -217,8 +217,8 @@ github.com/uptrace/opentelemetry-go-extra/otelutil v0.2.3/go.mod h1:RvCYhPchLhvQ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.45.0 h1:RsQi0qJ2imFfCvZabqzM9cNXBG8k6gXMv1A0cXRmH6A= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.45.0/go.mod h1:vsh3ySueQCiKPxFLvjWC4Z135gIa34TQ/NSqkDTZYUM= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 h1:sv9kVfal0MK0wBMCOGr+HeJm9v803BkJxGrk2au7j08= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0/go.mod h1:SK2UL73Zy1quvRPonmOmRDiWk1KBV3LyIeeIxcEApWw= go.opentelemetry.io/contrib/propagators/b3 v1.22.0 h1:Okbgv0pWHMQq+mF7H2o1mucJ5PvxKFq2c8cyqoXfeaQ= diff --git a/ee/agent/helm/values.yaml b/ee/agent/helm/values.yaml index ff58b70cac..1abc696293 100644 --- a/ee/agent/helm/values.yaml +++ b/ee/agent/helm/values.yaml @@ -56,6 +56,10 @@ server: enabled: true insecureSkipVerify: true +config: + monitoring: + serviceName: agent + agent: id: "b7549a16-f74a-4815-ab1e-bb8ef1c3833b" baseUrl: "" diff --git a/ee/agent/internal/membership_client.go b/ee/agent/internal/membership_client.go index c55fb92186..b2328dd8e4 100644 --- a/ee/agent/internal/membership_client.go +++ b/ee/agent/internal/membership_client.go @@ -6,9 +6,11 @@ import ( "time" "github.com/formancehq/stack/components/agent/internal/generated" - sharedlogging "github.com/formancehq/stack/libs/go-libs/logging" + "github.com/formancehq/stack/libs/go-libs/logging" + "github.com/pkg/errors" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -66,28 +68,42 @@ func (c *membershipClient) connectMetadata(ctx context.Context, modules []string return md, nil } +func LoggingClientStreamInterceptor(l logging.Logger) grpc.StreamClientInterceptor { + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + span := trace.SpanFromContext(ctx) + + logging.FromContext(ctx). + WithField("traceId", span.SpanContext().TraceID()). + WithField("spanId", span.SpanContext().SpanID()). + WithField("method", method). + Infof("Starting stream") + return streamer(logging.ContextWithLogger(ctx, l), desc, cc, method, opts...) + } +} + func (c *membershipClient) connect(ctx context.Context, modules []string, eeModules []string) error { - sharedlogging.FromContext(ctx).WithFields(map[string]any{ + logging.FromContext(ctx).WithFields(map[string]any{ "id": c.clientInfo.ID, }).Infof("Establish connection to server") c.connectContext, c.connectCancel = context.WithCancel(ctx) opts := append(c.opts, - grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), - grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()), + grpc.WithChainStreamInterceptor( + LoggingClientStreamInterceptor(logging.FromContext(ctx)), + ), + grpc.WithStatsHandler(otelgrpc.NewClientHandler()), ) conn, err := grpc.Dial(c.address, opts...) if err != nil { return err } - sharedlogging.FromContext(ctx).Info("Connected to GRPC server!") + c.serverClient = generated.NewServerClient(conn) md, err := c.connectMetadata(ctx, modules, eeModules) if err != nil { return err } - connectContext := metadata.NewOutgoingContext(c.connectContext, md) connectClient, err := c.serverClient.Join(connectContext) if err != nil { @@ -113,7 +129,7 @@ func (c *membershipClient) sendPong(ctx context.Context) { Pong: &generated.Pong{}, }, }); err != nil { - sharedlogging.FromContext(ctx).Errorf("Unable to send pong to server: %s", err) + logging.FromContext(ctx).Errorf("Unable to send pong to server: %s", err) if errors.Is(err, io.EOF) { panic(err) } @@ -191,7 +207,7 @@ func (c *membershipClient) Start(ctx context.Context) error { } <-time.After(50 * time.Millisecond) case err := <-errCh: - sharedlogging.FromContext(ctx).Errorf("Stream closed with error: %s", err) + logging.FromContext(ctx).Errorf("Stream closed with error: %s", err) return err } } diff --git a/ee/agent/internal/membership_listener.go b/ee/agent/internal/membership_listener.go index 25a5cf431d..b840e5378a 100644 --- a/ee/agent/internal/membership_listener.go +++ b/ee/agent/internal/membership_listener.go @@ -11,6 +11,8 @@ import ( "strings" "sync" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" @@ -19,7 +21,6 @@ import ( "github.com/formancehq/stack/components/agent/internal/generated" "github.com/formancehq/stack/libs/go-libs/collectionutils" "github.com/formancehq/stack/libs/go-libs/logging" - sharedlogging "github.com/formancehq/stack/libs/go-libs/logging" "github.com/pkg/errors" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -91,15 +92,24 @@ func (c *membershipListener) Start(ctx context.Context) { } c.wp.Submit(func() { - sharedlogging.FromContext(ctx).Infof("Got message from membership: %T", msg.GetMessage()) + ctx, span := otel.GetTracerProvider().Tracer("com.formance.agent").Start(ctx, "newOrder", trace.WithNewRoot()) + defer span.End() + logging.FromContext(ctx). + WithField("traceId", span.SpanContext().TraceID()). + WithField("spanId", span.SpanContext().SpanID()). + Infof("Got message from membership: %T", msg.GetMessage()) switch msg := msg.Message.(type) { case *generated.Order_ExistingStack: + span.SetName("syncExistingStack") c.syncExistingStack(ctx, msg.ExistingStack) case *generated.Order_DeletedStack: + span.SetName("deleteStack") c.deleteStack(ctx, msg.DeletedStack) case *generated.Order_DisabledStack: + span.SetName("disableStack") c.disableStack(ctx, msg.DisabledStack) case *generated.Order_EnabledStack: + span.SetName("enableStack") c.enableStack(ctx, msg.EnabledStack) } }) @@ -108,7 +118,6 @@ func (c *membershipListener) Start(ctx context.Context) { } func (c *membershipListener) syncExistingStack(ctx context.Context, membershipStack *generated.Stack) { - versions := membershipStack.Versions if versions == "" { versions = "default" @@ -125,7 +134,7 @@ func (c *membershipListener) syncExistingStack(ctx context.Context, membershipSt }, }) if err != nil { - sharedlogging.FromContext(ctx).Errorf("Unable to create stack cluster side: %s", err) + logging.FromContext(ctx).Errorf("Unable to create stack cluster side: %s", err) return } @@ -133,7 +142,7 @@ func (c *membershipListener) syncExistingStack(ctx context.Context, membershipSt c.syncStargate(ctx, metadata, stack, membershipStack) c.syncAuthClients(ctx, metadata, stack, membershipStack.StaticClients) - sharedlogging.FromContext(ctx).Infof("Stack %s updated cluster side", stack.GetName()) + logging.FromContext(ctx).Infof("Stack %s updated cluster side", stack.GetName()) } func (c *membershipListener) generateMetadata(membershipStack *generated.Stack) map[string]any { @@ -247,11 +256,11 @@ func (c *membershipListener) syncStargate(ctx context.Context, metadata map[stri }, }, }); err != nil { - sharedlogging.FromContext(ctx).Errorf("Unable to create module Stargate cluster side: %s", err) + logging.FromContext(ctx).Errorf("Unable to create module Stargate cluster side: %s", err) } } else { if err := c.client.EnsureNotExists(ctx, "Stargates", stargateName); err != nil { - sharedlogging.FromContext(ctx).Errorf("Unable to delete module Stargate cluster side: %s", err) + logging.FromContext(ctx).Errorf("Unable to delete module Stargate cluster side: %s", err) } } } @@ -268,7 +277,7 @@ func (c *membershipListener) syncAuthClients(ctx context.Context, metadata map[s }, }) if err != nil { - sharedlogging.FromContext(ctx).Errorf("Unable to create AuthClient cluster side: %s", err) + logging.FromContext(ctx).Errorf("Unable to create AuthClient cluster side: %s", err) continue } expectedAuthClients = append(expectedAuthClients, authClient) @@ -276,7 +285,7 @@ func (c *membershipListener) syncAuthClients(ctx context.Context, metadata map[s authClients, err := c.client.List(ctx, "AuthClients", stackLabels(stack.GetName())) if err != nil { - sharedlogging.FromContext(ctx).Errorf("Unable to list AuthClient cluster side: %s", err) + logging.FromContext(ctx).Errorf("Unable to list AuthClient cluster side: %s", err) return } @@ -290,43 +299,43 @@ func (c *membershipListener) syncAuthClients(ctx context.Context, metadata map[s }, []string{}) for _, name := range authClientsToDelete { - sharedlogging.FromContext(ctx).Infof("Deleting AuthClient %s", name) + logging.FromContext(ctx).Infof("Deleting AuthClient %s", name) if err := c.client.EnsureNotExists(ctx, "AuthClients", name); err != nil { - sharedlogging.FromContext(ctx).Errorf("Unable to delete AuthClient %s cluster side: %s", name, err) + logging.FromContext(ctx).Errorf("Unable to delete AuthClient %s cluster side: %s", name, err) } } } func (c *membershipListener) deleteStack(ctx context.Context, stack *generated.DeletedStack) { if err := c.client.EnsureNotExists(ctx, "Stacks", stack.ClusterName); err != nil { - sharedlogging.FromContext(ctx).Errorf("Deleting cluster side: %s", err) + logging.FromContext(ctx).Errorf("Deleting cluster side: %s", err) return } - sharedlogging.FromContext(ctx).Infof("Stack %s deleted", stack.ClusterName) + logging.FromContext(ctx).Infof("Stack %s deleted", stack.ClusterName) } func (c *membershipListener) disableStack(ctx context.Context, stack *generated.DisabledStack) { if err := c.client.Patch(ctx, "Stacks", stack.ClusterName, []byte(`{"spec": {"disabled": true}}`)); err != nil { - sharedlogging.FromContext(ctx).Errorf("Disabling cluster side: %s", err) + logging.FromContext(ctx).Errorf("Disabling cluster side: %s", err) return } - sharedlogging.FromContext(ctx).Infof("Stack %s disabled", stack.ClusterName) + logging.FromContext(ctx).Infof("Stack %s disabled", stack.ClusterName) } func (c *membershipListener) enableStack(ctx context.Context, stack *generated.EnabledStack) { if err := c.client.Patch(ctx, "Stacks", stack.ClusterName, []byte(`{"spec": {"disabled": false}}`)); err != nil { - sharedlogging.FromContext(ctx).Errorf("Disabling cluster side: %s", err) + logging.FromContext(ctx).Errorf("Disabling cluster side: %s", err) return } - sharedlogging.FromContext(ctx).Infof("Stack %s enabled", stack.ClusterName) + logging.FromContext(ctx).Infof("Stack %s enabled", stack.ClusterName) } func (c *membershipListener) createOrUpdate(ctx context.Context, gvk schema.GroupVersionKind, name string, stackName string, owner *metav1.OwnerReference, content map[string]any) (*unstructured.Unstructured, error) { - logger := sharedlogging.FromContext(ctx).WithFields(map[string]any{ + logger := logging.FromContext(ctx).WithFields(map[string]any{ "gvk": gvk, }) logger.Infof("creating object '%s'", name) diff --git a/ee/agent/internal/module.go b/ee/agent/internal/module.go index d2bcc279f3..a293c959a4 100644 --- a/ee/agent/internal/module.go +++ b/ee/agent/internal/module.go @@ -180,7 +180,7 @@ func runMembershipClient(lc fx.Lifecycle, membershipClient *membershipClient, lo return err } go func() { - if err := membershipClient.Start(logging.ContextWithLogger(context.Background(), logger)); err != nil { + if err := membershipClient.Start(logging.ContextWithLogger(ctx, logger)); err != nil { panic(err) } }() @@ -193,7 +193,7 @@ func runMembershipClient(lc fx.Lifecycle, membershipClient *membershipClient, lo func runMembershipListener(lc fx.Lifecycle, client *membershipListener, logger logging.Logger) { lc.Append(fx.Hook{ OnStart: func(ctx context.Context) error { - go client.Start(logging.ContextWithLogger(context.Background(), logger)) + go client.Start(logging.ContextWithLogger(ctx, logger)) return nil }, }) diff --git a/helm/Earthfile b/helm/Earthfile index 816f064397..69f2cb5a25 100644 --- a/helm/Earthfile +++ b/helm/Earthfile @@ -5,22 +5,30 @@ IMPORT .. AS stack IMPORT ../components/operator AS operator sources: - FROM core+base-image - WORKDIR /src - COPY --dir regions regions - SAVE ARTIFACT /src + FROM core+base-image + WORKDIR /src + COPY --dir regions regions + SAVE ARTIFACT /src helm-validate: - FROM core+helm-base + FROM core+helm-base - WORKDIR /src/helm - COPY . . - FOR chart IN $(ls -d */) - BUILD ./$chart+helm-validate - END + WORKDIR /src/helm + COPY . . + FOR chart IN $(ls -d */) + IF [ "$chart" != "libs/" ] + RUN echo "Validating $chart" + WORKDIR /src/helm + COPY ./$chart $chart + WORKDIR /src/helm/$chart + RUN helm dependency update + DO --pass-args core+HELM_VALIDATE + END + END pre-commit: - BUILD --pass-args ./regions+helm-validate + BUILD --pass-args ./regions+helm-validate + BUILD --pass-args ./libs+helm-validate publish: - BUILD --pass-args ./regions+helm-publish \ No newline at end of file + BUILD --pass-args ./regions+helm-publish \ No newline at end of file diff --git a/helm/libs/Earthfile b/helm/libs/Earthfile new file mode 100644 index 0000000000..bcc21d975d --- /dev/null +++ b/helm/libs/Earthfile @@ -0,0 +1,19 @@ +VERSION 0.8 + +IMPORT github.com/formancehq/earthly:tags/v0.12.0 AS core + +sources: + FROM core+base-image + WORKDIR /src + COPY --dir core . + SAVE ARTIFACT /src + +helm-validate: + FROM core+helm-base + WORKDIR /src + COPY (+sources/*) . + FOR chart IN $(ls -d */) + WORKDIR /src/$chart + RUN helm lint . + END + SAVE ARTIFACT /src/* AS LOCAL . \ No newline at end of file diff --git a/helm/libs/core/Chart.yaml b/helm/libs/core/Chart.yaml new file mode 100644 index 0000000000..ea5b07d58e --- /dev/null +++ b/helm/libs/core/Chart.yaml @@ -0,0 +1,14 @@ +apiVersion: v2 +name: core +description: Formance Core Helm Chart +home: "https://formance.com" +sources: + - "https://github.com/formancehq/stack" +maintainers: + - name: "Formance Team" + email: "support@formance.com" +icon: "https://avatars.githubusercontent.com/u/84325077?s=200&v=4" + +type: library +version: 0.1.0 +appVersion: "latest" diff --git a/helm/libs/core/templates/_helpers.tpl b/helm/libs/core/templates/_helpers.tpl new file mode 100644 index 0000000000..b384ea814c --- /dev/null +++ b/helm/libs/core/templates/_helpers.tpl @@ -0,0 +1,45 @@ +{{/* + .Values: Values to search within + .Key: Key to find in the .config. then in . + .Default: default an object where each key is a string +*/}} +{{- define "resolveGlobalOrServiceValue" -}} + {{- $values := .Values -}} + {{- $key := .Key -}} + {{- $default := .Default -}} + + {{- $keys := splitList "." $key -}} + + + {{- $configkeys := splitList "." (print "config." $key) -}} + {{- $subchartValue := $values -}} + {{- $found := true -}} + {{- range $configkeys -}} + {{- if hasKey $subchartValue . -}} + {{- $subchartValue = index $subchartValue . -}} + {{- else -}} + {{- $found = false -}} + {{- break -}} + {{- end -}} + {{- end -}} + + {{- if not $found -}} + {{- $subchartValue = $values.global -}} + {{- $found = true -}} + {{- range $keys -}} + {{- if hasKey $subchartValue . -}} + {{- $subchartValue = index $subchartValue . -}} + {{- else -}} + {{- $subchartValue = $default -}} + {{- $found = false -}} + {{- break -}} + {{- end -}} + {{- end -}} + {{- end -}} + + {{- if not $found -}} + {{- $subchartValue = $default -}} + {{- end -}} + + {{- $subchartValue -}} +{{- end -}} diff --git a/helm/libs/core/templates/_monitoring.tpl b/helm/libs/core/templates/_monitoring.tpl new file mode 100644 index 0000000000..ac06b7f7d7 --- /dev/null +++ b/helm/libs/core/templates/_monitoring.tpl @@ -0,0 +1,98 @@ +{{/** + This now can be included in every chart folowing: + + global: + monitoring: + traces: + enabled: true + endpoint: "localhost" + exporter: "otlp" + insecure: "true" + mode: "grpc" + port: 4317 + logs: + enabled: true + level: "info" + format: "json" + metrics: + enabled: true + exporter: "otlp" + insecure: "true" + mode: "grpc" + port: 4317 + + monitoring: + serviceName: "" + + Each component who want to use monitoring should include this snippet in their deployment.yaml + +**/}} + + +{{- define "core.monitoring.traces" }} +- name: OTEL_TRACES + value: {{ include "resolveGlobalOrServiceValue" (dict "Values" .Values "Key" "monitoring.traces.enabled" "Default" "") | quote}} +- name: OTEL_TRACES_ENDPOINT + value: {{ include "resolveGlobalOrServiceValue" (dict "Values" .Values "Key" "monitoring.traces.endpoint" "Default" "")| quote }} +- name: OTEL_TRACES_EXPORTER + value: {{ include "resolveGlobalOrServiceValue" (dict "Values" .Values "Key" "monitoring.traces.exporter" "Default" "") | quote }} +- name: OTEL_TRACES_EXPORTER_OTLP_INSECURE + value: {{ include "resolveGlobalOrServiceValue" (dict "Values" .Values "Key" "monitoring.traces.insecure" "Default" "") | quote}} +- name: OTEL_TRACES_EXPORTER_OTLP_MODE + value: {{ include "resolveGlobalOrServiceValue" (dict "Values" .Values "Key" "monitoring.traces.mode" "Default" "") | quote}} +- name: OTEL_TRACES_PORT + value: {{ include "resolveGlobalOrServiceValue" (dict "Values" .Values "Key" "monitoring.traces.port" "Default" "") | quote }} +- name: OTEL_TRACES_EXPORTER_OTLP_ENDPOINT + value: "$(OTEL_TRACES_ENDPOINT):$(OTEL_TRACES_PORT)" +{{- end }} + +{{- define "core.monitoring.metrics" }} +- name: OTEL_METRICS + value: {{ include "resolveGlobalOrServiceValue" (dict "Values" .Values "Key" "monitoring.metrics.enabled" "Default" "") | quote }} +- name: OTEL_METRICS_ENDPOINT + value: {{ include "resolveGlobalOrServiceValue" (dict "Values" .Values "Key" "monitoring.metrics.endpoint" "Default" "") | quote}} +- name: OTEL_METRICS_EXPORTER + value: {{ include "resolveGlobalOrServiceValue" (dict "Values" .Values "Key" "monitoring.metrics.exporter" "Default" "") | quote}} +- name: OTEL_METRICS_EXPORTER_OTLP_INSECURE + value: {{ include "resolveGlobalOrServiceValue" (dict "Values" .Values "Key" "monitoring.metrics.insecure" "Default" "") | quote }} +- name: OTEL_METRICS_EXPORTER_OTLP_MODE + value: {{ include "resolveGlobalOrServiceValue" (dict "Values" .Values "Key" "monitoring.metrics.mode" "Default" "") | quote }} +- name: OTEL_METRICS_PORT + value: {{ include "resolveGlobalOrServiceValue" (dict "Values" .Values "Key" "monitoring.metrics.port" "Default" "") | quote }} +- name: OTEL_METRICS_EXPORTER_OTLP_ENDPOINT + value: "$(OTEL_TRACES_ENDPOINT):$(OTEL_METRICS_PORT)" +{{- end -}} + +{{- define "core.monitoring.logs" }} +- name: LOGS_ENABLED + value: {{ include "resolveGlobalOrServiceValue" (dict "Values" .Values "Key" "monitoring.logs.enabled" "Default" "") | quote }} +- name: LOGS_LEVEL + value: {{ include "resolveGlobalOrServiceValue" (dict "Values" .Values "Key" "monitoring.logs.level" "Default" "")| quote }} +{{- end -}} + + +{{- define "core.monitoring.common" -}} +- name: OTEL_SERVICE_NAME + value: "{{ .Values.config.monitoring.serviceName | default .Release.Name }}" +{{- if eq (include "resolveGlobalOrServiceValue" (dict "Values" .Values "Key" "monitoring.logs.format" "Default" "")) "json" }} +- name: JSON_FORMATTING_LOGGER + value: "true" +{{- end -}} +{{- end -}} + +{{- define "core.monitoring" -}} +{{- include "core.monitoring.common" . }} +{{- $traces := include "resolveGlobalOrServiceValue" (dict "Values" .Values "Key" "monitoring.traces.enabled" "Default" "") }} +{{- $logs := include "resolveGlobalOrServiceValue" (dict "Values" .Values "Key" "monitoring.logs.enabled" "Default" "") }} +{{- $metrics := include "resolveGlobalOrServiceValue" (dict "Values" .Values "Key" "monitoring.metrics.enabled" "Default" "") }} +{{- if eq $traces "true" }} +{{- include "core.monitoring.traces" . }} +{{- end }} +{{- if eq $logs "true" }} +{{- include "core.monitoring.logs" . }} +{{- end }} +{{- if eq $metrics "true" }} +{{- include "core.monitoring.metrics" . }} +{{- end }} +{{- end -}} + diff --git a/libs/go-libs/otlp/otlptraces/traces.go b/libs/go-libs/otlp/otlptraces/traces.go index 14876465f1..a1eb2aea9a 100644 --- a/libs/go-libs/otlp/otlptraces/traces.go +++ b/libs/go-libs/otlp/otlptraces/traces.go @@ -52,19 +52,20 @@ func TracesModule(cfg ModuleConfig) fx.Option { options = append(options, fx.Supply(cfg), otlp.LoadResource(cfg.ServiceName, cfg.ResourceAttributes), - fx.Provide(func(tp *tracesdk.TracerProvider) trace.TracerProvider { return tp }), fx.Provide(fx.Annotate(func(options ...tracesdk.TracerProviderOption) *tracesdk.TracerProvider { return tracesdk.NewTracerProvider(options...) }, fx.ParamTags(TracerProviderOptionKey))), - fx.Invoke(func(lc fx.Lifecycle, tracerProvider *tracesdk.TracerProvider) { + fx.Invoke(func(tp *tracesdk.TracerProvider) trace.TracerProvider { + otel.SetTracerProvider(tp) + // set global propagator to tracecontext (the default is no-op). otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( b3.New(), propagation.TraceContext{})) // B3 format is common and used by zipkin. Always enabled right now. + + return tp + }), + fx.Invoke(func(lc fx.Lifecycle, tracerProvider *tracesdk.TracerProvider) { lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - otel.SetTracerProvider(tracerProvider) - return nil - }, OnStop: func(ctx context.Context) error { return tracerProvider.Shutdown(ctx) },