Skip to content

Commit

Permalink
feat(agent): add otel grpc stats handler client side (#1535)
Browse files Browse the repository at this point in the history
Co-authored-by: David Ragot <[email protected]>
  • Loading branch information
Dav-14 and David Ragot committed Jun 6, 2024
1 parent 74af6f2 commit 0bd9627
Show file tree
Hide file tree
Showing 17 changed files with 334 additions and 100 deletions.
8 changes: 7 additions & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,10 @@ root = true
end_of_line = lf
charset = utf-8
indent_style = space
indent_size = 4
indent_size = 4

[*.tpl]
end_of_line = lf
charset = utf-8
indent_style = space
indent_size = 2
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,7 @@ openapi/node_modules

.aws

.kube
.kube

# Helm
*.tgz
8 changes: 7 additions & 1 deletion ee/agent/.earthly/values.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
global:
monitoring:
traces:
enabled: true
mode: grpc
exporter: otlp
endpoint: otel-collector-opentelemetry-collector.formance.svc.cluster.local
port: 4317
insecure: true
logs:
format: ""

image:
pullPolicy: Always

Expand Down
12 changes: 10 additions & 2 deletions ee/agent/Earthfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ VERSION 0.8
IMPORT github.com/formancehq/earthly:tags/v0.12.0 AS core

IMPORT ../.. AS stack
IMPORT ../../helm/libs AS helm-libs
IMPORT .. AS ee

FROM core+base-image
Expand Down Expand Up @@ -136,14 +137,21 @@ tests:
helm-validate:
FROM core+helm-base
WORKDIR /src
COPY helm .

COPY (helm-libs+sources/*) helm/libs/
COPY --dir helm ee/agent/

WORKDIR /src/ee/agent/helm
RUN helm dependencies update
DO --pass-args core+HELM_VALIDATE
SAVE ARTIFACT /src AS LOCAL helm
SAVE ARTIFACT /src/ee/agent/helm AS LOCAL helm

helm-package:
FROM +helm-validate
RUN helm package .
SAVE ARTIFACT /src

SAVE ARTIFACT . AS LOCAL helm

release:
BUILD --pass-args stack+goreleaser --path=ee/agent
78 changes: 37 additions & 41 deletions ee/agent/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/formancehq/stack/components/agent/internal"
"github.com/formancehq/stack/libs/go-libs/licence"
sharedlogging "github.com/formancehq/stack/libs/go-libs/logging"
sharedotlptraces "github.com/formancehq/stack/libs/go-libs/otlp/otlptraces"
"github.com/formancehq/stack/libs/go-libs/otlp/otlptraces"
"github.com/formancehq/stack/libs/go-libs/service"
"github.com/pkg/errors"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -50,12 +50,6 @@ const (
resyncPeriod = "resync-period"
)

func init() {
if err := v1beta1.AddToScheme(scheme.Scheme); err != nil {
panic(err)
}
}

var rootCmd = &cobra.Command{
SilenceUsage: true,
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
Expand All @@ -64,9 +58,41 @@ var rootCmd = &cobra.Command{
RunE: runAgent,
}

func exitWithCode(code int, v ...any) {
fmt.Fprintln(os.Stdout, v...)
os.Exit(code)
func init() {
if err := v1beta1.AddToScheme(scheme.Scheme); err != nil {
panic(err)
}

var kubeConfigFilePath string
if home := homedir.HomeDir(); home != "" {
kubeConfigFilePath = filepath.Join(home, ".kube", "config")
}

service.BindFlags(rootCmd)
otlptraces.InitOTLPTracesFlags(rootCmd.PersistentFlags())
licence.InitCLIFlags(rootCmd)

rootCmd.Flags().String(kubeConfigFlag, kubeConfigFilePath, "")
rootCmd.Flags().String(serverAddressFlag, "localhost:8081", "")
rootCmd.Flags().Bool(tlsEnabledFlag, false, "")
rootCmd.Flags().Bool(tlsInsecureSkipVerifyFlag, false, "")
rootCmd.Flags().String(tlsCACertificateFlag, "", "")
rootCmd.Flags().String(idFlag, "", "")
rootCmd.Flags().String(authenticationModeFlag, "", "")
rootCmd.Flags().String(authenticationTokenFlag, "", "")
rootCmd.Flags().String(authenticationClientSecretFlag, "", "")
rootCmd.Flags().String(authenticationIssuerFlag, "", "")
rootCmd.Flags().String(baseUrlFlag, "", "")
rootCmd.Flags().Bool(productionFlag, false, "Is a production agent")
rootCmd.Flags().Duration(resyncPeriod, 5*time.Minute, "Resync period of K8S resources")
rootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
}

func Execute() {
if err := rootCmd.Execute(); err != nil {
fmt.Fprintln(os.Stdout, err)
os.Exit(1)
}
}

func runAgent(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -117,7 +143,7 @@ func runAgent(cmd *cobra.Command, args []string) error {
Production: viper.GetBool(productionFlag),
Version: Version,
}, viper.GetDuration(resyncPeriod), dialOptions...),
sharedotlptraces.CLITracesModule(),
otlptraces.CLITracesModule(),
licence.CLIModule(ServiceName),
}

Expand Down Expand Up @@ -175,33 +201,3 @@ func createGRPCTransportCredentials(ctx context.Context) (credentials.TransportC
}
return credential, nil
}

func Execute() {
if err := rootCmd.Execute(); err != nil {
exitWithCode(1, err)
}
}

func init() {
var kubeConfigFilePath string
if home := homedir.HomeDir(); home != "" {
kubeConfigFilePath = filepath.Join(home, ".kube", "config")
}

service.BindFlags(rootCmd)
licence.InitCLIFlags(rootCmd)
rootCmd.Flags().String(kubeConfigFlag, kubeConfigFilePath, "")
rootCmd.Flags().String(serverAddressFlag, "localhost:8081", "")
rootCmd.Flags().Bool(tlsEnabledFlag, false, "")
rootCmd.Flags().Bool(tlsInsecureSkipVerifyFlag, false, "")
rootCmd.Flags().String(tlsCACertificateFlag, "", "")
rootCmd.Flags().String(idFlag, "", "")
rootCmd.Flags().String(authenticationModeFlag, "", "")
rootCmd.Flags().String(authenticationTokenFlag, "", "")
rootCmd.Flags().String(authenticationClientSecretFlag, "", "")
rootCmd.Flags().String(authenticationIssuerFlag, "", "")
rootCmd.Flags().String(baseUrlFlag, "", "")
rootCmd.Flags().Bool(productionFlag, false, "Is a production agent")
rootCmd.Flags().Duration(resyncPeriod, 5*time.Minute, "Resync period of K8S resources")
rootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
}
7 changes: 4 additions & 3 deletions ee/agent/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ require (
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.9.0
github.com/zitadel/oidc/v2 v2.12.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.45.0
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
go.uber.org/fx v1.20.1
go.uber.org/mock v0.4.0
golang.org/x/oauth2 v0.16.0
Expand All @@ -30,6 +32,7 @@ require (
)

require (
cloud.google.com/go/compute v1.23.4 // indirect
github.com/ThreeDotsLabs/watermill v1.3.5 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
Expand Down Expand Up @@ -91,14 +94,12 @@ require (
github.com/uptrace/opentelemetry-go-extra/otelutil v0.2.3 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect
go.opentelemetry.io/contrib/propagators/b3 v1.22.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.22.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.22.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.22.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.22.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/dig v1.17.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
10 changes: 5 additions & 5 deletions ee/agent/go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cloud.google.com/go v0.110.10 h1:LXy9GEO+timppncPIAZoOj3l58LIU9k+kn48AN7IO3Y=
cloud.google.com/go/compute v1.23.3 h1:6sVlXXBmbd7jNX0Ipq0trII3e4n1/MsADLK6a+aiVlk=
cloud.google.com/go/compute v1.23.3/go.mod h1:VCgBUoMnIVIR0CscqQiPJLAG25E3ZRZMzcFZeQ+h8CI=
cloud.google.com/go v0.112.0 h1:tpFCD7hpHFlQ8yPwT3x+QeXqc2T6+n6T+hmABHfDUSM=
cloud.google.com/go/compute v1.23.4 h1:EBT9Nw4q3zyE7G45Wvv3MzolIrCJEuHys5muLY0wvAw=
cloud.google.com/go/compute v1.23.4/go.mod h1:/EJMj55asU6kAFnuZET8zqgwgJ9FvXWXOkkfQZa4ioI=
cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA=
github.com/ThreeDotsLabs/watermill v1.3.5 h1:50JEPEhMGZQMh08ct0tfO1PsgMOAOhV3zxK2WofkbXg=
Expand Down Expand Up @@ -217,8 +217,8 @@ github.com/uptrace/opentelemetry-go-extra/otelutil v0.2.3/go.mod h1:RvCYhPchLhvQ
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.45.0 h1:RsQi0qJ2imFfCvZabqzM9cNXBG8k6gXMv1A0cXRmH6A=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.45.0/go.mod h1:vsh3ySueQCiKPxFLvjWC4Z135gIa34TQ/NSqkDTZYUM=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 h1:sv9kVfal0MK0wBMCOGr+HeJm9v803BkJxGrk2au7j08=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0/go.mod h1:SK2UL73Zy1quvRPonmOmRDiWk1KBV3LyIeeIxcEApWw=
go.opentelemetry.io/contrib/propagators/b3 v1.22.0 h1:Okbgv0pWHMQq+mF7H2o1mucJ5PvxKFq2c8cyqoXfeaQ=
Expand Down
4 changes: 4 additions & 0 deletions ee/agent/helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ server:
enabled: true
insecureSkipVerify: true

config:
monitoring:
serviceName: agent

agent:
id: "b7549a16-f74a-4815-ab1e-bb8ef1c3833b"
baseUrl: ""
Expand Down
32 changes: 24 additions & 8 deletions ee/agent/internal/membership_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"time"

"github.com/formancehq/stack/components/agent/internal/generated"
sharedlogging "github.com/formancehq/stack/libs/go-libs/logging"
"github.com/formancehq/stack/libs/go-libs/logging"

"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
Expand Down Expand Up @@ -66,28 +68,42 @@ func (c *membershipClient) connectMetadata(ctx context.Context, modules []string
return md, nil
}

func LoggingClientStreamInterceptor(l logging.Logger) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
span := trace.SpanFromContext(ctx)

logging.FromContext(ctx).
WithField("traceId", span.SpanContext().TraceID()).
WithField("spanId", span.SpanContext().SpanID()).
WithField("method", method).
Infof("Starting stream")
return streamer(logging.ContextWithLogger(ctx, l), desc, cc, method, opts...)
}
}

func (c *membershipClient) connect(ctx context.Context, modules []string, eeModules []string) error {
sharedlogging.FromContext(ctx).WithFields(map[string]any{
logging.FromContext(ctx).WithFields(map[string]any{
"id": c.clientInfo.ID,
}).Infof("Establish connection to server")
c.connectContext, c.connectCancel = context.WithCancel(ctx)

opts := append(c.opts,
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()),
grpc.WithChainStreamInterceptor(
LoggingClientStreamInterceptor(logging.FromContext(ctx)),
),
grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
)
conn, err := grpc.Dial(c.address, opts...)
if err != nil {
return err
}
sharedlogging.FromContext(ctx).Info("Connected to GRPC server!")

c.serverClient = generated.NewServerClient(conn)

md, err := c.connectMetadata(ctx, modules, eeModules)
if err != nil {
return err
}

connectContext := metadata.NewOutgoingContext(c.connectContext, md)
connectClient, err := c.serverClient.Join(connectContext)
if err != nil {
Expand All @@ -113,7 +129,7 @@ func (c *membershipClient) sendPong(ctx context.Context) {
Pong: &generated.Pong{},
},
}); err != nil {
sharedlogging.FromContext(ctx).Errorf("Unable to send pong to server: %s", err)
logging.FromContext(ctx).Errorf("Unable to send pong to server: %s", err)
if errors.Is(err, io.EOF) {
panic(err)
}
Expand Down Expand Up @@ -191,7 +207,7 @@ func (c *membershipClient) Start(ctx context.Context) error {
}
<-time.After(50 * time.Millisecond)
case err := <-errCh:
sharedlogging.FromContext(ctx).Errorf("Stream closed with error: %s", err)
logging.FromContext(ctx).Errorf("Stream closed with error: %s", err)
return err
}
}
Expand Down
Loading

0 comments on commit 0bd9627

Please sign in to comment.