Skip to content

Commit

Permalink
chore: add additional comments to code/structs, and sanity tests to v…
Browse files Browse the repository at this point in the history
…erify functions are called within the expected context (#151)

Signed-off-by: Jonathan West <[email protected]>
  • Loading branch information
jgwest authored Aug 15, 2024
1 parent 66e198f commit e93b66d
Show file tree
Hide file tree
Showing 22 changed files with 299 additions and 152 deletions.
69 changes: 51 additions & 18 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
kubeapp "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/application"
"github.com/argoproj-labs/argocd-agent/internal/event"
appinformer "github.com/argoproj-labs/argocd-agent/internal/informer/application"
"github.com/argoproj-labs/argocd-agent/internal/manager"
"github.com/argoproj-labs/argocd-agent/internal/manager/application"
"github.com/argoproj-labs/argocd-agent/internal/queue"
"github.com/argoproj-labs/argocd-agent/internal/version"
Expand All @@ -40,30 +41,41 @@ const waitForSyncedDuration = 10 * time.Second

// Agent is a controller that synchronizes Application resources
type Agent struct {
context context.Context
cancelFn context.CancelFunc
client kubernetes.Interface
appclient appclientset.Interface
options AgentOptions
namespace string
context context.Context
cancelFn context.CancelFunc
client kubernetes.Interface
appclient appclientset.Interface
options AgentOptions
// namespace is the namespace to manage applications in
namespace string
// allowedNamespaces is not currently used. See also 'namespaces' field in AgentOptions
allowedNamespaces []string
informer *appinformer.AppInformer
infStopCh chan struct{}
connected atomic.Bool
syncCh chan bool
remote *client.Remote
appManager *application.ApplicationManager
mode types.AgentMode
queues *queue.SendRecvQueues
emitter *event.EventSource
watchLock sync.RWMutex
version *version.Version
// informer is used to watch for change events for Argo CD Application resources on the cluster
informer *appinformer.AppInformer
// infStopCh is not currently used
infStopCh chan struct{}
connected atomic.Bool
// syncCh is not currently used
syncCh chan bool
remote *client.Remote
appManager *application.ApplicationManager
mode types.AgentMode
// queues is a queue of create/update/delete events to send to the principal
queues *queue.SendRecvQueues
emitter *event.EventSource
// At present, 'watchLock' is only acquired on calls to 'addAppUpdateToQueue'. This behaviour was added as a short-term attempt to preserve update event ordering. However, this is known to be problematic due to the potential for race conditions, both within itself, and between other event processors like deleteAppCallback.
watchLock sync.RWMutex
version *version.Version
}

const defaultQueueName = "default"

// AgentOptions defines the options for a given Controller
type AgentOptions struct {
// In the future, the 'namespaces' field may be used to support multiple Argo CD namespaces (for example, apps in any namespace) from a single agent instance, on a workload cluster. See 'filters.go' (in this package) for logic that reads from this value and avoids processing events outside of the specified namespaces.
// - However, note that the 'namespace' field of Agent is automatically included by default.
//
// However, as of this writing, this feature is not available.
namespaces []string
}

Expand All @@ -88,6 +100,10 @@ func NewAgent(ctx context.Context, client kubernetes.Interface, appclient appcli
}
}

if a.remote == nil {
return nil, fmt.Errorf("remote not defined")
}

// Initial state of the agent is disconnected
a.connected.Store(false)

Expand All @@ -99,6 +115,15 @@ func NewAgent(ctx context.Context, client kubernetes.Interface, appclient appcli
return nil, fmt.Errorf("unable to create default queue: %w", err)
}

var managerMode manager.ManagerMode
if a.mode == types.AgentModeAutonomous {
managerMode = manager.ManagerModeAutonomous
} else if a.mode == types.AgentModeManaged {
managerMode = manager.ManagerModeManaged
} else {
return nil, fmt.Errorf("unexpected agent mode: %v", a.mode)
}

