Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 198 additions & 0 deletions applicationset/controllers/applicationset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package controllers

import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
Expand Down Expand Up @@ -959,6 +960,189 @@ func (r *ApplicationSetReconciler) removeOwnerReferencesOnDeleteAppSet(ctx conte
return nil
}

// getEarliestWaitingTransitionTimeOfAppset extracts the earliest LastTransitionTime from ApplicationSet status
// for Applications in Waiting state that have had a revision change (not new apps).
// Returns nil if no Waiting applications with UNRECONCILED revision changes are found.
func getEarliestWaitingTransitionTimeOfAppset(appset *argov1alpha1.ApplicationSet, applications []argov1alpha1.Application) *metav1.Time {
// Build a map of app name to Application for quick lookup
appMap := make(map[string]*argov1alpha1.Application)
for i := range applications {
appMap[applications[i].Name] = &applications[i]
}

var earliest *metav1.Time
for _, appStatus := range appset.Status.ApplicationStatus {
// Only consider apps in Waiting state that have a transition time
// The message "Application has pending changes" indicates a revision change (not a new app)
if appStatus.Status != argov1alpha1.ProgressiveSyncWaiting ||
appStatus.LastTransitionTime == nil ||
!strings.Contains(appStatus.Message, "pending changes") {
continue
}

app, exists := appMap[appStatus.Application]
if !exists {
continue
}

// If the app has been reconciled after transitioning to Waiting AND has the correct revision,
if app.Status.ReconciledAt != nil &&
app.Status.ReconciledAt.After(appStatus.LastTransitionTime.Time) &&
reflect.DeepEqual(app.Status.GetRevisions(), appStatus.TargetRevisions) {
// This app has already been reconciled with the correct revision, skip it
continue
}

// This app needs reconciliation
if earliest == nil || appStatus.LastTransitionTime.Before(earliest) {
earliest = appStatus.LastTransitionTime
}
}
return earliest
}

// addRefreshAnnotationToApplications adds the refresh annotation to all Applications owned by the ApplicationSet
func (r *ApplicationSetReconciler) addRefreshAnnotationToApplications(ctx context.Context, logCtx *log.Entry, appset argov1alpha1.ApplicationSet, applications []argov1alpha1.Application) error {
for _, app := range applications {
// Check if annotation already exists
if app.Annotations != nil && app.Annotations[argov1alpha1.AnnotationKeyRefresh] != "" {
logCtx.WithField("app", app.Name).Debug("Refresh annotation already present, skipping")
continue
}

// Patch the application with the refresh annotation
patch := map[string]any{
"metadata": map[string]any{
"annotations": map[string]string{
argov1alpha1.AnnotationKeyRefresh: string(argov1alpha1.RefreshTypeNormal),
},
},
}
patchJSON, err := json.Marshal(patch)
if err != nil {
return fmt.Errorf("error marshaling refresh annotation patch for app %s: %w", app.Name, err)
}

err = r.Client.Patch(ctx, &app, client.RawPatch(types.MergePatchType, patchJSON))
if err != nil {
return fmt.Errorf("error adding refresh annotation to app %s: %w", app.Name, err)
}
logCtx.WithField("app", app.Name).Info("Added refresh annotation to Application")
}
return nil
}

// checkAllApplicationsReconciled verifies that all Applications have been reconciled since the given time,
// that no refresh annotations are present, and that the reconciled revision matches the target revision
func checkAllApplicationsReconciled(applications []argov1alpha1.Application, logCtx *log.Entry, appset *argov1alpha1.ApplicationSet, sinceTime *metav1.Time) bool {
if sinceTime == nil {
return true
}

for _, app := range applications {
if app.Annotations != nil && app.Annotations[argov1alpha1.AnnotationKeyRefresh] != "" {
logCtx.Debug("Application still has refresh annotation, waiting for reconciliation")
return false
}

// Check if ReconciledAt is set and is after sinceTime
if app.Status.ReconciledAt == nil {
logCtx.Debug("Application ReconciledAt is nil, not yet reconciled")
return false
}
if !app.Status.ReconciledAt.After(sinceTime.Time) {
logCtx.WithFields(log.Fields{
"app": app.Name,
"reconciledAt": app.Status.ReconciledAt.Time,
"sinceTime": sinceTime.Time,
"reconciledIsAfter": app.Status.ReconciledAt.After(sinceTime.Time),
}).Debug("Application not reconciled after transition time")
return false
}

// This ensures the app actually picked up the new revision
idx := findApplicationStatusIndex(appset.Status.ApplicationStatus, app.Name)
if idx != -1 {
appStatus := appset.Status.ApplicationStatus[idx]
currentRevisions := app.Status.GetRevisions()

if !reflect.DeepEqual(currentRevisions, appStatus.TargetRevisions) {
logCtx.WithFields(log.Fields{
"app": app.Name,
"currentRevisions": currentRevisions,
"targetRevisions": appStatus.TargetRevisions,
}).Debug("Application reconciled but revision doesn't match target - waiting for correct revision")
return false
}
}
}
return true
}

