From 492afa7db026a6fd8c3912ce3f631e76a6ccf758 Mon Sep 17 00:00:00 2001 From: Jonathan West Date: Wed, 14 Aug 2024 15:04:17 -0400 Subject: [PATCH] chore: add code/struct comments and sanity tests Signed-off-by: Jonathan West --- agent/agent.go | 69 +++++++++++++----- agent/agent_test.go | 2 +- agent/connection.go | 29 ++------ agent/inbound_test.go | 17 ++++- cmd/agent/main.go | 4 ++ internal/auth/userpass/userpass.go | 8 ++- internal/backend/interface.go | 17 ++++- .../kubernetes/application/kubernetes.go | 9 ++- internal/event/event.go | 3 + internal/informer/application/appinformer.go | 3 +- internal/informer/application/options.go | 1 + internal/informer/informer.go | 32 +++++---- internal/manager/application/application.go | 42 ++++++++--- .../manager/application/application_test.go | 66 +++++++++++------ internal/queue/queue.go | 5 +- pkg/client/remote.go | 2 +- pkg/client/remote_test.go | 6 +- principal/apis/eventstream/eventstream.go | 17 ++++- principal/listen.go | 1 + principal/options.go | 37 +++++----- principal/server.go | 71 ++++++++++++------- test/e2e/e2e_test.go | 10 +-- 22 files changed, 299 insertions(+), 152 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index dafa569..ff28ef3 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -24,6 +24,7 @@ import ( kubeapp "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/application" "github.com/argoproj-labs/argocd-agent/internal/event" appinformer "github.com/argoproj-labs/argocd-agent/internal/informer/application" + "github.com/argoproj-labs/argocd-agent/internal/manager" "github.com/argoproj-labs/argocd-agent/internal/manager/application" "github.com/argoproj-labs/argocd-agent/internal/queue" "github.com/argoproj-labs/argocd-agent/internal/version" @@ -40,30 +41,41 @@ const waitForSyncedDuration = 10 * time.Second // Agent is a controller that synchronizes Application resources type Agent struct { - context context.Context - cancelFn context.CancelFunc - client kubernetes.Interface - appclient appclientset.Interface - options AgentOptions - namespace string + context context.Context + cancelFn context.CancelFunc + client kubernetes.Interface + appclient appclientset.Interface + options AgentOptions + // namespace is the namespace to manage applications in + namespace string + // allowedNamespaces is not currently used. See also 'namespaces' field in AgentOptions allowedNamespaces []string - informer *appinformer.AppInformer - infStopCh chan struct{} - connected atomic.Bool - syncCh chan bool - remote *client.Remote - appManager *application.ApplicationManager - mode types.AgentMode - queues *queue.SendRecvQueues - emitter *event.EventSource - watchLock sync.RWMutex - version *version.Version + // informer is used to watch for change events for Argo CD Application resources on the cluster + informer *appinformer.AppInformer + // infStopCh is not currently used + infStopCh chan struct{} + connected atomic.Bool + // syncCh is not currently used + syncCh chan bool + remote *client.Remote + appManager *application.ApplicationManager + mode types.AgentMode + // queues is a queue of create/update/delete events to send to the principal + queues *queue.SendRecvQueues + emitter *event.EventSource + // At present, 'watchLock' is only acquired on calls to 'addAppUpdateToQueue'. This behaviour was added as a short-term attempt to preserve update event ordering. However, this is known to be problematic due to the potential for race conditions, both within itself, and between other event processors like deleteAppCallback. + watchLock sync.RWMutex + version *version.Version } const defaultQueueName = "default" // AgentOptions defines the options for a given Controller type AgentOptions struct { + // In the future, the 'namespaces' field may be used to support multiple Argo CD namespaces (for example, apps in any namespace) from a single agent instance, on a workload cluster. See 'filters.go' (in this package) for logic that reads from this value and avoids processing events outside of the specified namespaces. + // - However, note that the 'namespace' field of Agent is automatically included by default. + // + // However, as of this writing, this feature is not available. namespaces []string } @@ -88,6 +100,10 @@ func NewAgent(ctx context.Context, client kubernetes.Interface, appclient appcli } } + if a.remote == nil { + return nil, fmt.Errorf("remote not defined") + } + // Initial state of the agent is disconnected a.connected.Store(false) @@ -99,6 +115,15 @@ func NewAgent(ctx context.Context, client kubernetes.Interface, appclient appcli return nil, fmt.Errorf("unable to create default queue: %w", err) } + var managerMode manager.ManagerMode + if a.mode == types.AgentModeAutonomous { + managerMode = manager.ManagerModeAutonomous + } else if a.mode == types.AgentModeManaged { + managerMode = manager.ManagerModeManaged + } else { + return nil, fmt.Errorf("unexpected agent mode: %v", a.mode) + } + a.informer = appinformer.NewAppInformer(ctx, a.appclient, a.namespace, appinformer.WithListAppCallback(a.listAppCallback), appinformer.WithNewAppCallback(a.addAppCreationToQueue), @@ -112,13 +137,21 @@ func NewAgent(ctx context.Context, client kubernetes.Interface, appclient appcli allowUpsert = true } + var err error + // The agent only supports Kubernetes as application backend - a.appManager = application.NewApplicationManager( + a.appManager, err = application.NewApplicationManager( kubeapp.NewKubernetesBackend(a.appclient, a.namespace, a.informer, true), a.namespace, application.WithAllowUpsert(allowUpsert), + application.WithRole(manager.ManagerRoleAgent), + application.WithMode(managerMode), ) + if err != nil { + return nil, err + } + a.syncCh = make(chan bool, 1) return a, nil } diff --git a/agent/agent_test.go b/agent/agent_test.go index 8355a4f..2f70c64 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -40,7 +40,7 @@ func newAgent(t *testing.T) *Agent { func Test_NewAgent(t *testing.T) { fakec := fakekube.NewFakeClientsetWithResources() appc := fakeappclient.NewSimpleClientset() - agent, err := NewAgent(context.TODO(), fakec, appc, "agent") + agent, err := NewAgent(context.TODO(), fakec, appc, "agent", WithRemote(&client.Remote{})) require.NotNil(t, agent) require.NoError(t, err) } diff --git a/agent/connection.go b/agent/connection.go index d0a860b..3abf675 100644 --- a/agent/connection.go +++ b/agent/connection.go @@ -61,6 +61,7 @@ func (a *Agent) sender(stream eventstreamapi.EventStream_SubscribeClient) error "direction": "Send", "client_addr": grpcutil.AddressFromContext(stream.Context()), }) + q := a.queues.SendQ(a.remote.ClientID()) if q == nil { return fmt.Errorf("no send queue found for the remote principal") @@ -75,7 +76,7 @@ func (a *Agent) sender(stream eventstreamapi.EventStream_SubscribeClient) error } logCtx.Tracef("Grabbed an item") if item == nil { - // FIXME: Is this really the right thing to do? + // TODO: Is this really the right thing to do? return nil } @@ -98,7 +99,7 @@ func (a *Agent) sender(stream eventstreamapi.EventStream_SubscribeClient) error if grpcutil.NeedReconnectOnError(err) { return err } else { - logCtx.Infof("Error while sending: %v", err) + logCtx.Errorf("Error while sending: %v", err) return nil } } @@ -119,7 +120,7 @@ func (a *Agent) receiver(stream eventstreamapi.EventStream_SubscribeClient) erro if grpcutil.NeedReconnectOnError(err) { return err } else { - logCtx.Infof("Error while receiving: %v", err) + logCtx.Errorf("Error while receiving: %v", err) return nil } } @@ -133,26 +134,6 @@ func (a *Agent) receiver(stream eventstreamapi.EventStream_SubscribeClient) erro if err != nil { logCtx.WithError(err).Errorf("Unable to process incoming event") } - // switch ev.Type() { - // case event.Create: - // _, err := a.createApplication(incomingApp) - // if err != nil { - // logCtx.Errorf("Error creating application: %v", err) - // } - // case event.SpecUpdate: - // _, err = a.updateApplication(incomingApp) - // if err != nil { - // logCtx.Errorf("Error updating application: %v", err) - // } - // case event.Delete: - // err = a.deleteApplication(incomingApp) - // if err != nil { - // logCtx.Errorf("Error deleting application: %v", err) - // } - // default: - // logCtx.Warnf("Received an unknown event: %s. Protocol mismatch?", ev.Type()) - // } - return nil } @@ -175,6 +156,7 @@ func (a *Agent) handleStreamEvents() error { }) logCtx.Info("Starting to receive events from event stream") var err error + // Continuously retrieve events from the event stream 'inbox' and process them, while the stream is connected for a.IsConnected() && err == nil { err = a.receiver(stream) if err != nil { @@ -194,6 +176,7 @@ func (a *Agent) handleStreamEvents() error { }) logCtx.Info("Starting to send events to event stream") var err error + // Continuously read events from the 'outbox', and send them to principal, while the stream is connected for a.IsConnected() && err == nil { err = a.sender(stream) if err != nil { diff --git a/agent/inbound_test.go b/agent/inbound_test.go index a7bb4f5..0fb4f53 100644 --- a/agent/inbound_test.go +++ b/agent/inbound_test.go @@ -24,6 +24,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" backend_mocks "github.com/argoproj-labs/argocd-agent/internal/backend/mocks" + "github.com/argoproj-labs/argocd-agent/internal/manager" "github.com/argoproj-labs/argocd-agent/internal/manager/application" "github.com/argoproj-labs/argocd-agent/pkg/types" ) @@ -31,7 +32,9 @@ import ( func Test_CreateApplication(t *testing.T) { a := newAgent(t) be := backend_mocks.NewApplication(t) - a.appManager = application.NewApplicationManager(be, "argocd", application.WithAllowUpsert(true)) + var err error + a.appManager, err = application.NewApplicationManager(be, "argocd", application.WithAllowUpsert(true)) + require.NoError(t, err) require.NotNil(t, a) app := &v1alpha1.Application{ObjectMeta: v1.ObjectMeta{ Name: "test", @@ -67,7 +70,9 @@ func Test_CreateApplication(t *testing.T) { func Test_UpdateApplication(t *testing.T) { a := newAgent(t) be := backend_mocks.NewApplication(t) - a.appManager = application.NewApplicationManager(be, "argocd", application.WithAllowUpsert(true)) + var err error + a.appManager, err = application.NewApplicationManager(be, "argocd", application.WithAllowUpsert(true)) + require.NoError(t, err) require.NotNil(t, a) app := &v1alpha1.Application{ ObjectMeta: v1.ObjectMeta{ @@ -85,6 +90,8 @@ func Test_UpdateApplication(t *testing.T) { t.Run("Update application using patch in managed mode", func(t *testing.T) { a.mode = types.AgentModeManaged + a.appManager.Role = manager.ManagerRoleAgent + a.appManager.Mode = manager.ManagerModeManaged getEvent := be.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&v1alpha1.Application{}, nil) defer getEvent.Unset() supportsPatchEvent := be.On("SupportsPatch").Return(true) @@ -98,6 +105,8 @@ func Test_UpdateApplication(t *testing.T) { t.Run("Update application using update in managed mode", func(t *testing.T) { a.mode = types.AgentModeManaged + a.appManager.Role = manager.ManagerRoleAgent + a.appManager.Mode = manager.ManagerModeManaged getEvent := be.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&v1alpha1.Application{}, nil) defer getEvent.Unset() supportsPatchEvent := be.On("SupportsPatch").Return(false) @@ -111,6 +120,8 @@ func Test_UpdateApplication(t *testing.T) { t.Run("Update application using patch in autonomous mode", func(t *testing.T) { a.mode = types.AgentModeAutonomous + a.appManager.Role = manager.ManagerRoleAgent + a.appManager.Mode = manager.ManagerModeAutonomous getEvent := be.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&v1alpha1.Application{}, nil) defer getEvent.Unset() supportsPatchEvent := be.On("SupportsPatch").Return(true) @@ -124,6 +135,8 @@ func Test_UpdateApplication(t *testing.T) { t.Run("Update application using update in autonomous mode", func(t *testing.T) { a.mode = types.AgentModeAutonomous + a.appManager.Role = manager.ManagerRoleAgent + a.appManager.Mode = manager.ManagerModeAutonomous getEvent := be.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&v1alpha1.Application{}, nil) defer getEvent.Unset() supportsPatchEvent := be.On("SupportsPatch").Return(false) diff --git a/cmd/agent/main.go b/cmd/agent/main.go index d540ece..fc5ba53 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -107,6 +107,10 @@ func NewAgentRunCommand() *cobra.Command { cmd.Fatal("No remote specified") } + if namespace == "" { + cmd.Fatal("namespace value is empty and must be specified") + } + kubeConfig, err := cmd.GetKubeConfig(ctx, namespace, kubeConfig, kubeContext) if err != nil { cmd.Fatal("Could not load Kubernetes config: %v", err) diff --git a/internal/auth/userpass/userpass.go b/internal/auth/userpass/userpass.go index fc0df6b..d275dc5 100644 --- a/internal/auth/userpass/userpass.go +++ b/internal/auth/userpass/userpass.go @@ -53,11 +53,17 @@ type UserPassAuthentication struct { dbpath string lock sync.RWMutex userdb map[string]string - dummy []byte + + // dummy is not a password itself, but rather is used to make timing attacks + // a little more complex: in the case where an invalid username is sent, we + // will hash the dummy password instead (to prevent attackers from + // determining whether a username exists by timing response times) + dummy []byte } // NewUserPassAuthentication creates a new instance of UserPassAuthentication func NewUserPassAuthentication(path string) *UserPassAuthentication { + // see 'dummy' field of UserPassAuthentication for explanation of this value dummy, _ := bcrypt.GenerateFromPassword([]byte("bdf3fdc6da5b5029e83f3024858c3c1e6aa3d1e71fa09e4691212f7571b5a3e3"), bcrypt.DefaultCost) return &UserPassAuthentication{ userdb: make(map[string]string), diff --git a/internal/backend/interface.go b/internal/backend/interface.go index 5ff361a..6533a27 100644 --- a/internal/backend/interface.go +++ b/internal/backend/interface.go @@ -24,12 +24,23 @@ import ( ) type ApplicationSelector struct { - Labels map[string]string - Names []string + + // Labels is not currently implemented. + Labels map[string]string + + // Names is not currently implemented. + Names []string + + // Namespaces is used by the 'List' Application interface function to restrict the list of Applications returned to a specific set of Namespaces. Namespaces []string - Projects []string + + // Projects is not currently implemented. + Projects []string } +// Application defines a generic interface to store/track Argo CD Application state, via ApplicationManager. +// +// As of this writing (August 2024), the only implementation is a Kubernetes-based backend (KubernetesBackend in 'internal/backend/kubernetes/application') but other backends (e.g. RDBMS-backed) could be implemented in the future. type Application interface { List(ctx context.Context, selector ApplicationSelector) ([]v1alpha1.Application, error) Create(ctx context.Context, app *v1alpha1.Application) (*v1alpha1.Application, error) diff --git a/internal/backend/kubernetes/application/kubernetes.go b/internal/backend/kubernetes/application/kubernetes.go index 49257b2..6355a41 100644 --- a/internal/backend/kubernetes/application/kubernetes.go +++ b/internal/backend/kubernetes/application/kubernetes.go @@ -32,9 +32,16 @@ import ( var _ backend.Application = &KubernetesBackend{} +// KubernetesBackend is an implementation of the backend.Application interface, which is used by ApplicationManager to track/update the state of Argo CD Applications. +// KubernetesBackend stores/retrieves all data from Argo CD Application CRs on the cluster that is local to the agent/principal. +// +// KubernetesBackend is used by both the principal and agent components. type KubernetesBackend struct { + // appClient is used to interfact with Argo CD Application resources on the cluster on which agent/principal is installed. appClient appclientset.Interface - informer *appinformer.AppInformer + // informer is used to watch for change events for Argo CD Application resources on the cluster + informer *appinformer.AppInformer + // namespace is not currently read, is not guaranteed to be non-empty, and is not guaranteed to contain the source of Argo CD Application CRs in all cases namespace string usePatch bool } diff --git a/internal/event/event.go b/internal/event/event.go index c78e937..b15f61c 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -33,6 +33,7 @@ type EventTarget string const TypePrefix = "io.argoproj.argocd-agent.event" +// Supported EventTypes that are sent agent <-> principal const ( Ping EventType = TypePrefix + ".ping" Pong EventType = TypePrefix + ".pong" @@ -63,10 +64,12 @@ func (t EventTarget) String() string { return string(t) } +// EventSource is a utility to construct new 'cloudevents.Event' events for a given 'source' type EventSource struct { source string } +// Event is the 'on the wire' representation of an event, and is parsed by from protobuf via FromWire type Event struct { event *cloudevents.Event target EventTarget diff --git a/internal/informer/application/appinformer.go b/internal/informer/application/appinformer.go index a4ffb0c..c948b0c 100644 --- a/internal/informer/application/appinformer.go +++ b/internal/informer/application/appinformer.go @@ -40,7 +40,7 @@ const defaultResyncPeriod = 1 * time.Minute // AppInformer is a filtering and customizable SharedIndexInformer for Argo CD // Application resources in a cluster. It works across a configurable set of // namespaces, lets you define a list of filters to indicate interest in the -// resource of a particular even and allows you to set up callbacks to handle +// resource of a particular event and allows you to set up callbacks to handle // the events. type AppInformer struct { appClient appclientset.Interface @@ -49,6 +49,7 @@ type AppInformer struct { AppInformer cache.SharedIndexInformer AppLister applisters.ApplicationLister + // lock should be acquired when reading/writing from callbacks defined in 'options' field lock sync.RWMutex // synced indicates whether the informer is synced and the watch is set up diff --git a/internal/informer/application/options.go b/internal/informer/application/options.go index 9c307fc..4984c01 100644 --- a/internal/informer/application/options.go +++ b/internal/informer/application/options.go @@ -26,6 +26,7 @@ import ( // Options should not be modified concurrently, they are not implemented in a // thread-safe way. type AppInformerOptions struct { + // namespace will be set to "", if len(namespaces) > 0 namespace string namespaces []string appMetrics *metrics.ApplicationWatcherMetrics diff --git a/internal/informer/informer.go b/internal/informer/informer.go index 9357727..a29516b 100644 --- a/internal/informer/informer.go +++ b/internal/informer/informer.go @@ -30,22 +30,26 @@ import ( ) type GenericInformer struct { - listFunc ListFunc - watchFunc WatchFunc - addFunc AddFunc - updateFunc UpdateFunc - deleteFunc DeleteFunc - filterFunc FilterFunc - synced atomic.Bool - running atomic.Bool - resyncPeriod time.Duration - informer cache.SharedIndexInformer - runch chan struct{} - logger *logrus.Entry - mutex sync.RWMutex + listFunc ListFunc + watchFunc WatchFunc + addFunc AddFunc + updateFunc UpdateFunc + deleteFunc DeleteFunc + filterFunc FilterFunc + synced atomic.Bool + running atomic.Bool + resyncPeriod time.Duration + informer cache.SharedIndexInformer + runch chan struct{} + logger *logrus.Entry + // mutex prevents unsynchronized access to namespaces, and ensures that Start/Stop logic is only called once. + mutex sync.RWMutex + // labelSelector is not currently implemented labelSelector string + // fieldSelector is not currently implemented fieldSelector string - namespaces map[string]interface{} + // mutex should be owned before accessing namespaces + namespaces map[string]interface{} } type InformerOption func(i *GenericInformer) error diff --git a/internal/manager/application/application.go b/internal/manager/application/application.go index 5f9776e..75729a3 100644 --- a/internal/manager/application/application.go +++ b/internal/manager/application/application.go @@ -49,11 +49,18 @@ type ApplicationManager struct { Application backend.Application Metrics *metrics.ApplicationClientMetrics Role manager.ManagerRole - Mode manager.ManagerMode - Namespace string - managedApps map[string]bool // Managed apps is a list of apps we manage + // Mode is only set when Role is ManagerRoleAgent + Mode manager.ManagerMode + // Namespace is not guaranteed to have a value in all cases. For instance, this value is empty for principal when the principal is running on cluster. + Namespace string + // managedApps is a list of apps we manage, key is qualified name in form '(namespace of Application CR)/(name of Application CR)', value is not used. + // - acquire 'lock' before accessing + managedApps map[string]bool + // observedApp, key is qualified name of the application, value is the Application's .metadata.resourceValue field + // - acquire 'lock' before accessing observedApp map[string]string - lock sync.RWMutex + // lock should be acquired before accessing managedApps/observedApps + lock sync.RWMutex } // ApplicationManagerOption is a callback function to set an option to the Application @@ -90,7 +97,7 @@ func WithMode(mode manager.ManagerMode) ApplicationManagerOption { // NewApplicationManager initializes and returns a new Manager with the given backend and // options. -func NewApplicationManager(be backend.Application, namespace string, opts ...ApplicationManagerOption) *ApplicationManager { +func NewApplicationManager(be backend.Application, namespace string, opts ...ApplicationManagerOption) (*ApplicationManager, error) { m := &ApplicationManager{} for _, o := range opts { o(m) @@ -99,7 +106,12 @@ func NewApplicationManager(be backend.Application, namespace string, opts ...App m.observedApp = make(map[string]string) m.managedApps = make(map[string]bool) m.Namespace = namespace - return m + + if m.Role == manager.ManagerRolePrincipal && m.Mode != manager.ManagerModeUnset { + return nil, fmt.Errorf("mode should be unset when role is principal") + } + + return m, nil } // stampLastUpdated "stamps" an application with the last updated label @@ -161,9 +173,11 @@ func (m *ApplicationManager) UpdateManagedApp(ctx context.Context, incoming *v1a var err error incoming.SetNamespace(m.Namespace) - if m.Role == manager.ManagerRolePrincipal { - stampLastUpdated(incoming) + + if !(m.Role == manager.ManagerRoleAgent && m.Mode == manager.ManagerModeManaged) { + return nil, fmt.Errorf("updatedManagedApp should be called on a managed agent, only") } + updated, err = m.update(ctx, m.AllowUpsert, incoming, func(existing, incoming *v1alpha1.Application) { existing.ObjectMeta.Annotations = incoming.ObjectMeta.Annotations existing.ObjectMeta.Labels = incoming.ObjectMeta.Labels @@ -243,7 +257,10 @@ func (m *ApplicationManager) UpdateAutonomousApp(ctx context.Context, namespace incoming.SetNamespace(namespace) if m.Role == manager.ManagerRolePrincipal { stampLastUpdated(incoming) + } else { + return nil, fmt.Errorf("UpdateAutonomousApp should only be called from principal") } + updated, err = m.update(ctx, true, incoming, func(existing, incoming *v1alpha1.Application) { existing.ObjectMeta.Annotations = incoming.ObjectMeta.Annotations existing.ObjectMeta.Labels = incoming.ObjectMeta.Labels @@ -323,7 +340,10 @@ func (m *ApplicationManager) UpdateStatus(ctx context.Context, namespace string, incoming.SetNamespace(namespace) if m.Role == manager.ManagerRolePrincipal { stampLastUpdated(incoming) + } else { + return nil, fmt.Errorf("UpdateStatus should only be called on principal") } + updated, err = m.update(ctx, false, incoming, func(existing, incoming *v1alpha1.Application) { existing.ObjectMeta.Annotations = incoming.ObjectMeta.Annotations existing.ObjectMeta.Labels = incoming.ObjectMeta.Labels @@ -399,9 +419,11 @@ func (m *ApplicationManager) UpdateOperation(ctx context.Context, incoming *v1al var updated *v1alpha1.Application var err error - if m.Role.IsPrincipal() { - stampLastUpdated(incoming) + + if !(m.Role.IsAgent() && m.Mode.IsAutonomous()) { + return nil, fmt.Errorf("UpdateOperation should only be called by an agent in autonomous mode: %v %v", m.Role, m.Mode) } + updated, err = m.update(ctx, false, incoming, func(existing, incoming *v1alpha1.Application) { existing.ObjectMeta.Annotations = incoming.ObjectMeta.Annotations existing.ObjectMeta.Labels = incoming.ObjectMeta.Labels diff --git a/internal/manager/application/application_test.go b/internal/manager/application/application_test.go index 4327214..49a7541 100644 --- a/internal/manager/application/application_test.go +++ b/internal/manager/application/application_test.go @@ -23,6 +23,7 @@ import ( "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/application" appmock "github.com/argoproj-labs/argocd-agent/internal/backend/mocks" appinformer "github.com/argoproj-labs/argocd-agent/internal/informer/application" + "github.com/argoproj-labs/argocd-agent/internal/manager" "github.com/argoproj-labs/argocd-agent/internal/metrics" "github.com/sirupsen/logrus" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -41,27 +42,34 @@ import ( var appExistsError = errors.NewAlreadyExists(schema.GroupResource{Group: "argoproj.io", Resource: "application"}, "existing") var appNotFoundError = errors.NewNotFound(schema.GroupResource{Group: "argoproj.io", Resource: "application"}, "existing") -func fakeAppManager(objects ...runtime.Object) (*fakeappclient.Clientset, *ApplicationManager) { +func fakeAppManager(t *testing.T, objects ...runtime.Object) (*fakeappclient.Clientset, *ApplicationManager) { appC := fakeappclient.NewSimpleClientset(objects...) informer := appinformer.NewAppInformer(context.Background(), appC, "argocd") be := application.NewKubernetesBackend(appC, "", informer, true) - return appC, NewApplicationManager(be, "argocd") + + am, err := NewApplicationManager(be, "argocd") + assert.NoError(t, err) + + return appC, am } func Test_ManagerOptions(t *testing.T) { t.Run("NewManager with default options", func(t *testing.T) { - m := NewApplicationManager(nil, "") + m, err := NewApplicationManager(nil, "") + require.NoError(t, err) assert.Equal(t, false, m.AllowUpsert) assert.Nil(t, m.Metrics) }) t.Run("NewManager with metrics", func(t *testing.T) { - m := NewApplicationManager(nil, "", WithMetrics(metrics.NewApplicationClientMetrics())) + m, err := NewApplicationManager(nil, "", WithMetrics(metrics.NewApplicationClientMetrics())) + require.NoError(t, err) assert.NotNil(t, m.Metrics) }) t.Run("NewManager with upsert enabled", func(t *testing.T) { - m := NewApplicationManager(nil, "", WithAllowUpsert(true)) + m, err := NewApplicationManager(nil, "", WithAllowUpsert(true)) + require.NoError(t, err) assert.True(t, m.AllowUpsert) }) } @@ -77,8 +85,9 @@ func Test_ManagerCreate(t *testing.T) { return nil, nil } }) - m := NewApplicationManager(mockedBackend, "") - _, err := m.Create(context.TODO(), &v1alpha1.Application{ObjectMeta: v1.ObjectMeta{Name: "existing", Namespace: "default"}}) + m, err := NewApplicationManager(mockedBackend, "") + require.NoError(t, err) + _, err = m.Create(context.TODO(), &v1alpha1.Application{ObjectMeta: v1.ObjectMeta{Name: "existing", Namespace: "default"}}) assert.ErrorIs(t, err, appExistsError) }) @@ -90,7 +99,8 @@ func Test_ManagerCreate(t *testing.T) { }, } mockedBackend := appmock.NewApplication(t) - m := NewApplicationManager(mockedBackend, "") + m, err := NewApplicationManager(mockedBackend, "") + require.NoError(t, err) mockedBackend.On("Create", mock.Anything, mock.Anything).Return(app, nil) rapp, err := m.Create(context.TODO(), app) assert.NoError(t, err) @@ -177,7 +187,8 @@ func Test_ManagerUpdateManaged(t *testing.T) { appC := fakeappclient.NewSimpleClientset(existing) informer := appinformer.NewAppInformer(context.Background(), appC, "argocd") be := application.NewKubernetesBackend(appC, "", informer, true) - mgr := NewApplicationManager(be, "argocd") + mgr, err := NewApplicationManager(be, "argocd", WithMode(manager.ManagerModeManaged), WithRole(manager.ManagerRoleAgent)) + require.NoError(t, err) updated, err := mgr.UpdateManagedApp(context.Background(), incoming) require.NoError(t, err) @@ -263,7 +274,10 @@ func Test_ManagerUpdateStatus(t *testing.T) { appC := fakeappclient.NewSimpleClientset(existing) informer := appinformer.NewAppInformer(context.Background(), appC, "argocd") be := application.NewKubernetesBackend(appC, "", informer, true) - mgr := NewApplicationManager(be, "argocd") + mgr, err := NewApplicationManager(be, "argocd") + require.NoError(t, err) + mgr.Mode = manager.ManagerModeManaged + mgr.Role = manager.ManagerRolePrincipal updated, err := mgr.UpdateStatus(context.Background(), "cluster-1", incoming) require.NoError(t, err) b, err := json.MarshalIndent(updated, "", " ") @@ -336,7 +350,9 @@ func Test_ManagerUpdateAutonomous(t *testing.T) { appC := fakeappclient.NewSimpleClientset(existing) informer := appinformer.NewAppInformer(context.Background(), appC, "argocd") be := application.NewKubernetesBackend(appC, "", informer, true) - mgr := NewApplicationManager(be, "argocd") + mgr, err := NewApplicationManager(be, "argocd") + require.NoError(t, err) + mgr.Role = manager.ManagerRolePrincipal updated, err := mgr.UpdateAutonomousApp(context.TODO(), "cluster-1", incoming) require.NoError(t, err) require.NotNil(t, updated) @@ -403,7 +419,9 @@ func Test_ManagerUpdateOperation(t *testing.T) { // informer := appinformer.NewAppInformer(context.Background(), appC, "argocd") // be := kubernetes.NewKubernetesBackend(appC, "", informer, true) // mgr := NewApplicationManager(be, "argocd") - _, mgr := fakeAppManager(existing) + _, mgr := fakeAppManager(t, existing) + mgr.Mode = manager.ManagerModeAutonomous + mgr.Role = manager.ManagerRoleAgent updated, err := mgr.UpdateOperation(context.TODO(), incoming) require.NoError(t, err) require.NotNil(t, updated) @@ -437,7 +455,7 @@ func Test_DeleteApp(t *testing.T) { InitiatedBy: v1alpha1.OperationInitiator{Username: "foobar"}, }, } - appC, mgr := fakeAppManager(existing) + appC, mgr := fakeAppManager(t, existing) app, err := appC.ArgoprojV1alpha1().Applications("argocd").Get(context.TODO(), "foobar", v1.GetOptions{}) assert.NoError(t, err) assert.NotNil(t, app) @@ -473,7 +491,7 @@ func Test_DeleteApp(t *testing.T) { InitiatedBy: v1alpha1.OperationInitiator{Username: "foobar"}, }, } - appC, mgr := fakeAppManager(existing) + appC, mgr := fakeAppManager(t, existing) app, err := appC.ArgoprojV1alpha1().Applications("argocd").Get(context.TODO(), "foobar", v1.GetOptions{}) assert.NoError(t, err) assert.NotNil(t, app) @@ -485,9 +503,10 @@ func Test_DeleteApp(t *testing.T) { func Test_ManageApp(t *testing.T) { t.Run("Mark app as managed", func(t *testing.T) { - appm := NewApplicationManager(nil, "") + appm, err := NewApplicationManager(nil, "") + require.NoError(t, err) assert.False(t, appm.IsManaged("foo")) - err := appm.Manage("foo") + err = appm.Manage("foo") assert.NoError(t, err) assert.True(t, appm.IsManaged("foo")) err = appm.Manage("foo") @@ -499,8 +518,9 @@ func Test_ManageApp(t *testing.T) { }) t.Run("Mark app as unmanaged", func(t *testing.T) { - appm := NewApplicationManager(nil, "") - err := appm.Manage("foo") + appm, err := NewApplicationManager(nil, "") + require.NoError(t, err) + err = appm.Manage("foo") assert.True(t, appm.IsManaged("foo")) assert.NoError(t, err) err = appm.Unmanage("foo") @@ -514,9 +534,10 @@ func Test_ManageApp(t *testing.T) { func Test_IgnoreChange(t *testing.T) { t.Run("Ignore a change", func(t *testing.T) { - appm := NewApplicationManager(nil, "") + appm, err := NewApplicationManager(nil, "") + require.NoError(t, err) assert.False(t, appm.IsChangeIgnored("foo", "1")) - err := appm.IgnoreChange("foo", "1") + err = appm.IgnoreChange("foo", "1") assert.NoError(t, err) assert.True(t, appm.IsChangeIgnored("foo", "1")) err = appm.IgnoreChange("foo", "1") @@ -528,8 +549,9 @@ func Test_IgnoreChange(t *testing.T) { }) t.Run("Unignore a change", func(t *testing.T) { - appm := NewApplicationManager(nil, "") - err := appm.UnignoreChange("foo") + appm, err := NewApplicationManager(nil, "") + require.NoError(t, err) + err = appm.UnignoreChange("foo") assert.Error(t, err) assert.False(t, appm.IsChangeIgnored("foo", "1")) err = appm.IgnoreChange("foo", "1") diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 8136a47..13a409a 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -23,6 +23,7 @@ import ( var _ QueuePair = &SendRecvQueues{} +// QueuePair maintains a map (indexed by name) of send/receive queue pairs type QueuePair interface { Names() []string HasQueuePair(name string) bool @@ -63,7 +64,7 @@ func (q *SendRecvQueues) Names() []string { return names } -// HasQueuePair retruns true if a queue pair with name currently exists +// HasQueuePair returns true if a queue pair with name currently exists func (q *SendRecvQueues) HasQueuePair(name string) bool { q.queuelock.RLock() defer q.queuelock.RUnlock() @@ -78,7 +79,7 @@ func (q *SendRecvQueues) Len() int { return len(q.queues) } -// RecvQ will return the send queue from the queue pair named name. If no such +// SendQ will return the send queue from the queue pair named name. If no such // queue pair exists, returns nil func (q *SendRecvQueues) SendQ(name string) workqueue.RateLimitingInterface { q.queuelock.RLock() diff --git a/pkg/client/remote.go b/pkg/client/remote.go index dad4c2f..ddc487a 100644 --- a/pkg/client/remote.go +++ b/pkg/client/remote.go @@ -62,7 +62,7 @@ func NewToken(tok string) (*token, error) { return r, nil } -// Remote represents a remote argocd-agent server component +// Remote represents a remote argocd-agent server component. Remote is used only by the agent component, and not by principal. type Remote struct { hostname string port int diff --git a/pkg/client/remote_test.go b/pkg/client/remote_test.go index 69f9612..56a94b6 100644 --- a/pkg/client/remote_test.go +++ b/pkg/client/remote_test.go @@ -46,7 +46,7 @@ func Test_Connect(t *testing.T) { am := userpass.NewUserPassAuthentication("") am.UpsertUser("default", "password") - s.AuthMethods().RegisterMethod("userpass", am) + s.AuthMethodsForE2EOnly().RegisterMethod("userpass", am) require.NoError(t, err) errch := make(chan error) @@ -54,7 +54,7 @@ func Test_Connect(t *testing.T) { require.NoError(t, err) t.Run("Connect to a server", func(t *testing.T) { - r, err := NewRemote("127.0.0.1", s.Listener().Port(), + r, err := NewRemote("127.0.0.1", s.ListenerForE2EOnly().Port(), WithInsecureSkipTLSVerify(), WithAuth("userpass", auth.Credentials{userpass.ClientIDField: "default", userpass.ClientSecretField: "password"}), WithClientMode(types.AgentModeManaged), @@ -76,7 +76,7 @@ func Test_Connect(t *testing.T) { }) t.Run("Invalid auth and context deadline reached", func(t *testing.T) { - r, err := NewRemote("127.0.0.1", s.Listener().Port(), + r, err := NewRemote("127.0.0.1", s.ListenerForE2EOnly().Port(), WithInsecureSkipTLSVerify(), WithAuth("userpass", auth.Credentials{userpass.ClientIDField: "default", userpass.ClientSecretField: "passwor"}), ) diff --git a/principal/apis/eventstream/eventstream.go b/principal/apis/eventstream/eventstream.go index ae6ce00..aab4d42 100644 --- a/principal/apis/eventstream/eventstream.go +++ b/principal/apis/eventstream/eventstream.go @@ -33,6 +33,15 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" ) +var _ eventstreamapi.EventStreamServer = &Server{} + +// Server: +// - Reads Application CR events from GRPC stream and writes them to the relevant agent receive queue (the 'inbox') in 'queues' for processing (see recvFunc) +// - Reads Application CR events from agent send queue in 'queues' (the 'outbox'), and writes to GRPC stream (see sendFunc) +// +// Server implements the pkg/api/grpc/evenstreamapi/EventStreamServer interface +// +// Server is only used by principal. type Server struct { eventstreamapi.UnimplementedEventStreamServer @@ -53,8 +62,9 @@ type client struct { agentName string wg *sync.WaitGroup start time.Time - end time.Time - lock sync.RWMutex + // lock must be owned before read/writing to 'end' var + end time.Time + lock sync.RWMutex } func WithMaxStreamDuration(d time.Duration) ServerOption { @@ -243,6 +253,8 @@ func (s *Server) sendFunc(c *client, subs eventstreamapi.EventStream_SubscribeSe // // The connection is kept open until the agent closes it, and the stream tries // to send updates to the agent as long as possible. +// +// Subscribe is called by GRPC machinery. func (s *Server) Subscribe(subs eventstreamapi.EventStream_SubscribeServer) error { c, err := newClientConnection(subs.Context(), s.options.MaxStreamDuration) if err != nil { @@ -297,6 +309,7 @@ func (s *Server) Subscribe(subs eventstreamapi.EventStream_SubscribeServer) erro // Push implements a client-side stream to receive updates for the client's // Application resources. +// Push is called by GRPC machinery. func (s *Server) Push(pushs eventstreamapi.EventStream_PushServer) error { logCtx := log().WithField("method", "Push") diff --git a/principal/listen.go b/principal/listen.go index 3a88df1..6e24f58 100644 --- a/principal/listen.go +++ b/principal/listen.go @@ -43,6 +43,7 @@ var listenerBackoff = wait.Backoff{ Jitter: 0.1, } +// Listener is a utility wrapper around net.Listener and associated data type Listener struct { host string port int diff --git a/principal/options.go b/principal/options.go index c436111..65cf334 100644 --- a/principal/options.go +++ b/principal/options.go @@ -40,22 +40,27 @@ var supportedTLSVersion map[string]int = map[string]int{ } type ServerOptions struct { - serverName string - port int - address string - tlsCertPath string - tlsKeyPath string - tlsCert *x509.Certificate - tlsKey crypto.PrivateKey - tlsCiphers *tls.CipherSuite - tlsMinVersion int - gracePeriod time.Duration - namespaces []string - signingKey crypto.PrivateKey - unauthMethods map[string]bool - serveGRPC bool - serveREST bool - eventProcessors int64 + serverName string + port int + address string + tlsCertPath string + tlsKeyPath string + tlsCert *x509.Certificate + tlsKey crypto.PrivateKey + // tlsCiphers is not currently read + tlsCiphers *tls.CipherSuite + // tlsMinVersion is not currently read + tlsMinVersion int + gracePeriod time.Duration + namespaces []string + signingKey crypto.PrivateKey + // unauthMethods is not currently implemented + unauthMethods map[string]bool + serveGRPC bool + // serveREST is not currently implemented + serveREST bool + eventProcessors int64 + // metricsEnabled is not currently read metricsEnabled bool metricsPort int requireClientCerts bool diff --git a/principal/server.go b/principal/server.go index 00bcf2d..4c75c06 100644 --- a/principal/server.go +++ b/principal/server.go @@ -40,26 +40,39 @@ import ( ) type Server struct { - options *ServerOptions - tlsConfig *tls.Config - listener *Listener - server *http.Server - grpcServer *grpc.Server - authMethods *auth.Methods - queues *queue.SendRecvQueues - namespace string - issuer issuer.Issuer - noauth map[string]bool // noauth contains endpoints accessible without authentication - ctx context.Context - ctxCancel context.CancelFunc - appManager *application.ApplicationManager - appInformer *appinformer.AppInformer - watchLock sync.RWMutex - clientMap map[string]string + options *ServerOptions + tlsConfig *tls.Config + // listener contains GRPC server listener + listener *Listener + // server is not currently used + server *http.Server + grpcServer *grpc.Server + authMethods *auth.Methods + // queues contains events that are EITHER queued to be sent to the agent ('outbox'), OR that have been received by the agent and are waiting to be processed ('inbox'). + // Server uses clientID/namespace as a key, to refer to each specific agent's queue + queues *queue.SendRecvQueues + // namespace is the namespace the server will use for configuration. Set only when running out of cluster. + namespace string + issuer issuer.Issuer + noauth map[string]bool // noauth contains endpoints accessible without authentication + ctx context.Context + ctxCancel context.CancelFunc + appManager *application.ApplicationManager + // appInformer is used to watch for change events for Argo CD Application resources on the cluster + appInformer *appinformer.AppInformer + // At present, 'watchLock' is only acquired on calls to 'updateAppCallback'. This behaviour was added as a short-term attempt to preserve update event ordering. However, this is known to be problematic due to the potential for race conditions, both within itself, and between other event processors like deleteAppCallback. + watchLock sync.RWMutex + // clientMap is not currently used + clientMap map[string]string + // namespaceMap keeps track of which local namespaces are managed by agents using which mode + // The key of namespaceMap is the client id which the agent used to authenticate with principal, via AuthSubject.ClientID (which, it is also assumed here, corresponds to a control plane namespace of the same name) + // NOTE: clientLock should be owned before accessing namespaceMap namespaceMap map[string]types.AgentMode - clientLock sync.RWMutex - events *event.EventSource - version *version.Version + // clientLock should be owned before accessing namespaceMap + clientLock sync.RWMutex + // events is used to construct events to pass on the wire to connected agents. + events *event.EventSource + version *version.Version } // noAuthEndpoints is a list of endpoints that are available without the need @@ -126,9 +139,12 @@ func NewServer(ctx context.Context, appClient appclientset.Interface, namespace informerOpts..., ) - s.appManager = application.NewApplicationManager(kubeapp.NewKubernetesBackend(appClient, s.namespace, s.appInformer, true), s.namespace, + s.appManager, err = application.NewApplicationManager(kubeapp.NewKubernetesBackend(appClient, s.namespace, s.appInformer, true), s.namespace, managerOpts..., ) + if err != nil { + return nil, err + } s.clientMap = map[string]string{ `{"clientID":"argocd","mode":"autonomous"}`: "argocd", } @@ -238,13 +254,13 @@ func (s *Server) loadTLSConfig() (*tls.Config, error) { return tlsConfig, nil } -// Listener returns the listener of Server s -func (s *Server) Listener() *Listener { +// ListenerForE2EOnly returns the listener of Server s +func (s *Server) ListenerForE2EOnly() *Listener { return s.listener } -// TokenIssuer returns the token issuer of Server s -func (s *Server) TokenIssuer() issuer.Issuer { +// TokenIssuerForE2EOnly returns the token issuer of Server s +func (s *Server) TokenIssuerForE2EOnly() issuer.Issuer { return s.issuer } @@ -252,15 +268,16 @@ func log() *logrus.Entry { return logrus.WithField("module", "server") } -func (s *Server) AuthMethods() *auth.Methods { +func (s *Server) AuthMethodsForE2EOnly() *auth.Methods { return s.authMethods } -func (s *Server) Queues() *queue.SendRecvQueues { +func (s *Server) QueuesForE2EOnly() *queue.SendRecvQueues { return s.queues } -func (s *Server) AppManager() *application.ApplicationManager { +// AppManagerForE2EOnly should only be used for E2E tests +func (s *Server) AppManagerForE2EOnly() *application.ApplicationManager { return s.appManager } diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index b1c6286..ba25205 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -84,12 +84,12 @@ func newConn(t *testing.T, appC *fakeappclient.Clientset) (*grpc.ClientConn, *pr am := userpass.NewUserPassAuthentication("") am.UpsertUser("default", "password") - err = s.AuthMethods().RegisterMethod("userpass", am) + err = s.AuthMethodsForE2EOnly().RegisterMethod("userpass", am) require.NoError(t, err) tlsC := &tls.Config{InsecureSkipVerify: true} creds := credentials.NewTLS(tlsC) - conn, err := grpc.Dial(s.Listener().Address(), + conn, err := grpc.Dial(s.ListenerForE2EOnly().Address(), grpc.WithTransportCredentials(creds)) require.NoError(t, err) return conn, s @@ -221,11 +221,11 @@ func Test_EndToEnd_Push(t *testing.T) { _ = s.Shutdown() // Should have been grabbed by queue processor - q := s.Queues() + q := s.QueuesForE2EOnly() assert.Equal(t, 0, q.RecvQ("default").Len()) // All applications should have been created by now on the server - apps, err := s.AppManager().Application.List(context.Background(), backend.ApplicationSelector{}) + apps, err := s.AppManagerForE2EOnly().Application.List(context.Background(), backend.ApplicationSelector{}) assert.NoError(t, err) assert.Len(t, apps, 10) } @@ -261,7 +261,7 @@ func Test_AgentServer(t *testing.T) { defer scancel() defer acancel() - remote, err := client.NewRemote(s.Listener().Host(), s.Listener().Port(), + remote, err := client.NewRemote(s.ListenerForE2EOnly().Host(), s.ListenerForE2EOnly().Port(), client.WithInsecureSkipTLSVerify(), client.WithAuth("userpass", auth.Credentials{userpass.ClientIDField: "client", userpass.ClientSecretField: "insecure"}), )