a.informer = appinformer.NewAppInformer(ctx, a.appclient, a.namespace,
appinformer.WithListAppCallback(a.listAppCallback),
appinformer.WithNewAppCallback(a.addAppCreationToQueue),
Expand All @@ -112,13 +137,21 @@ func NewAgent(ctx context.Context, client kubernetes.Interface, appclient appcli
allowUpsert = true
}

var err error

// The agent only supports Kubernetes as application backend
a.appManager = application.NewApplicationManager(
a.appManager, err = application.NewApplicationManager(
kubeapp.NewKubernetesBackend(a.appclient, a.namespace, a.informer, true),
a.namespace,
application.WithAllowUpsert(allowUpsert),
application.WithRole(manager.ManagerRoleAgent),
application.WithMode(managerMode),
)

if err != nil {
return nil, err
}

a.syncCh = make(chan bool, 1)
return a, nil
}
Expand Down
2 changes: 1 addition & 1 deletion agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func newAgent(t *testing.T) *Agent {
func Test_NewAgent(t *testing.T) {
fakec := fakekube.NewFakeClientsetWithResources()
appc := fakeappclient.NewSimpleClientset()
agent, err := NewAgent(context.TODO(), fakec, appc, "agent")
agent, err := NewAgent(context.TODO(), fakec, appc, "agent", WithRemote(&client.Remote{}))
require.NotNil(t, agent)
require.NoError(t, err)
}
Expand Down
29 changes: 6 additions & 23 deletions agent/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (a *Agent) sender(stream eventstreamapi.EventStream_SubscribeClient) error
"direction": "Send",
"client_addr": grpcutil.AddressFromContext(stream.Context()),
})

q := a.queues.SendQ(a.remote.ClientID())
if q == nil {
return fmt.Errorf("no send queue found for the remote principal")
Expand All @@ -75,7 +76,7 @@ func (a *Agent) sender(stream eventstreamapi.EventStream_SubscribeClient) error
}
logCtx.Tracef("Grabbed an item")
if item == nil {
// FIXME: Is this really the right thing to do?
// TODO: Is this really the right thing to do?
return nil
}

Expand All @@ -98,7 +99,7 @@ func (a *Agent) sender(stream eventstreamapi.EventStream_SubscribeClient) error
if grpcutil.NeedReconnectOnError(err) {
return err
} else {
logCtx.Infof("Error while sending: %v", err)
logCtx.Errorf("Error while sending: %v", err)
return nil
}
}
Expand All @@ -119,7 +120,7 @@ func (a *Agent) receiver(stream eventstreamapi.EventStream_SubscribeClient) erro
if grpcutil.NeedReconnectOnError(err) {
return err
} else {
logCtx.Infof("Error while receiving: %v", err)
logCtx.Errorf("Error while receiving: %v", err)
return nil
}
}
Expand All @@ -133,26 +134,6 @@ func (a *Agent) receiver(stream eventstreamapi.EventStream_SubscribeClient) erro
if err != nil {
logCtx.WithError(err).Errorf("Unable to process incoming event")
}
// switch ev.Type() {
// case event.Create:
// _, err := a.createApplication(incomingApp)
// if err != nil {
// logCtx.Errorf("Error creating application: %v", err)
// }
// case event.SpecUpdate:
// _, err = a.updateApplication(incomingApp)
// if err != nil {
// logCtx.Errorf("Error updating application: %v", err)
// }
// case event.Delete:
// err = a.deleteApplication(incomingApp)
// if err != nil {
// logCtx.Errorf("Error deleting application: %v", err)
// }
// default:
// logCtx.Warnf("Received an unknown event: %s. Protocol mismatch?", ev.Type())
// }

return nil
}

