Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: AppInformer now uses GenericInformer as foundation #221

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,22 @@ func NewAgent(ctx context.Context, appclient appclientset.Interface, namespace s
return nil, fmt.Errorf("unexpected agent mode: %v", a.mode)
}

appInformer := appinformer.NewAppInformer(ctx, appclient, a.namespace,
appinformer.WithListAppCallback(a.listAppCallback),
appInformer, err := appinformer.NewAppInformer(ctx, appclient, a.namespace,
// appinformer.WithListAppCallback(a.listAppCallback),
appinformer.WithNewAppCallback(a.addAppCreationToQueue),
appinformer.WithUpdateAppCallback(a.addAppUpdateToQueue),
appinformer.WithDeleteAppCallback(a.addAppDeletionToQueue),
appinformer.WithFilterChain(a.DefaultFilterChain()),
)
if err != nil {
return nil, err
}

allowUpsert := false
if a.mode == types.AgentModeManaged {
allowUpsert = true
}

var err error

appProjectManagerOption := []appproject.AppProjectManagerOption{
appproject.WithAllowUpsert(true),
appproject.WithRole(manager.ManagerRoleAgent),
Expand Down
2 changes: 1 addition & 1 deletion internal/backend/kubernetes/application/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (be *KubernetesBackend) SupportsPatch() bool {
}

func (be *KubernetesBackend) StartInformer(ctx context.Context) {
be.appInformer.Start(ctx.Done())
be.appInformer.Start(ctx)
}

func (be *KubernetesBackend) EnsureSynced(duration time.Duration) error {
Expand Down
287 changes: 127 additions & 160 deletions internal/informer/application/appinformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@ package application
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/argoproj-labs/argocd-agent/internal/filter"
"github.com/argoproj-labs/argocd-agent/internal/informer"
"github.com/argoproj-labs/argocd-agent/internal/metrics"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"
applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/util/glob"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"

"github.com/sirupsen/logrus"
)
Expand All @@ -44,214 +44,181 @@ const defaultResyncPeriod = 1 * time.Minute
// the events.
type AppInformer struct {
appClient appclientset.Interface
options *AppInformerOptions
// options *AppInformerOptions

appInformer cache.SharedIndexInformer
appInformer *informer.GenericInformer
appLister applisters.ApplicationLister

// The logger for this informer
logger *logrus.Entry

// namespaces is a list of namespaces this informer is allowed to process
// resources in.
namespaces []string

metrics *metrics.ApplicationWatcherMetrics

// Callback functions for the events of the informer
addFunc func(proj *v1alpha1.Application)
updateFunc func(oldProj *v1alpha1.Application, newProj *v1alpha1.Application)
deleteFunc func(proj *v1alpha1.Application)

filterFunc func(proj *v1alpha1.Application) bool

filters *filter.Chain

// lock should be acquired when reading/writing from callbacks defined in 'options' field
lock sync.RWMutex

// synced indicates whether the informer is synced and the watch is set up
synced atomic.Bool

// clusterScope indicates the scope of the AppInformer
clusterScope bool
}

// NewAppInformer returns a new application informer for a given namespace
func NewAppInformer(ctx context.Context, client appclientset.Interface, namespace string, opts ...AppInformerOption) *AppInformer {
o := &AppInformerOptions{
resync: defaultResyncPeriod,
func NewAppInformer(ctx context.Context, client appclientset.Interface, namespace string, opts ...AppInformerOption) (*AppInformer, error) {
ai := &AppInformer{
clusterScope: false,
filters: filter.NewFilterChain(),
}
o.filters = filter.NewFilterChain()

for _, opt := range opts {
opt(o)
opt(ai)
}

if len(o.namespaces) > 0 {
o.namespaces = append(o.namespaces, namespace)
o.namespace = ""
} else {
o.namespace = namespace
if ai.logger == nil {
ai.logger = logrus.WithField("module", "ApplicationInformer")
}

i := &AppInformer{options: o, appClient: client}

i.appInformer = cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
logCtx := log().WithField("component", "ListWatch")
logCtx.Debugf("Listing apps into cache")
appList, err := i.appClient.ArgoprojV1alpha1().Applications(o.namespace).List(ctx, options)
if err != nil {
logCtx.Warnf("Error listing apps: %v", err)
return nil, err
}
logCtx.Info("Listing apps into cache successful")
iopts := []informer.InformerOption{}
if ai.metrics != nil {
iopts = append(iopts, informer.WithMetrics(ai.metrics.AppsAdded, ai.metrics.AppsUpdated, ai.metrics.AppsRemoved, ai.metrics.AppsWatched))
}

// The result of the list call will get pre-filtered to only
// contain apps that a) are in a namespace we are allowed to
// process and b) pass admission through the informer's chain
// of configured filters.
preFilteredItems := make([]v1alpha1.Application, 0)
for _, app := range appList.Items {
if i.shouldProcessApp(&app) {
preFilteredItems = append(preFilteredItems, app)
logCtx.Tracef("Allowing app %s in namespace %s", app.Name, app.Namespace)
} else {
logCtx.Tracef("Not allowing app %s in namespace %s", app.Name, app.Namespace)
i, err := informer.NewGenericInformer(&v1alpha1.Application{},
append([]informer.InformerOption{
informer.WithListCallback(func(options v1.ListOptions, namespace string) (runtime.Object, error) {
log().WithField("namespace", namespace).Infof("Listing Applications")
apps, err := client.ArgoprojV1alpha1().Applications(namespace).List(ctx, options)
log().Infof("List call returned %d Applications", len(apps.Items))
if ai.filterFunc != nil {
newItems := make([]v1alpha1.Application, 0)
for _, a := range apps.Items {
if ai.filterFunc(&a) {
newItems = append(newItems, a)
}
}
}

// The pre-filtered list is passed to the configured callback
// to perform custom filtering and tranformation.
if i.options.listCb != nil {
preFilteredItems = i.options.listCb(preFilteredItems)
}
appList.Items = preFilteredItems

if i.options.appMetrics != nil {
i.options.appMetrics.AppsWatched.Set(float64(len(appList.Items)))
}
logCtx.Tracef("Listed %d applications", len(appList.Items))
return appList, err
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
i.synced.Store(false)
ai.logger.Debugf("Listed %d Applications after filtering", len(newItems))
apps.Items = newItems
}
return apps, err
}),
informer.WithNamespaces(ai.namespaces...),
informer.WithWatchCallback(func(options v1.ListOptions, namespace string) (watch.Interface, error) {
log().Info("Watching Applications")
ai.synced.Store(false)
logCtx := log().WithField("component", "WatchFunc")
logCtx.Info("Starting application watcher")
defer i.synced.Store(true)
return i.appClient.ArgoprojV1alpha1().Applications(o.namespace).Watch(ctx, options)
},
},
&v1alpha1.Application{},
i.options.resync,
cache.Indexers{
cache.NamespaceIndex: func(obj interface{}) ([]string, error) {
return cache.MetaNamespaceIndexFunc(obj)
},
},
)
_, _ = i.appInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
defer ai.synced.Store(true)
return client.ArgoprojV1alpha1().Applications(namespace).Watch(ctx, options)
}),
informer.WithAddCallback(func(obj interface{}) {
log().Info("Add Applications Callback")
app, ok := obj.(*v1alpha1.Application)
if !ok || app == nil {
// if i.options.errorCb != nil {
// i.options.errorCb(nil, "invalid resource received by add event")
// }
return
}
logCtx := log().WithFields(logrus.Fields{
"component": "AddFunc",
"application": app.QualifiedName(),
})
logCtx.Trace("New application event")
if !i.shouldProcessApp(app) {
return
}
if i.options.newCb != nil {
i.options.newCb(app)
}
if i.options.appMetrics != nil {
i.options.appMetrics.AppsAdded.Inc()
i.options.appMetrics.AppsWatched.Inc()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldApp, oldOk := oldObj.(*v1alpha1.Application)
newApp, newOk := newObj.(*v1alpha1.Application)
if !newOk || !oldOk {
// if i.options.errorCb != nil {
// i.options.errorCb(nil, "invalid resource received by update event")
// }
return
}
logCtx := log().WithFields(logrus.Fields{}) //("component", "UpdateFunc")
logCtx.Tracef("Application update event")
logCtx = logCtx.WithField("application", newApp.Name)

// Namespace of new and old app must match. Theoretically, they
// should always match, but we safeguard.
if oldApp.Namespace != newApp.Namespace {
logCtx.Warnf("namespace mismatch between old and new app")
if !ok {
ai.logger.Errorf("Received add event for unknown type %T", obj)
return
}

if !i.shouldProcessApp(newApp) {
logCtx.Tracef("application not allowed")
return
}

if !i.options.filters.ProcessChange(oldApp, newApp) {
logCtx.Debugf("Change will not be processed")
ai.logger.Debugf("Application add event: %s", app.Name)
if ai.addFunc != nil {
ai.addFunc(app)
}
}),
informer.WithUpdateCallback(func(oldObj, newObj interface{}) {
log().Info("Update Applications Callback")
oldApp, oldAppOk := oldObj.(*v1alpha1.Application)
newApp, newAppOk := newObj.(*v1alpha1.Application)
if !newAppOk || !oldAppOk {
ai.logger.Errorf("Received update event for unknown type old:%T new:%T", oldObj, newObj)
return
}
updateCb := i.UpdateAppCallback()
if updateCb != nil {
updateCb(oldApp, newApp)
ai.logger.Debugf("Application update event: old:%s new:%s", oldApp.Name, newApp.Name)
if ai.updateFunc != nil {
ai.updateFunc(oldApp, newApp)
}
if i.options.appMetrics != nil {
i.options.appMetrics.AppsUpdated.Inc()
}
},
DeleteFunc: func(obj interface{}) {
logCtx := log().WithField("component", "DeleteFunc")
logCtx.Tracef("Application update event")
}),
informer.WithDeleteCallback(func(obj interface{}) {
log().Info("Delete AppProject Callback")
app, ok := obj.(*v1alpha1.Application)
if !ok || app == nil {
// if i.options.errorCb != nil {
// i.options.errorCb(nil, "invalid resource received by delete event")
// }
if !ok {
ai.logger.Errorf("Received delete event for unknown type %T", obj)
return
}
logCtx = logCtx.WithField("application", app.QualifiedName())
if !i.shouldProcessApp(app) {
logCtx.Tracef("Ignoring application delete event")
return
ai.logger.Debugf("AppProject delete event: %s", app.Name)
if ai.deleteFunc != nil {
ai.deleteFunc(app)
}
if i.options.deleteCb != nil {
i.options.deleteCb(app)
}),
informer.WithFilterFunc(func(obj interface{}) bool {
if ai.filterFunc == nil {
return true
}
if i.options.appMetrics != nil {
i.options.appMetrics.AppsRemoved.Inc()
i.options.appMetrics.AppsWatched.Dec()
app, ok := obj.(*v1alpha1.Application)
if !ok {
ai.logger.Errorf("Failed type conversion for unknown type %T", obj)
return false
}
},
},
)
i.appLister = applisters.NewApplicationLister(i.appInformer.GetIndexer())
// SetWatchErrorHandler only returns error when informer already started,
// so it should be safe to not handle the error.
_ = i.appInformer.SetWatchErrorHandler(cache.DefaultWatchErrorHandler)
return i
return ai.filterFunc(app)
}),
}, iopts...)...)

if err != nil {
return nil, err
}
ai.appInformer = i
ai.appLister = applisters.NewApplicationLister(ai.appInformer.Indexer())
return ai, nil
}

func (i *AppInformer) Start(ctx context.Context) {
scope := "namespace"
if len(i.namespaces) > 1 {
scope = "cluster"
}
log().Infof("Starting Application informer (scope: %s)", scope)
if err := i.appInformer.Start(ctx); err != nil {
i.logger.WithError(err).Error("Failed to start Application informer")
ctx.Done()
}
}

func (i *AppInformer) Start(stopch <-chan struct{}) {
ns := ""
if i.options.namespace != "" {
ns = i.options.namespace + ","
func (ai *AppInformer) Stop() error {
ai.logger.Infof("Stopping Application informer")
if err := ai.appInformer.Stop(); err != nil {
return err
}
log().Infof("Starting app informer (namespaces: %s)", strings.TrimSuffix(ns+strings.Join(i.options.namespaces, ","), ","))
i.appInformer.Run(stopch)
log().Infof("App informer has shutdown")
return nil
}

// shouldProcessApp returns true if the app is allowed to be processed
func (i *AppInformer) shouldProcessApp(app *v1alpha1.Application) bool {
return glob.MatchStringInList(append([]string{i.options.namespace}, i.options.namespaces...), app.Namespace, glob.REGEXP) &&
i.options.filters.Admit(app)
func (ai *AppInformer) shouldProcessApp(app *v1alpha1.Application) bool {
return glob.MatchStringInList(ai.namespaces, app.Namespace, glob.REGEXP) &&
ai.filters.Admit(app)
}

// EnsureSynced waits until either the AppInformer has fully synced or the
// timeout has been reached. In the latter case, an error will be returned.
// Note that this call blocks until either situation arises.
func (i *AppInformer) EnsureSynced(d time.Duration) error {
func (ai *AppInformer) EnsureSynced(d time.Duration) error {
tckr := time.NewTicker(d)
for {
select {
case <-tckr.C:
return fmt.Errorf("sync timeout reached")
default:
if i.synced.Load() {
if ai.synced.Load() {
return nil
}
}
Expand Down
Loading
Loading