Skip to content

Commit

Permalink
chore(event-reporter): refactoring, variables grouping, code splitting (
Browse files Browse the repository at this point in the history
#351)

* event-reporter: created dedicated func resolveApplicationVersions

* event-reporter: added new types for variables grouping

* event-reporter: removed redundant code comment

* event-reporter(refactoring): params grouping, added new type ArgoTrackingMetadata to pass data between methods

* event-reporter(refactoring): params grouping, added new type ReportedEntityParentApp to pass data between methods

* event-reporter(refactoring): params grouping, added new type ReportedResource to pass data between methods, created new methods to group logic inside getResourceEventPayload

* event-reporter(refactoring): fixed nil pointer issues after refactoring

* event-reporter(refactoring): fixed lint issue

* event-reporter(refactoring): fixed lint issue

* event-reporter: changes after pr review
  • Loading branch information
oleksandr-codefresh authored Nov 7, 2024
1 parent 66dc000 commit 5af7ecc
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 166 deletions.
5 changes: 4 additions & 1 deletion event_reporter/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ func (c *eventReporterController) Run(ctx context.Context) {
}
trackingMethod := argoutil.GetTrackingMethod(c.settingsMgr)

err = c.applicationEventReporter.StreamApplicationEvents(ctx, &a, eventProcessingStartedAt, ignoreResourceCache, appInstanceLabelKey, trackingMethod)
err = c.applicationEventReporter.StreamApplicationEvents(ctx, &a, eventProcessingStartedAt, ignoreResourceCache, &reporter.ArgoTrackingMetadata{
AppInstanceLabelKey: &appInstanceLabelKey,
TrackingMethod: &trackingMethod,
})
if err != nil {
return err
}
Expand Down
87 changes: 55 additions & 32 deletions event_reporter/reporter/application_event_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ type ApplicationEventReporter interface {
a *appv1.Application,
eventProcessingStartedAt string,
ignoreResourceCache bool,
appInstanceLabelKey string,
trackingMethod appv1.TrackingMethod,
argoTrackingMetadata *ArgoTrackingMetadata,
) error
ShouldSendApplicationEvent(ae *appv1.ApplicationWatchEvent) (shouldSend bool, syncStatusChanged bool)
}
Expand Down Expand Up @@ -113,8 +112,7 @@ func (s *applicationEventReporter) StreamApplicationEvents(
a *appv1.Application,
eventProcessingStartedAt string,
ignoreResourceCache bool,
appInstanceLabelKey string,
trackingMethod appv1.TrackingMethod,
argoTrackingMetadata *ArgoTrackingMetadata,
) error {
metricTimer := metricsUtils.NewMetricTimer()

Expand All @@ -141,18 +139,11 @@ func (s *applicationEventReporter) StreamApplicationEvents(

desiredManifests, manifestGenErr := s.getDesiredManifests(ctx, a, nil, logCtx)

syncRevision := utils.GetOperationStateRevision(a)
var applicationVersions *apiclient.ApplicationVersions
if syncRevision != nil {
syncManifests, _ := s.getDesiredManifests(ctx, a, syncRevision, logCtx)
applicationVersions = syncManifests.GetApplicationVersions()
} else {
applicationVersions = nil
}
applicationVersions := s.resolveApplicationVersions(ctx, a, logCtx)

logCtx.Info("getting parent application name")

parentAppIdentity := utils.GetParentAppIdentity(a, appInstanceLabelKey, trackingMethod)
parentAppIdentity := utils.GetParentAppIdentity(a, *argoTrackingMetadata.AppInstanceLabelKey, *argoTrackingMetadata.TrackingMethod)

if utils.IsChildApp(parentAppIdentity) {
logCtx.Info("processing as child application")
Expand All @@ -165,27 +156,29 @@ func (s *applicationEventReporter) StreamApplicationEvents(
}

rs := utils.GetAppAsResource(a)
utils.SetHealthStatusIfMissing(rs)

parentDesiredManifests, manifestGenErr := s.getDesiredManifests(ctx, parentApplicationEntity, nil, logCtx)

// helm app hasnt revision
// TODO: add check if it helm application
parentAppSyncRevisionsMetadata, err := s.getApplicationRevisionsMetadata(ctx, logCtx, parentApplicationEntity)
if err != nil {
logCtx.WithError(err).Warn("failed to get parent application's revision metadata, resuming")
}

utils.SetHealthStatusIfMissing(rs)
err = s.processResource(ctx, *rs, parentApplicationEntity, logCtx, eventProcessingStartedAt, parentDesiredManifests, appTree, manifestGenErr, a, parentAppSyncRevisionsMetadata, appInstanceLabelKey, trackingMethod, applicationVersions)
err = s.processResource(ctx, *rs, logCtx, eventProcessingStartedAt, parentDesiredManifests, manifestGenErr, a, applicationVersions, &ReportedEntityParentApp{
app: parentApplicationEntity,
appTree: appTree,
revisionsMetadata: parentAppSyncRevisionsMetadata,
}, argoTrackingMetadata)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricChildAppEventType, metrics.MetricEventUnknownErrorType, a.Name)
return err
}
s.metricsServer.ObserveEventProcessingDurationHistogramDuration(a.Name, metrics.MetricChildAppEventType, metricTimer.Duration())
} else {
logCtx.Info("processing as root application")
// will get here only for root applications (not managed as a resource by another application)
appEvent, err := s.getApplicationEventPayload(ctx, a, appTree, eventProcessingStartedAt, appInstanceLabelKey, trackingMethod, applicationVersions)
logCtx.Info("processing as root application")
appEvent, err := s.getApplicationEventPayload(ctx, a, appTree, eventProcessingStartedAt, applicationVersions, argoTrackingMetadata)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricParentAppEventType, metrics.MetricEventGetPayloadErrorType, a.Name)
return fmt.Errorf("failed to get application event: %w", err)
Expand Down Expand Up @@ -216,7 +209,11 @@ func (s *applicationEventReporter) StreamApplicationEvents(
s.metricsServer.IncCachedIgnoredEventsCounter(metrics.MetricResourceEventType, a.Name)
continue
}
err := s.processResource(ctx, rs, a, logCtx, eventProcessingStartedAt, desiredManifests, appTree, manifestGenErr, nil, revisionsMetadata, appInstanceLabelKey, trackingMethod, nil)
err := s.processResource(ctx, rs, logCtx, eventProcessingStartedAt, desiredManifests, manifestGenErr, nil, nil, &ReportedEntityParentApp{
app: a,
appTree: appTree,
revisionsMetadata: revisionsMetadata,
}, argoTrackingMetadata)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metrics.MetricResourceEventType, metrics.MetricEventUnknownErrorType, a.Name)
return err
Expand All @@ -225,6 +222,16 @@ func (s *applicationEventReporter) StreamApplicationEvents(
return nil
}