Expand All @@ -175,6 +156,7 @@ func (a *Agent) handleStreamEvents() error {
})
logCtx.Info("Starting to receive events from event stream")
var err error
// Continuously retrieve events from the event stream 'inbox' and process them, while the stream is connected
for a.IsConnected() && err == nil {
err = a.receiver(stream)
if err != nil {
Expand All @@ -194,6 +176,7 @@ func (a *Agent) handleStreamEvents() error {
})
logCtx.Info("Starting to send events to event stream")
var err error
// Continuously read events from the 'outbox', and send them to principal, while the stream is connected
for a.IsConnected() && err == nil {
err = a.sender(stream)
if err != nil {
Expand Down
17 changes: 15 additions & 2 deletions agent/inbound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@ import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

backend_mocks "github.com/argoproj-labs/argocd-agent/internal/backend/mocks"
"github.com/argoproj-labs/argocd-agent/internal/manager"
"github.com/argoproj-labs/argocd-agent/internal/manager/application"
"github.com/argoproj-labs/argocd-agent/pkg/types"
)

func Test_CreateApplication(t *testing.T) {
a := newAgent(t)
be := backend_mocks.NewApplication(t)
a.appManager = application.NewApplicationManager(be, "argocd", application.WithAllowUpsert(true))
var err error
a.appManager, err = application.NewApplicationManager(be, "argocd", application.WithAllowUpsert(true))
require.NoError(t, err)
require.NotNil(t, a)
app := &v1alpha1.Application{ObjectMeta: v1.ObjectMeta{
Name: "test",
Expand Down Expand Up @@ -67,7 +70,9 @@ func Test_CreateApplication(t *testing.T) {
func Test_UpdateApplication(t *testing.T) {
a := newAgent(t)
be := backend_mocks.NewApplication(t)
a.appManager = application.NewApplicationManager(be, "argocd", application.WithAllowUpsert(true))
var err error
a.appManager, err = application.NewApplicationManager(be, "argocd", application.WithAllowUpsert(true))
require.NoError(t, err)
require.NotNil(t, a)
app := &v1alpha1.Application{
ObjectMeta: v1.ObjectMeta{
Expand All @@ -85,6 +90,8 @@ func Test_UpdateApplication(t *testing.T) {

t.Run("Update application using patch in managed mode", func(t *testing.T) {
a.mode = types.AgentModeManaged
a.appManager.Role = manager.ManagerRoleAgent
a.appManager.Mode = manager.ManagerModeManaged
getEvent := be.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&v1alpha1.Application{}, nil)
defer getEvent.Unset()
supportsPatchEvent := be.On("SupportsPatch").Return(true)
Expand All @@ -98,6 +105,8 @@ func Test_UpdateApplication(t *testing.T) {

t.Run("Update application using update in managed mode", func(t *testing.T) {
a.mode = types.AgentModeManaged
a.appManager.Role = manager.ManagerRoleAgent
a.appManager.Mode = manager.ManagerModeManaged
getEvent := be.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&v1alpha1.Application{}, nil)
defer getEvent.Unset()
supportsPatchEvent := be.On("SupportsPatch").Return(false)
Expand All @@ -111,6 +120,8 @@ func Test_UpdateApplication(t *testing.T) {

t.Run("Update application using patch in autonomous mode", func(t *testing.T) {
a.mode = types.AgentModeAutonomous
a.appManager.Role = manager.ManagerRoleAgent
a.appManager.Mode = manager.ManagerModeAutonomous
getEvent := be.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&v1alpha1.Application{}, nil)
defer getEvent.Unset()
supportsPatchEvent := be.On("SupportsPatch").Return(true)
Expand All @@ -124,6 +135,8 @@ func Test_UpdateApplication(t *testing.T) {

t.Run("Update application using update in autonomous mode", func(t *testing.T) {
a.mode = types.AgentModeAutonomous
a.appManager.Role = manager.ManagerRoleAgent
a.appManager.Mode = manager.ManagerModeAutonomous
getEvent := be.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&v1alpha1.Application{}, nil)
defer getEvent.Unset()
supportsPatchEvent := be.On("SupportsPatch").Return(false)
Expand Down
4 changes: 4 additions & 0 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func NewAgentRunCommand() *cobra.Command {
cmd.Fatal("No remote specified")
}

if namespace == "" {
cmd.Fatal("namespace value is empty and must be specified")
}

kubeConfig, err := cmd.GetKubeConfig(ctx, namespace, kubeConfig, kubeContext)
if err != nil {
cmd.Fatal("Could not load Kubernetes config: %v", err)
Expand Down
8 changes: 7 additions & 1 deletion internal/auth/userpass/userpass.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,17 @@ type UserPassAuthentication struct {
dbpath string
lock sync.RWMutex
userdb map[string]string
dummy []byte

// dummy is not a password itself, but rather is used to make timing attacks
// a little more complex: in the case where an invalid username is sent, we
// will hash the dummy password instead (to prevent attackers from
// determining whether a username exists by timing response times)
dummy []byte
}

// NewUserPassAuthentication creates a new instance of UserPassAuthentication
func NewUserPassAuthentication(path string) *UserPassAuthentication {
// see 'dummy' field of UserPassAuthentication for explanation of this value
dummy, _ := bcrypt.GenerateFromPassword([]byte("bdf3fdc6da5b5029e83f3024858c3c1e6aa3d1e71fa09e4691212f7571b5a3e3"), bcrypt.DefaultCost)
return &UserPassAuthentication{
userdb: make(map[string]string),
Expand Down
17 changes: 14 additions & 3 deletions internal/backend/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,23 @@ import (
)

type ApplicationSelector struct {
Labels map[string]string
Names []string

// Labels is not currently implemented.
Labels map[string]string

// Names is not currently implemented.
Names []string

// Namespaces is used by the 'List' Application interface function to restrict the list of Applications returned to a specific set of Namespaces.
Namespaces []string
Projects []string

// Projects is not currently implemented.
Projects []string
}

// Application defines a generic interface to store/track Argo CD Application state, via ApplicationManager.
//
// As of this writing (August 2024), the only implementation is a Kubernetes-based backend (KubernetesBackend in 'internal/backend/kubernetes/application') but other backends (e.g. RDBMS-backed) could be implemented in the future.
type Application interface {
List(ctx context.Context, selector ApplicationSelector) ([]v1alpha1.Application, error)
Create(ctx context.Context, app *v1alpha1.Application) (*v1alpha1.Application, error)
Expand Down
9 changes: 8 additions & 1 deletion internal/backend/kubernetes/application/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,16 @@ import (

var _ backend.Application = &KubernetesBackend{}

// KubernetesBackend is an implementation of the backend.Application interface, which is used by ApplicationManager to track/update the state of Argo CD Applications.
// KubernetesBackend stores/retrieves all data from Argo CD Application CRs on the cluster that is local to the agent/principal.
//
// KubernetesBackend is used by both the principal and agent components.
type KubernetesBackend struct {
// appClient is used to interfact with Argo CD Application resources on the cluster on which agent/principal is installed.
appClient appclientset.Interface
informer *appinformer.AppInformer
// informer is used to watch for change events for Argo CD Application resources on the cluster
informer *appinformer.AppInformer
// namespace is not currently read, is not guaranteed to be non-empty, and is not guaranteed to contain the source of Argo CD Application CRs in all cases
namespace string
usePatch bool
}
Expand Down
3 changes: 3 additions & 0 deletions internal/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type EventTarget string

const TypePrefix = "io.argoproj.argocd-agent.event"

// Supported EventTypes that are sent agent <-> principal
const (
Ping EventType = TypePrefix + ".ping"
Pong EventType = TypePrefix + ".pong"
Expand Down Expand Up @@ -63,10 +64,12 @@ func (t EventTarget) String() string {
return string(t)
}

// EventSource is a utility to construct new 'cloudevents.Event' events for a given 'source'
type EventSource struct {
source string
}

// Event is the 'on the wire' representation of an event, and is parsed by from protobuf via FromWire
type Event struct {
event *cloudevents.Event
target EventTarget
Expand Down
3 changes: 2 additions & 1 deletion internal/informer/application/appinformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const defaultResyncPeriod = 1 * time.Minute
// AppInformer is a filtering and customizable SharedIndexInformer for Argo CD
// Application resources in a cluster. It works across a configurable set of
// namespaces, lets you define a list of filters to indicate interest in the
// resource of a particular even and allows you to set up callbacks to handle
// resource of a particular event and allows you to set up callbacks to handle
// the events.
type AppInformer struct {
appClient appclientset.Interface
Expand All @@ -49,6 +49,7 @@ type AppInformer struct {
AppInformer cache.SharedIndexInformer
AppLister applisters.ApplicationLister

// lock should be acquired when reading/writing from callbacks defined in 'options' field
lock sync.RWMutex

// synced indicates whether the informer is synced and the watch is set up
Expand Down
1 change: 1 addition & 0 deletions internal/informer/application/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
// Options should not be modified concurrently, they are not implemented in a
// thread-safe way.
type AppInformerOptions struct {
// namespace will be set to "", if len(namespaces) > 0
namespace string
namespaces []string
appMetrics *metrics.ApplicationWatcherMetrics
Expand Down
Loading

0 comments on commit e93b66d

Please sign in to comment.