Skip to content

Commit

Permalink
refactor: Provide additional kubernetes client to servers (#176)
Browse files Browse the repository at this point in the history
Signed-off-by: jannfis <[email protected]>
  • Loading branch information
jannfis authored Sep 9, 2024
1 parent 331e721 commit aefc4b6
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 39 deletions.
2 changes: 1 addition & 1 deletion cmd/principal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func NewPrincipalRunCommand() *cobra.Command {
observer(10 * time.Second)
}

s, err := principal.NewServer(ctx, kubeConfig.ApplicationsClientset, namespace, opts...)
s, err := principal.NewServer(ctx, kubeConfig, namespace, opts...)
if err != nil {
cmd.Fatal("Could not create new server instance: %v", err)
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/client/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ import (
"testing"
"time"

fakeappclient "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned/fake"

"github.com/argoproj-labs/argocd-agent/internal/auth"
"github.com/argoproj-labs/argocd-agent/internal/auth/userpass"
"github.com/argoproj-labs/argocd-agent/pkg/types"
"github.com/argoproj-labs/argocd-agent/principal"
"github.com/argoproj-labs/argocd-agent/test/fake/kube"
"github.com/argoproj-labs/argocd-agent/test/fake/testcerts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -37,7 +36,7 @@ func Test_Connect(t *testing.T) {
tempDir := t.TempDir()
basePath := path.Join(tempDir, "certs")
testcerts.WriteSelfSignedCert(t, "rsa", basePath, x509.Certificate{SerialNumber: big.NewInt(1)})
s, err := principal.NewServer(context.TODO(), fakeappclient.NewSimpleClientset(), "default",
s, err := principal.NewServer(context.TODO(), kube.NewKubernetesFakeClient(), "default",
principal.WithGRPC(true),
principal.WithListenerPort(0),
principal.WithTLSKeyPairFromPath(basePath+".crt", basePath+".key"),
Expand Down
5 changes: 3 additions & 2 deletions principal/apis/eventstream/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func NewServer(queues queue.QueuePair, opts ...ServerOption) *Server {

// newClientConnection returns a new client object to be used to read from and
// send to the subscription stream.
func newClientConnection(ctx context.Context, timeout time.Duration) (*client, error) {
func (s *Server) newClientConnection(ctx context.Context, timeout time.Duration) (*client, error) {
c := &client{}
c.wg = &sync.WaitGroup{}

Expand Down Expand Up @@ -122,6 +122,7 @@ func agentName(ctx context.Context) (string, error) {
if !ok {
return "", fmt.Errorf("invalid context: no agent name")
}
// TODO: check agentName for validity
return agentName, nil
}

Expand Down Expand Up @@ -256,7 +257,7 @@ func (s *Server) sendFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe
//
// Subscribe is called by GRPC machinery.
func (s *Server) Subscribe(subs eventstreamapi.EventStream_SubscribeServer) error {
c, err := newClientConnection(subs.Context(), s.options.MaxStreamDuration)
c, err := s.newClientConnection(subs.Context(), s.options.MaxStreamDuration)
if err != nil {
return err
}
Expand Down
28 changes: 14 additions & 14 deletions principal/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (
"context"
"testing"

"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
fakeappclient "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned/fake"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/argoproj-labs/argocd-agent/internal/event"
"github.com/argoproj-labs/argocd-agent/pkg/types"
"github.com/argoproj-labs/argocd-agent/test/fake/kube"
wqmock "github.com/argoproj-labs/argocd-agent/test/mocks/k8s-workqueue"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -36,7 +36,7 @@ func Test_InvalidEvents(t *testing.T) {
item := "foo"
wq.On("Get").Return(&item, false)
wq.On("Done", &item)
s, err := NewServer(context.Background(), fakeappclient.NewSimpleClientset(), "argocd", WithGeneratedTokenSigningKey())
s, err := NewServer(context.Background(), kube.NewKubernetesFakeClient(), "argocd", WithGeneratedTokenSigningKey())
require.NoError(t, err)
err = s.processRecvQueue(context.Background(), "foo", wq)
assert.ErrorContains(t, err, "invalid data in queue")
Expand All @@ -49,7 +49,7 @@ func Test_InvalidEvents(t *testing.T) {
wq := wqmock.NewRateLimitingInterface(t)
wq.On("Get").Return(&ev, false)
wq.On("Done", &ev)
s, err := NewServer(context.Background(), fakeappclient.NewSimpleClientset(), "argocd", WithGeneratedTokenSigningKey())
s, err := NewServer(context.Background(), kube.NewKubernetesFakeClient(), "argocd", WithGeneratedTokenSigningKey())
require.NoError(t, err)
err = s.processRecvQueue(context.Background(), "foo", wq)
assert.ErrorContains(t, err, "unable to process event with unknown target")
Expand All @@ -62,7 +62,7 @@ func Test_InvalidEvents(t *testing.T) {
wq := wqmock.NewRateLimitingInterface(t)
wq.On("Get").Return(&ev, false)
wq.On("Done", &ev)
s, err := NewServer(context.Background(), fakeappclient.NewSimpleClientset(), "argocd", WithGeneratedTokenSigningKey())
s, err := NewServer(context.Background(), kube.NewKubernetesFakeClient(), "argocd", WithGeneratedTokenSigningKey())
require.NoError(t, err)
err = s.processRecvQueue(context.Background(), "foo", wq)
assert.ErrorContains(t, err, "unable to process event of type application")
Expand All @@ -76,7 +76,7 @@ func Test_InvalidEvents(t *testing.T) {
wq := wqmock.NewRateLimitingInterface(t)
wq.On("Get").Return(&ev, false)
wq.On("Done", &ev)
s, err := NewServer(context.Background(), fakeappclient.NewSimpleClientset(), "argocd", WithGeneratedTokenSigningKey())
s, err := NewServer(context.Background(), kube.NewKubernetesFakeClient(), "argocd", WithGeneratedTokenSigningKey())
require.NoError(t, err)
err = s.processRecvQueue(context.Background(), "foo", wq)
assert.ErrorContains(t, err, "failed to unmarshal")
Expand All @@ -91,7 +91,7 @@ func Test_CreateEvents(t *testing.T) {
wq := wqmock.NewRateLimitingInterface(t)
wq.On("Get").Return(&ev, false)
wq.On("Done", &ev)
s, err := NewServer(context.Background(), fakeappclient.NewSimpleClientset(), "argocd", WithGeneratedTokenSigningKey())
s, err := NewServer(context.Background(), kube.NewKubernetesFakeClient(), "argocd", WithGeneratedTokenSigningKey())
require.NoError(t, err)
err = s.processRecvQueue(context.Background(), "foo", wq)
assert.ErrorIs(t, err, event.ErrEventDiscarded)
Expand Down Expand Up @@ -119,7 +119,7 @@ func Test_CreateEvents(t *testing.T) {
Sync: v1alpha1.SyncStatus{Status: v1alpha1.SyncStatusCodeSynced},
},
}
fac := fakeappclient.NewSimpleClientset()
fac := kube.NewKubernetesFakeClient(app)
ev := cloudevents.NewEvent()
ev.SetDataSchema("application")
ev.SetType(event.Create.String())
Expand All @@ -132,7 +132,7 @@ func Test_CreateEvents(t *testing.T) {
s.setAgentMode("foo", types.AgentModeAutonomous)
err = s.processRecvQueue(context.Background(), "foo", wq)
assert.NoError(t, err)
napp, err := fac.ArgoprojV1alpha1().Applications("foo").Get(context.TODO(), "test", v1.GetOptions{})
napp, err := fac.ApplicationsClientset.ArgoprojV1alpha1().Applications("foo").Get(context.TODO(), "test", v1.GetOptions{})
assert.NoError(t, err)
require.NotNil(t, napp)
assert.Equal(t, "HEAD", napp.Spec.Source.TargetRevision)
Expand Down Expand Up @@ -163,7 +163,7 @@ func Test_CreateEvents(t *testing.T) {
}
exapp := app.DeepCopy()
exapp.Namespace = "foo"
fac := fakeappclient.NewSimpleClientset(exapp)
fac := kube.NewKubernetesFakeClient(exapp)
ev := cloudevents.NewEvent()
ev.SetDataSchema("application")
ev.SetType(event.Create.String())
Expand Down Expand Up @@ -224,7 +224,7 @@ func Test_UpdateEvents(t *testing.T) {
Sync: v1alpha1.SyncStatus{Status: v1alpha1.SyncStatusCodeSynced},
},
}
fac := fakeappclient.NewSimpleClientset(exApp)
fac := kube.NewKubernetesFakeClient(exApp)
ev := cloudevents.NewEvent()
ev.SetDataSchema("application")
ev.SetType(event.SpecUpdate.String())
Expand All @@ -237,7 +237,7 @@ func Test_UpdateEvents(t *testing.T) {
s.setAgentMode("foo", types.AgentModeAutonomous)
err = s.processRecvQueue(context.Background(), "foo", wq)
assert.NoError(t, err)
napp, err := fac.ArgoprojV1alpha1().Applications("foo").Get(context.TODO(), "test", v1.GetOptions{})
napp, err := fac.ApplicationsClientset.ArgoprojV1alpha1().Applications("foo").Get(context.TODO(), "test", v1.GetOptions{})
assert.NoError(t, err)
require.NotNil(t, napp)
assert.Equal(t, "HEAD", napp.Spec.Source.TargetRevision)
Expand Down Expand Up @@ -267,7 +267,7 @@ func Test_UpdateEvents(t *testing.T) {
Sync: v1alpha1.SyncStatus{Status: v1alpha1.SyncStatusCodeSynced},
},
}
fac := fakeappclient.NewSimpleClientset()
fac := kube.NewKubernetesFakeClient()
ev := cloudevents.NewEvent()
ev.SetDataSchema("application")
ev.SetType(event.SpecUpdate.String())
Expand Down
8 changes: 4 additions & 4 deletions principal/listen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import (
"testing"
"time"

fakeappclient "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned/fake"
"github.com/argoproj-labs/argocd-agent/internal/auth"
"github.com/argoproj-labs/argocd-agent/internal/auth/userpass"
"github.com/argoproj-labs/argocd-agent/pkg/api/grpc/authapi"
"github.com/argoproj-labs/argocd-agent/pkg/api/grpc/versionapi"
"github.com/argoproj-labs/argocd-agent/pkg/types"
"github.com/argoproj-labs/argocd-agent/test/fake/kube"
fakecerts "github.com/argoproj-labs/argocd-agent/test/fake/testcerts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -79,7 +79,7 @@ func Test_Listen(t *testing.T) {
templ := certTempl
fakecerts.WriteSelfSignedCert(t, "rsa", path.Join(tempDir, "test-cert"), templ)
t.Run("Auto-select port for listener", func(t *testing.T) {
s, err := NewServer(context.TODO(), fakeappclient.NewSimpleClientset(), testNamespace,
s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClient(), testNamespace,
WithTLSKeyPairFromPath(path.Join(tempDir, "test-cert.crt"), path.Join(tempDir, "test-cert.key")),
WithListenerPort(0),
WithGeneratedTokenSigningKey(),
Expand All @@ -94,7 +94,7 @@ func Test_Listen(t *testing.T) {
})

t.Run("Listen on privileged port", func(t *testing.T) {
s, err := NewServer(context.TODO(), fakeappclient.NewSimpleClientset(), testNamespace,
s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClient(), testNamespace,
WithTLSKeyPairFromPath(path.Join(tempDir, "test-cert.crt"), path.Join(tempDir, "test-cert.key")),
WithGeneratedTokenSigningKey(),
WithListenerPort(443),
Expand Down Expand Up @@ -145,7 +145,7 @@ func Test_Serve(t *testing.T) {
fakecerts.WriteSelfSignedCert(t, "rsa", path.Join(tempDir, "test-cert"), templ)

// We start a real (non-mocked) server
s, err := NewServer(context.TODO(), fakeappclient.NewSimpleClientset(), testNamespace,
s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClient(), testNamespace,
WithTLSKeyPairFromPath(path.Join(tempDir, "test-cert.crt"), path.Join(tempDir, "test-cert.key")),
WithGeneratedTokenSigningKey(),
WithListenerPort(0),
Expand Down
8 changes: 4 additions & 4 deletions principal/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ import (
"github.com/argoproj-labs/argocd-agent/internal/event"
appinformer "github.com/argoproj-labs/argocd-agent/internal/informer/application"
"github.com/argoproj-labs/argocd-agent/internal/issuer"
"github.com/argoproj-labs/argocd-agent/internal/kube"
"github.com/argoproj-labs/argocd-agent/internal/manager"
"github.com/argoproj-labs/argocd-agent/internal/manager/application"
"github.com/argoproj-labs/argocd-agent/internal/metrics"
"github.com/argoproj-labs/argocd-agent/internal/queue"
"github.com/argoproj-labs/argocd-agent/internal/tlsutil"
"github.com/argoproj-labs/argocd-agent/internal/version"
"github.com/argoproj-labs/argocd-agent/pkg/types"
appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -82,7 +82,7 @@ var noAuthEndpoints = map[string]bool{

const waitForSyncedDuration = 1 * time.Second

func NewServer(ctx context.Context, appClient appclientset.Interface, namespace string, opts ...ServerOption) (*Server, error) {
func NewServer(ctx context.Context, kubeClient *kube.KubernetesClient, namespace string, opts ...ServerOption) (*Server, error) {
s := &Server{
options: defaultOptions(),
queues: queue.NewSendRecvQueues(),
Expand Down Expand Up @@ -132,12 +132,12 @@ func NewServer(ctx context.Context, appClient appclientset.Interface, namespace
managerOpts = append(managerOpts, application.WithMetrics(metrics.NewApplicationClientMetrics()))
}

appInformer := appinformer.NewAppInformer(s.ctx, appClient,
appInformer := appinformer.NewAppInformer(s.ctx, kubeClient.ApplicationsClientset,
s.namespace,
informerOpts...,
)

s.appManager, err = application.NewApplicationManager(kubeapp.NewKubernetesBackend(appClient, s.namespace, appInformer, true), s.namespace,
s.appManager, err = application.NewApplicationManager(kubeapp.NewKubernetesBackend(kubeClient.ApplicationsClientset, s.namespace, appInformer, true), s.namespace,
managerOpts...,
)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions principal/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"testing"
"time"

fakeappclient "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned/fake"
"github.com/argoproj-labs/argocd-agent/test/fake/kube"
fakecerts "github.com/argoproj-labs/argocd-agent/test/fake/testcerts"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
Expand All @@ -46,7 +46,7 @@ func Test_ServerWithTLSConfig(t *testing.T) {
t.Run("Valid TLS key pair", func(t *testing.T) {
templ := certTempl
fakecerts.WriteSelfSignedCert(t, "rsa", path.Join(tempDir, "test-cert"), templ)
s, err := NewServer(context.TODO(), fakeappclient.NewSimpleClientset(), testNamespace,
s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClient(), testNamespace,
WithTLSKeyPairFromPath(path.Join(tempDir, "test-cert.crt"), path.Join(tempDir, "test-cert.key")),
WithGeneratedTokenSigningKey(),
)
Expand All @@ -56,7 +56,7 @@ func Test_ServerWithTLSConfig(t *testing.T) {
assert.NotNil(t, tlsConfig)
})
t.Run("Non-existing TLS key pair", func(t *testing.T) {
s, err := NewServer(context.TODO(), fakeappclient.NewSimpleClientset(), testNamespace,
s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClient(), testNamespace,
WithTLSKeyPairFromPath(path.Join(tempDir, "other-cert.crt"), path.Join(tempDir, "other-cert.key")),
WithGeneratedTokenSigningKey(),
)
Expand All @@ -67,7 +67,7 @@ func Test_ServerWithTLSConfig(t *testing.T) {
})

t.Run("Invalid TLS certificate", func(t *testing.T) {
s, err := NewServer(context.TODO(), fakeappclient.NewSimpleClientset(), testNamespace,
s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClient(), testNamespace,
WithTLSKeyPairFromPath("server_test.go", "server_test.go"),
WithGeneratedTokenSigningKey(),
)
Expand All @@ -81,15 +81,15 @@ func Test_ServerWithTLSConfig(t *testing.T) {

func Test_NewServer(t *testing.T) {
t.Run("Instantiate new server object with non-default options", func(t *testing.T) {
s, err := NewServer(context.TODO(), fakeappclient.NewSimpleClientset(), testNamespace, WithListenerAddress("0.0.0.0"), WithGeneratedTokenSigningKey())
s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClient(), testNamespace, WithListenerAddress("0.0.0.0"), WithGeneratedTokenSigningKey())
assert.NoError(t, err)
assert.NotNil(t, s)
assert.NotEqual(t, defaultOptions(), s.options)
assert.Equal(t, "0.0.0.0", s.options.address)
})

t.Run("Instantiate new server object with invalid option", func(t *testing.T) {
s, err := NewServer(context.TODO(), fakeappclient.NewSimpleClientset(), testNamespace, WithListenerPort(-1), WithGeneratedTokenSigningKey())
s, err := NewServer(context.TODO(), kube.NewKubernetesFakeClient(), testNamespace, WithListenerPort(-1), WithGeneratedTokenSigningKey())
assert.Error(t, err)
assert.Nil(t, s)
})
Expand Down
11 changes: 6 additions & 5 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/argoproj-labs/argocd-agent/internal/auth"
"github.com/argoproj-labs/argocd-agent/internal/auth/userpass"
"github.com/argoproj-labs/argocd-agent/internal/event"
"github.com/argoproj-labs/argocd-agent/internal/kube"
"github.com/argoproj-labs/argocd-agent/pkg/api/grpc/authapi"
"github.com/argoproj-labs/argocd-agent/pkg/api/grpc/eventstreamapi"
"github.com/argoproj-labs/argocd-agent/pkg/client"
Expand Down Expand Up @@ -61,14 +62,14 @@ var certTempl = x509.Certificate{

var testNamespace = "default"

func newConn(t *testing.T, appC *fakeappclient.Clientset) (*grpc.ClientConn, *principal.Server) {
func newConn(t *testing.T, kubeClt *kube.KubernetesClient) (*grpc.ClientConn, *principal.Server) {
t.Helper()
tempDir := t.TempDir()
templ := certTempl
fakecerts.WriteSelfSignedCert(t, "rsa", path.Join(tempDir, "test-cert"), templ)
errch := make(chan error)

s, err := principal.NewServer(context.TODO(), appC, testNamespace,
s, err := principal.NewServer(context.TODO(), kubeClt, testNamespace,
principal.WithTLSKeyPairFromPath(path.Join(tempDir, "test-cert.crt"), path.Join(tempDir, "test-cert.key")),
principal.WithListenerPort(0),
principal.WithListenerAddress("127.0.0.1"),
Expand Down Expand Up @@ -99,7 +100,7 @@ func Test_EndToEnd_Subscribe(t *testing.T) {
// require.NoError(t, err)

appC := fakeappclient.NewSimpleClientset()
conn, s := newConn(t, appC)
conn, s := newConn(t, fakekube.NewKubernetesFakeClient())
defer conn.Close()

authC := authapi.NewAuthenticationClient(conn)
Expand Down Expand Up @@ -178,7 +179,7 @@ func Test_EndToEnd_Push(t *testing.T) {
objs[i] = runtime.Object(&v1alpha1.Application{ObjectMeta: v1.ObjectMeta{Name: fmt.Sprintf("test%d", i), Namespace: "default"}})
}
appC := fakeappclient.NewSimpleClientset(objs...)
conn, s := newConn(t, appC)
conn, s := newConn(t, fakekube.NewKubernetesFakeClient())
defer conn.Close()
authC := authapi.NewAuthenticationClient(conn)
eventC := eventstreamapi.NewEventStreamClient(conn)
Expand Down Expand Up @@ -244,7 +245,7 @@ func Test_AgentServer(t *testing.T) {
err := am.RegisterMethod("userpass", up)
require.NoError(t, err)
up.UpsertUser("client", "insecure")
s, err := principal.NewServer(sctx, fakeAppcServer, "server",
s, err := principal.NewServer(sctx, fakekube.NewKubernetesFakeClient(), "server",
principal.WithGRPC(true),
principal.WithListenerPort(0),
principal.WithServerName("control-plane"),
Expand Down
9 changes: 9 additions & 0 deletions test/fake/kube/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package kube

import (
"github.com/argoproj-labs/argocd-agent/internal/kube"
fakeappclient "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned/fake"
"k8s.io/apimachinery/pkg/runtime"
kubefake "k8s.io/client-go/kubernetes/fake"
)
Expand All @@ -28,3 +30,10 @@ func NewFakeClientsetWithResources(objects ...runtime.Object) *kubefake.Clientse
clientset := kubefake.NewSimpleClientset(objects...)
return clientset
}

func NewKubernetesFakeClient(objects ...runtime.Object) *kube.KubernetesClient {
c := &kube.KubernetesClient{}
c.Clientset = NewFakeClientsetWithResources()
c.ApplicationsClientset = fakeappclient.NewSimpleClientset(objects...)
return c
}

0 comments on commit aefc4b6

Please sign in to comment.