// ensureApplicationsReconciled ensures all Applications are reconciled before proceeding with progressive sync
// It adds refresh annotations if needed and checks if all apps have been reconciled
func (r *ApplicationSetReconciler) ensureApplicationsReconciled(ctx context.Context, logCtx *log.Entry, appset *argov1alpha1.ApplicationSet, applications []argov1alpha1.Application) (bool, error) {
// Get the earliest transition time for Waiting status (only for revision changes, not new apps) of owned Applications
earliestWaitingTime := getEarliestWaitingTransitionTimeOfAppset(appset, applications)

if earliestWaitingTime == nil {
logCtx.Debug("No applications with pending revision changes, skipping reconciliation check") // Or a new app
return true, nil
}

logCtx.WithField("earliest_waiting_time", earliestWaitingTime.Time).Info("Applications have pending revision changes, checking if reconciliation needed")

// TODO: remove - Logs current state of all applications
for _, app := range applications {
hasAnnotation := app.Annotations != nil && app.Annotations[argov1alpha1.AnnotationKeyRefresh] != ""
var reconciledAt string
if app.Status.ReconciledAt != nil {
reconciledAt = app.Status.ReconciledAt.Time.Format(time.RFC3339)
} else {
reconciledAt = "nil"
}
logCtx.WithFields(log.Fields{
"app": app.Name,
"hasRefresh": hasAnnotation,
"reconciledAt": reconciledAt,
"health": app.Status.Health.Status,
"sync": app.Status.Sync.Status,
}).Debug("Application state before reconciliation check")
}

// Check if all applications have been reconciled since the earliestWaitingTime
allReconciled := checkAllApplicationsReconciled(applications, logCtx, appset, earliestWaitingTime)
if allReconciled {
logCtx.Info("All Applications have been reconciled, proceeding with progressive sync")
return true, nil
}

// log apps that still have refresh annotation
appsWithAnnotation := []string{}
for _, app := range applications {
if app.Annotations != nil && app.Annotations[argov1alpha1.AnnotationKeyRefresh] != "" {
appsWithAnnotation = append(appsWithAnnotation, app.Name)
}
}

if len(appsWithAnnotation) > 0 {
// We've already added annotations, just waiting for reconciliation
logCtx.WithField("apps_with_annotation", appsWithAnnotation).Info("Waiting for Applications with refresh annotations to be reconciled")
return false, nil
}

// add refresh annotations to trigger reconciliation
logCtx.Info("Applications have pending changes, adding refresh annotations to all Applications to trigger reconciliation")
// TODO: check if util/argo/argo.go RefreshApp can be used instead
err := r.addRefreshAnnotationToApplications(ctx, logCtx, *appset, applications)
if err != nil {
return false, fmt.Errorf("failed to add refresh annotations: %w", err)
}

logCtx.Info("Refresh annotations added to all applications, waiting for application controller to reconcile them")
return false, nil
}

func (r *ApplicationSetReconciler) performProgressiveSyncs(ctx context.Context, logCtx *log.Entry, appset argov1alpha1.ApplicationSet, applications []argov1alpha1.Application, desiredApplications []argov1alpha1.Application) (map[string]bool, error) {
appDependencyList, appStepMap := r.buildAppDependencyList(logCtx, appset, desiredApplications)

Expand All @@ -967,6 +1151,17 @@ func (r *ApplicationSetReconciler) performProgressiveSyncs(ctx context.Context,
return nil, fmt.Errorf("failed to update applicationset app status: %w", err)
}

// Ensure all applications are reconciled before proceeding with progressive sync
allReconciled, err := r.ensureApplicationsReconciled(ctx, logCtx, &appset, applications)
if err != nil {
return nil, fmt.Errorf("failed to ensure applications reconciled: %w", err)
}
if !allReconciled {
// Not all applications are reconciled yet, return empty sync map to prevent progression
logCtx.Debug("Progressive sync blocked until all applications are reconciled")
return map[string]bool{}, nil
}

logCtx.Infof("ApplicationSet %v step list:", appset.Name)
for stepIndex, applicationNames := range appDependencyList {
logCtx.Infof("step %v: %+v", stepIndex+1, applicationNames)
Expand Down Expand Up @@ -1202,6 +1397,9 @@ func (r *ApplicationSetReconciler) updateApplicationSetApplicationStatus(ctx con
newAppStatus.Message = "Application resource has synced, updating status to Progressing"
}
}
//TODO: add else-if condition here instead to check for earliest transition time and add refresh annotations
// Checking for transition time after updating applicationStatus' is not working - apps syncing out of order after waiting for apps to refresh.

} else {
// The target revision is the same, so we need to evaluate the current revision progress
if currentAppStatus.Status == argov1alpha1.ProgressiveSyncPending {
Expand Down
Loading
Loading