func (s *applicationEventReporter) resolveApplicationVersions(ctx context.Context, a *appv1.Application, logCtx *log.Entry) *apiclient.ApplicationVersions {
syncRevision := utils.GetOperationStateRevision(a)
if syncRevision == nil {
return nil
}

syncManifests, _ := s.getDesiredManifests(ctx, a, syncRevision, logCtx)
return syncManifests.GetApplicationVersions()
}

func (s *applicationEventReporter) getAppForResourceReporting(
rs appv1.ResourceStatus,
ctx context.Context,
Expand Down Expand Up @@ -252,17 +259,14 @@ func (s *applicationEventReporter) getAppForResourceReporting(
func (s *applicationEventReporter) processResource(
ctx context.Context,
rs appv1.ResourceStatus,
parentApplication *appv1.Application,
logCtx *log.Entry,
appEventProcessingStartedAt string,
desiredManifests *apiclient.ManifestResponse,
appTree *appv1.ApplicationTree,
manifestGenErr bool,
originalApplication *appv1.Application,
revisionsMetadata *utils.AppSyncRevisionsMetadata,
appInstanceLabelKey string,
trackingMethod appv1.TrackingMethod,
applicationVersions *apiclient.ApplicationVersions,
originalApplication *appv1.Application, // passed onlu if resource is app
applicationVersions *apiclient.ApplicationVersions, // passed onlu if resource is app
reportedEntityParentApp *ReportedEntityParentApp,
argoTrackingMetadata *ArgoTrackingMetadata,
) error {
metricsEventType := metrics.MetricResourceEventType
if utils.IsApp(rs) {
Expand All @@ -277,25 +281,44 @@ func (s *applicationEventReporter) processResource(
// get resource desired state
desiredState := getResourceDesiredState(&rs, desiredManifests, logCtx)

actualState, err := s.getResourceActualState(ctx, logCtx, metricsEventType, rs, parentApplication, originalApplication)
actualState, err := s.getResourceActualState(ctx, logCtx, metricsEventType, rs, reportedEntityParentApp.app, originalApplication)
if err != nil {
return err
}
if actualState == nil {
return nil
}

parentApplicationToReport, revisionMetadataToReport := s.getAppForResourceReporting(rs, ctx, logCtx, parentApplication, revisionsMetadata)
parentApplicationToReport, revisionMetadataToReport := s.getAppForResourceReporting(rs, ctx, logCtx, reportedEntityParentApp.app, reportedEntityParentApp.revisionsMetadata)

var originalAppRevisionMetadata *utils.AppSyncRevisionsMetadata = nil

if originalApplication != nil {
originalAppRevisionMetadata, _ = s.getApplicationRevisionsMetadata(ctx, logCtx, originalApplication)
}

ev, err := getResourceEventPayload(parentApplicationToReport, &rs, actualState, desiredState, appTree, manifestGenErr, appEventProcessingStartedAt, originalApplication, revisionMetadataToReport, originalAppRevisionMetadata, appInstanceLabelKey, trackingMethod, applicationVersions)
ev, err := getResourceEventPayload(
appEventProcessingStartedAt,
&ReportedResource{
rs: &rs,
actualState: actualState,
desiredState: desiredState,
manifestGenErr: manifestGenErr,
rsAsAppInfo: &ReportedResourceAsApp{
app: originalApplication,
revisionsMetadata: originalAppRevisionMetadata,
applicationVersions: applicationVersions,
},
},
&ReportedEntityParentApp{
app: parentApplicationToReport,
appTree: reportedEntityParentApp.appTree,
revisionsMetadata: revisionMetadataToReport,
},
argoTrackingMetadata,
)
if err != nil {
s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventGetPayloadErrorType, parentApplication.Name)
s.metricsServer.IncErroredEventsCounter(metricsEventType, metrics.MetricEventGetPayloadErrorType, reportedEntityParentApp.app.Name)
logCtx.WithError(err).Warn("failed to get event payload, resuming")
return nil
}
Expand All @@ -307,7 +330,7 @@ func (s *applicationEventReporter) processResource(
appName = appRes.Name
} else {
utils.LogWithResourceStatus(logCtx, rs).Info("streaming resource event")
appName = parentApplication.Name
appName = reportedEntityParentApp.app.Name
}

if err := s.codefreshClient.SendEvent(ctx, appName, ev); err != nil {
Expand Down
4 changes: 1 addition & 3 deletions event_reporter/reporter/application_event_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@ import (
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo-cd/v2/common"
"github.com/argoproj/argo-cd/v2/pkg/apiclient/application"
"github.com/argoproj/argo-cd/v2/pkg/apiclient/events"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/pkg/codefresh"
"github.com/argoproj/argo-cd/v2/util/argo"
)

const (
Expand Down Expand Up @@ -226,7 +224,7 @@ func TestStreamApplicationEvent(t *testing.T) {
assert.Equal(t, *app, actualApp)
return nil
}
_ = eventReporter.StreamApplicationEvents(context.Background(), app, "", false, common.LabelKeyAppInstance, argo.TrackingMethodLabel)
_ = eventReporter.StreamApplicationEvents(context.Background(), app, "", false, getMockedArgoTrackingMetadata())
})
}

Expand Down
Loading

0 comments on commit 5af7ecc

Please sign in to comment.