Skip to content

Commit

Permalink
feat: agent with operator v2 (#1161)
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag authored Feb 6, 2024
1 parent 7ce86d9 commit a3783be
Show file tree
Hide file tree
Showing 26 changed files with 1,473 additions and 899 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ jobs:
- run: kubectl config use-context arn:aws:eks:eu-west-1:955332203423:cluster/staging-eu-west-1-hosting
- name: Deploy in staging
run: |
#kubectl set image deployment -n formance-system agent agent=ghcr.io/formancehq/agent:${GITHUB_SHA}-scratch
kubectl set image deployment -n formance-system agent agent=ghcr.io/formancehq/agent:${GITHUB_SHA}-scratch
kubectl set image deployment -n formance-system operator operator=ghcr.io/formancehq/operator:${GITHUB_SHA}-scratch
kubectl patch Versions.formance.com default -p "{\"spec\":{\"ledger\": \"${GITHUB_SHA}-scratch\"}}" --type=merge
kubectl patch Versions.formance.com default -p "{\"spec\":{\"payments\": \"${GITHUB_SHA}-scratch\"}}" --type=merge
Expand Down
26 changes: 19 additions & 7 deletions ee/agent/Earthfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ build-image:
ARG tag=latest
DO core+SAVE_IMAGE --COMPONENT=agent --REPOSITORY=${REPOSITORY} --TAG=$tag

tests:
FROM core+builder-image
COPY (+sources/*) /src
WORKDIR /src/ee/agent
DO --pass-args core+GO_TESTS

lint:
FROM core+builder-image
COPY (+sources/*) /src
Expand Down Expand Up @@ -92,4 +86,22 @@ grpc-generate:
RUN mkdir generated
COPY server.proto .
RUN protoc --go_out=generated --go_opt=paths=source_relative --go-grpc_out=generated --go-grpc_opt=paths=source_relative server.proto
SAVE ARTIFACT generated AS LOCAL internal/grpc/generated
SAVE ARTIFACT generated AS LOCAL internal/generated

tests:
FROM core+builder-image
RUN apk update && apk add bash
DO --pass-args core+GO_INSTALL --package=sigs.k8s.io/controller-runtime/tools/setup-envtest@latest
ENV ENVTEST_VERSION 1.28.0
RUN setup-envtest use $ENVTEST_VERSION -p path
ENV KUBEBUILDER_ASSETS /root/.local/share/kubebuilder-envtest/k8s/$ENVTEST_VERSION-linux-$(go env GOHOSTARCH)
DO --pass-args core+GO_INSTALL --package=github.com/onsi/ginkgo/v2/[email protected]
COPY --pass-args +sources/* /src
COPY --pass-args ../../components/operator+manifests/config /src/components/operator/config
WORKDIR /src/ee/agent
COPY tests tests
ARG GOPROXY
ARG focus
RUN --mount=type=cache,id=gomod,target=$GOPATH/pkg/mod \
--mount=type=cache,id=gobuild,target=/root/.cache/go-build \
ginkgo --focus=$focus ./tests/...
213 changes: 122 additions & 91 deletions ee/agent/cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package cmd

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net/url"
"os"
"path/filepath"

innerGrpc "github.com/formancehq/stack/components/agent/internal/grpc"
"github.com/formancehq/stack/components/agent/internal/k8s"
"github.com/formancehq/operator/api/formance.com/v1beta1"
"github.com/formancehq/stack/components/agent/internal"
sharedlogging "github.com/formancehq/stack/libs/go-libs/logging"
sharedotlptraces "github.com/formancehq/stack/libs/go-libs/otlp/otlptraces"
"github.com/formancehq/stack/libs/go-libs/service"
Expand All @@ -20,13 +17,17 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"net/url"
"os"
"path/filepath"
)

var (
ServiceName = "membership"
ServiceName = "agent"
Version = "develop"
BuildDate = "-"
Commit = "-"
Expand All @@ -48,111 +49,141 @@ const (
productionFlag = "production"
)

func newK8SConfig() (*rest.Config, error) {
config, err := rest.InClusterConfig()
if err != nil {
sharedlogging.Info("Does not seems to be in cluster, trying to load k8s client from kube config file")
config, err = clientcmd.BuildConfigFromFlags("", viper.GetString(kubeConfigFlag))
if err != nil {
return nil, err
}
func init() {
if err := v1beta1.AddToScheme(scheme.Scheme); err != nil {
panic(err)
}
return config, nil
}

var rootCmd = &cobra.Command{
SilenceUsage: true,
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
return bindFlagsToViper(cmd)
},
RunE: func(cmd *cobra.Command, args []string) error {
serverAddress := viper.GetString(serverAddressFlag)
if serverAddress == "" {
return errors.New("missing server address")
}
RunE: runAgent,
}

id := viper.GetString(idFlag)
if id == "" {
return errors.New("missing id")
}
func exitWithCode(code int, v ...any) {
fmt.Fprintln(os.Stdout, v...)
os.Exit(code)
}

dialOptions := make([]grpc.DialOption, 0)
var credential credentials.TransportCredentials
if !viper.GetBool(tlsEnabledFlag) {
sharedlogging.FromContext(cmd.Context()).Infof("TLS not enabled")
credential = insecure.NewCredentials()
} else {
sharedlogging.FromContext(cmd.Context()).Infof("TLS enabled")
certPool := x509.NewCertPool()
if ca := viper.GetString(tlsCACertificateFlag); ca != "" {
sharedlogging.FromContext(cmd.Context()).Infof("Load server certificate from config")
if !certPool.AppendCertsFromPEM([]byte(ca)) {
return fmt.Errorf("failed to add server CA's certificate")
}
}
func runAgent(cmd *cobra.Command, args []string) error {
serverAddress := viper.GetString(serverAddressFlag)
if serverAddress == "" {
return errors.New("missing server address")
}

if viper.GetBool(tlsInsecureSkipVerifyFlag) {
sharedlogging.FromContext(cmd.Context()).Infof("Disable certificate checks")
}
credential = credentials.NewTLS(&tls.Config{
InsecureSkipVerify: viper.GetBool(tlsInsecureSkipVerifyFlag),
RootCAs: certPool,
})
}
dialOptions = append(dialOptions, grpc.WithTransportCredentials(credential))
agentID := viper.GetString(idFlag)
if agentID == "" {
return errors.New("missing id")
}

baseUrlString := viper.GetString(baseUrlFlag)
if baseUrlString == "" {
return errors.New("missing base url")
}
baseUrl, err := url.Parse(baseUrlString)
credentials, err := createGRPCTransportCredentials(cmd.Context())
if err != nil {
return err
}

dialOptions := make([]grpc.DialOption, 0)
dialOptions = append(dialOptions, grpc.WithTransportCredentials(credentials))

baseUrlString := viper.GetString(baseUrlFlag)
if baseUrlString == "" {
return errors.New("missing base url")
}

baseUrl, err := url.Parse(baseUrlString)
if err != nil {
return err
}

authenticator, err := createAuthenticator(agentID)
if err != nil {
return err
}

options := []fx.Option{
fx.Provide(newK8SConfig),
fx.NopLogger,
internal.NewModule(serverAddress, authenticator, internal.ClientInfo{
ID: agentID,
BaseUrl: baseUrl,
Production: viper.GetBool(productionFlag),
Version: Version,
}, dialOptions...),
sharedotlptraces.CLITracesModule(viper.GetViper()),
}

return service.New(cmd.OutOrStdout(), options...).Run(cmd.Context())
}

func newK8SConfig() (*rest.Config, error) {
config, err := rest.InClusterConfig()
if err != nil {
sharedlogging.Info("Does not seems to be in cluster, trying to load k8s client from kube config file")
config, err = clientcmd.BuildConfigFromFlags("", viper.GetString(kubeConfigFlag))
if err != nil {
return err
return nil, err
}
}

var authenticator innerGrpc.Authenticator
switch viper.GetString(authenticationModeFlag) {
case "token":
token := viper.GetString(authenticationTokenFlag)
if token == "" {
return errors.New("missing authentication token")
}
authenticator = innerGrpc.TokenAuthenticator(token)
case "bearer":
clientSecret := viper.GetString(authenticationClientSecretFlag)
if clientSecret == "" {
return errors.New("missing client secret")
}
issuer := viper.GetString(authenticationIssuerFlag)
if issuer == "" {
return errors.New("missing issuer")
}
config.GroupVersion = &v1beta1.GroupVersion
config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
config.APIPath = "/apis"

authenticator = innerGrpc.BearerAuthenticator(issuer, id, clientSecret)
default:
return errors.New("authentication mode not specified")
}
return config, nil
}

options := []fx.Option{
fx.Provide(newK8SConfig),
fx.NopLogger,
k8s.NewModule(),
innerGrpc.NewModule(serverAddress, authenticator, innerGrpc.ClientInfo{
ID: id,
BaseUrl: baseUrl,
Production: viper.GetBool(productionFlag),
Version: Version,
}, dialOptions...),
sharedotlptraces.CLITracesModule(viper.GetViper()),
func createAuthenticator(agentID string) (internal.Authenticator, error) {
var authenticator internal.Authenticator
switch viper.GetString(authenticationModeFlag) {
case "token":
token := viper.GetString(authenticationTokenFlag)
if token == "" {
return nil, errors.New("missing authentication token")
}
authenticator = internal.TokenAuthenticator(token)
case "bearer":
clientSecret := viper.GetString(authenticationClientSecretFlag)
if clientSecret == "" {
return nil, errors.New("missing client secret")
}
issuer := viper.GetString(authenticationIssuerFlag)
if issuer == "" {
return nil, errors.New("missing issuer")
}

return service.New(cmd.OutOrStdout(), options...).Run(cmd.Context())
},
authenticator = internal.BearerAuthenticator(issuer, agentID, clientSecret)
default:
return nil, errors.New("authentication mode not specified")
}
return authenticator, nil
}

func exitWithCode(code int, v ...any) {
fmt.Fprintln(os.Stdout, v...)
os.Exit(code)
func createGRPCTransportCredentials(ctx context.Context) (credentials.TransportCredentials, error) {
var credential credentials.TransportCredentials
if !viper.GetBool(tlsEnabledFlag) {
sharedlogging.FromContext(ctx).Infof("TLS not enabled")
credential = insecure.NewCredentials()
} else {
sharedlogging.FromContext(ctx).Infof("TLS enabled")
certPool := x509.NewCertPool()
if ca := viper.GetString(tlsCACertificateFlag); ca != "" {
sharedlogging.FromContext(ctx).Infof("Load server certificate from config")
if !certPool.AppendCertsFromPEM([]byte(ca)) {
return nil, fmt.Errorf("failed to add server CA's certificate")
}
}

if viper.GetBool(tlsInsecureSkipVerifyFlag) {
sharedlogging.FromContext(ctx).Infof("Disable certificate checks")
}
credential = credentials.NewTLS(&tls.Config{
InsecureSkipVerify: viper.GetBool(tlsInsecureSkipVerifyFlag),
RootCAs: certPool,
})
}
return credential, nil
}

func Execute() {
Expand Down
Loading

0 comments on commit a3783be

Please sign in to comment.