Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve ClientIntent status & event reporting robustness by using LRU cache & caching only after successful report #484

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 6 additions & 16 deletions src/shared/operator_cloud_client/cloud_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type CloudClient interface {
ReportNetworkPolicies(ctx context.Context, namespace string, policies []graphqlclient.NetworkPolicyInput) error
ReportExternallyAccessibleServices(ctx context.Context, namespace string, services []graphqlclient.ExternallyAccessibleServiceInput) error
ReportProtectedServices(ctx context.Context, namespace string, protectedServices []graphqlclient.ProtectedServiceInput) error
ReportIntentEvents(ctx context.Context, events []graphqlclient.ClientIntentEventInput)
ReportClientIntentStatuses(ctx context.Context, statuses []graphqlclient.ClientIntentStatusInput)
ReportIntentEvents(ctx context.Context, events []graphqlclient.ClientIntentEventInput) error
ReportClientIntentStatuses(ctx context.Context, statuses []graphqlclient.ClientIntentStatusInput) error
}

type CloudClientImpl struct {
Expand Down Expand Up @@ -116,22 +116,12 @@ func (c *CloudClientImpl) ReportProtectedServices(ctx context.Context, namespace
return errors.Wrap(err)
}

// ReportIntentEvents sends the given events through
// graphqlclient.ReportClientIntentEvents using c.client, and (in the variant
// with an error result) returns errors.Wrap(err) to the caller.
// NOTE(review): this span is a diff rendered without +/- markers. The
// signature without an error result and the log-and-return lines appear to be
// the removed pre-change version — confirm against the merged file.
func (c *CloudClientImpl) ReportIntentEvents(ctx context.Context, events []graphqlclient.ClientIntentEventInput) {
func (c *CloudClientImpl) ReportIntentEvents(ctx context.Context, events []graphqlclient.ClientIntentEventInput) error {
_, err := graphqlclient.ReportClientIntentEvents(ctx, c.client, events)
if err != nil {
logrus.WithError(err).Error("failed to report intent events")
return
}
logrus.Info("Intent events reported to cloud successfully")
return
return errors.Wrap(err)
}

// ReportClientIntentStatuses sends the given statuses through
// graphqlclient.ReportClientIntentStatuses using c.client, and (in the variant
// with an error result) returns errors.Wrap(err) to the caller.
// NOTE(review): this span is a diff rendered without +/- markers. The
// signature without an error result and the log-and-return lines appear to be
// the removed pre-change version — confirm against the merged file.
func (c *CloudClientImpl) ReportClientIntentStatuses(ctx context.Context, statuses []graphqlclient.ClientIntentStatusInput) {
func (c *CloudClientImpl) ReportClientIntentStatuses(ctx context.Context, statuses []graphqlclient.ClientIntentStatusInput) error {
_, err := graphqlclient.ReportClientIntentStatuses(ctx, c.client, statuses)
if err != nil {
logrus.WithError(err).Error("failed to report intent statuses")
return
}
logrus.Info("Intent statuses reported to cloud successfully")
return
return errors.Wrap(err)
}
141 changes: 93 additions & 48 deletions src/shared/operator_cloud_client/intent_events_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package operator_cloud_client

import (
"context"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/otterize/intents-operator/src/operator/api/v2alpha1"
"github.com/otterize/intents-operator/src/shared/errors"
"github.com/otterize/intents-operator/src/shared/otterizecloud/graphqlclient"
Expand Down Expand Up @@ -30,23 +30,22 @@ type intentStatusDetails struct {
ObservedGeneration int
}

// IntentEvent pairs an event with the ClientIntents object it involves, so
// both are available when the event is converted for reporting and when it is
// cached afterwards.
type IntentEvent struct {
// Event is the event as returned by the event list query.
Event v1.Event
// Intent is the ClientIntents looked up by the event's InvolvedObject
// namespace and name.
Intent v2alpha1.ClientIntents
}

// IntentEventsPeriodicReporter bundles the cloud client, the Kubernetes
// client and manager, and two caches keyed by event / intent identity that
// record what has been reported.
// NOTE(review): eventCache and statusCache are listed twice because this is a
// diff rendered without +/- markers; the *lru.Cache fields appear to be the
// removed ones, replaced by *expirable.LRU — confirm against the merged file.
type IntentEventsPeriodicReporter struct {
cloudClient CloudClient
k8sClient client.Client
k8sClusterManager ctrl.Manager
eventCache *lru.Cache[eventKey, eventGeneration]
statusCache *lru.Cache[intentStatusKey, intentStatusDetails]
eventCache *expirable.LRU[eventKey, eventGeneration]
statusCache *expirable.LRU[intentStatusKey, intentStatusDetails]
}

func NewIntentEventsSender(cloudClient CloudClient, k8sClusterManager ctrl.Manager) (*IntentEventsPeriodicReporter, error) {
eventCache, err := lru.New[eventKey, eventGeneration](1000)
if err != nil {
return nil, errors.Wrap(err)
}
statusCache, err := lru.New[intentStatusKey, intentStatusDetails](1000)
if err != nil {
return nil, errors.Wrap(err)
}
eventCache := expirable.NewLRU[eventKey, eventGeneration](1000, nil, time.Hour)
statusCache := expirable.NewLRU[intentStatusKey, intentStatusDetails](1000, nil, time.Hour)

return &IntentEventsPeriodicReporter{
cloudClient: cloudClient,
Expand Down Expand Up @@ -125,62 +124,81 @@ func (ies *IntentEventsPeriodicReporter) waitForCacheSync(ctx context.Context) {
}

// reportIntentEvents queries the intent events that still need reporting,
// sends them with doReportEvents, and records them in the event cache only
// when the send returned no error. Failures are logged, not returned.
// NOTE(review): this span is a diff rendered without +/- markers; the
// gqlEvents lines appear to be the removed pre-change version.
func (ies *IntentEventsPeriodicReporter) reportIntentEvents(ctx context.Context) {
gqlEvents, err := ies.queryIntentEvents(ctx)
intentEvents, err := ies.queryIntentEvents(ctx)
if err != nil {
logrus.WithError(err).Error("Failed to query intent events")
return
}

if len(gqlEvents) == 0 {
if len(intentEvents) == 0 {
logrus.WithField("events_in_cache", ies.eventCache.Len()).Debug("No new intent events to report")
return
}

ies.doReportEvents(ctx, gqlEvents)
logrus.WithField("events", len(intentEvents)).Info("Reporting intent events")

err = ies.doReportEvents(ctx, intentEvents)
if err != nil {
logrus.WithError(err).Error("Failed to report intent events")
return
}

// Cache only after a successful report, so events whose report failed are
// not filtered out by queryIntentEvents on a later call.
ies.cacheReportedEvents(intentEvents)
}

// doReportEvents converts each IntentEvent with eventToGQLEvent and sends the
// results to the cloud client in chunks of 100, all under a single timeout
// read from viper (otterizecloudclient.CloudClientTimeoutKey). It returns the
// wrapped error of the first chunk that fails; chunks sent before that point
// have already been reported. Returns nil when every chunk succeeds.
// NOTE(review): this span is a diff rendered without +/- markers; the
// signature taking gqlEvents and the bare ReportIntentEvents call appear to
// be the removed pre-change version.
func (ies *IntentEventsPeriodicReporter) doReportEvents(ctx context.Context, gqlEvents []graphqlclient.ClientIntentEventInput) {
func (ies *IntentEventsPeriodicReporter) doReportEvents(ctx context.Context, intentEvents []IntentEvent) error {
timeoutCtx, cancel := context.WithTimeout(ctx, viper.GetDuration(otterizecloudclient.CloudClientTimeoutKey))
defer cancel()

gqlEvents := lo.Map(intentEvents, func(intentEvent IntentEvent, _ int) graphqlclient.ClientIntentEventInput {
return eventToGQLEvent(intentEvent.Intent, intentEvent.Event)
})

eventChunks := lo.Chunk(gqlEvents, 100)
for _, chunk := range eventChunks {
ies.cloudClient.ReportIntentEvents(timeoutCtx, chunk)
err := ies.cloudClient.ReportIntentEvents(timeoutCtx, chunk)
if err != nil {
return errors.Wrap(err)
}
}
return nil
}

// queryIntentEvents lists events whose involvedObjectKindField matches
// "ClientIntents", drops those whose UID is already in the event cache with
// the same generation, and pairs each remaining event with the ClientIntents
// fetched by the event's InvolvedObject namespace and name. Events whose
// ClientIntents cannot be fetched are logged and left out of the result.
// A failure of the list call is returned wrapped. This function does not
// write to the event cache in its post-change form.
// NOTE(review): this span is a diff rendered without +/- markers; the
// gqlEvents slice, the for loop with continue, and the in-loop
// eventCache.Add appear to be the removed pre-change version.
func (ies *IntentEventsPeriodicReporter) queryIntentEvents(ctx context.Context) ([]graphqlclient.ClientIntentEventInput, error) {
func (ies *IntentEventsPeriodicReporter) queryIntentEvents(ctx context.Context) ([]IntentEvent, error) {
events := v1.EventList{}
gqlEvents := make([]graphqlclient.ClientIntentEventInput, 0)
err := ies.k8sClient.List(ctx, &events, client.MatchingFields{involvedObjectKindField: "ClientIntents"})
if err != nil {
return nil, errors.Wrap(err)
}
if len(events.Items) == 0 {
logrus.Debugf("No events in list")
return gqlEvents, nil
}
for _, event := range events.Items {

// Keep an event when it is absent from the cache or cached under a different generation.
filteredEvents := lo.Filter(events.Items, func(event v1.Event, _ int) bool {
key := eventKey(event.UID)
if cachedGeneration, ok := ies.eventCache.Get(key); ok && cachedGeneration == eventGeneration(event.Generation) {
continue
cachedGeneration, ok := ies.eventCache.Get(key)
return !ok || cachedGeneration != eventGeneration(event.Generation)
})

}
intentEvents := lo.FilterMap(filteredEvents, func(event v1.Event, _ int) (IntentEvent, bool) {
intent := v2alpha1.ClientIntents{}
err := ies.k8sClient.Get(ctx, client.ObjectKey{Namespace: event.InvolvedObject.Namespace, Name: event.InvolvedObject.Name}, &intent)
if err != nil {
logrus.Errorf("Failed to get intent %s/%s: %v", event.InvolvedObject.Namespace, event.InvolvedObject.Name, err)
continue
return IntentEvent{}, false
}
gqlEvents = append(gqlEvents, eventToGQLEvent(intent, event))
ies.eventCache.Add(key, eventGeneration(event.Generation))

return IntentEvent{Event: event, Intent: intent}, true
})

return intentEvents, nil
}

// cacheReportedEvents stores, for every given event, its UID mapped to its
// generation in the event cache.
// NOTE(review): this span is a diff rendered without +/- markers; the
// `return gqlEvents, nil` line appears to be a removed line from the
// pre-change queryIntentEvents, not part of this function.
func (ies *IntentEventsPeriodicReporter) cacheReportedEvents(intentEvents []IntentEvent) {
for _, intentEvent := range intentEvents {
ies.eventCache.Add(eventKey(intentEvent.Event.UID), eventGeneration(intentEvent.Event.Generation))
}
return gqlEvents, nil
}

func (ies *IntentEventsPeriodicReporter) reportIntentStatuses(ctx context.Context) {
statuses, err := ies.queryIntentStatuses(ctx)

if err != nil {
logrus.WithError(err).Error("Failed to query intent statuses")
return
Expand All @@ -191,38 +209,65 @@ func (ies *IntentEventsPeriodicReporter) reportIntentStatuses(ctx context.Contex
return
}

ies.doReportStatuses(ctx, statuses)
logrus.WithField("statuses", len(statuses)).Info("Reporting intent statuses")

err = ies.doReportStatuses(ctx, statuses)
if err != nil {
logrus.WithError(err).Error("Failed to report intent statuses")
return
}

ies.cacheReportedIntentStatuses(statuses)
}

// doReportStatuses converts each ClientIntents with statusToGQLStatus and
// sends the results to the cloud client in chunks of 100, all under a single
// timeout read from viper (otterizecloudclient.CloudClientTimeoutKey). It
// returns the wrapped error of the first chunk that fails; chunks sent before
// that point have already been reported. Returns nil when every chunk succeeds.
// NOTE(review): this span is a diff rendered without +/- markers; the
// signature taking statuses, the lo.Chunk(statuses, 100) line and the bare
// ReportClientIntentStatuses call appear to be the removed pre-change version.
func (ies *IntentEventsPeriodicReporter) doReportStatuses(ctx context.Context, statuses []graphqlclient.ClientIntentStatusInput) {
func (ies *IntentEventsPeriodicReporter) doReportStatuses(ctx context.Context, intents []v2alpha1.ClientIntents) error {
timeoutCtx, cancel := context.WithTimeout(ctx, viper.GetDuration(otterizecloudclient.CloudClientTimeoutKey))
defer cancel()
statusChunks := lo.Chunk(statuses, 100)

gqlStatuses := lo.Map(intents, func(intent v2alpha1.ClientIntents, _ int) graphqlclient.ClientIntentStatusInput {
return statusToGQLStatus(intent)
})

statusChunks := lo.Chunk(gqlStatuses, 100)
for _, chunk := range statusChunks {
ies.cloudClient.ReportClientIntentStatuses(timeoutCtx, chunk)
err := ies.cloudClient.ReportClientIntentStatuses(timeoutCtx, chunk)
if err != nil {
return errors.Wrap(err)
}
}

return nil
}

// queryIntentStatuses lists all ClientIntents and returns those that are
// either missing from the status cache or whose Generation, Status.UpToDate
// or Status.ObservedGeneration differs from the cached details. A failure of
// the list call is returned wrapped. This function does not write to the
// status cache in its post-change form.
// NOTE(review): this span is a diff rendered without +/- markers; the
// gqlStatuses slice, the for loop with continue, and the in-loop
// statusCache.Add appear to be the removed pre-change version.
func (ies *IntentEventsPeriodicReporter) queryIntentStatuses(ctx context.Context) ([]graphqlclient.ClientIntentStatusInput, error) {
func (ies *IntentEventsPeriodicReporter) queryIntentStatuses(ctx context.Context) ([]v2alpha1.ClientIntents, error) {
clientIntents := v2alpha1.ClientIntentsList{}
err := ies.k8sClient.List(ctx, &clientIntents)
if err != nil {
return nil, errors.Wrap(err)
}

gqlStatuses := make([]graphqlclient.ClientIntentStatusInput, 0)
for _, intent := range clientIntents.Items {
if cachedDetails, ok := ies.statusCache.Get(intentStatusKey(intent.UID)); ok && cachedDetails.Generation == int(intent.Generation) && cachedDetails.UpToDate == intent.Status.UpToDate && cachedDetails.ObservedGeneration == int(intent.Status.ObservedGeneration) {
continue
}
gqlStatuses = append(gqlStatuses, statusToGQLStatus(intent))
ies.statusCache.Add(intentStatusKey(intent.UID), intentStatusDetails{
Generation: int(intent.Generation),
UpToDate: intent.Status.UpToDate,
ObservedGeneration: int(intent.Status.ObservedGeneration),
})
// Keep an intent when it is uncached or any of the three tracked fields changed.
filteredIntents := lo.Filter(clientIntents.Items, func(intent v2alpha1.ClientIntents, _ int) bool {
cachedDetails, ok := ies.statusCache.Get(intentStatusKey(intent.UID))
return !ok ||
cachedDetails.Generation != int(intent.Generation) ||
cachedDetails.UpToDate != intent.Status.UpToDate ||
cachedDetails.ObservedGeneration != int(intent.Status.ObservedGeneration)
})

return filteredIntents, nil
}

// cacheReportedIntentStatuses stores, for every given intent, its UID mapped
// to its Generation, Status.UpToDate and Status.ObservedGeneration in the
// status cache.
// NOTE(review): this span is a diff rendered without +/- markers; the
// `return gqlStatuses, nil` line appears to be a removed line from the
// pre-change queryIntentStatuses, not part of this function.
func (ies *IntentEventsPeriodicReporter) cacheReportedIntentStatuses(intents []v2alpha1.ClientIntents) {
for _, intent := range intents {
ies.statusCache.Add(
intentStatusKey(intent.UID),
intentStatusDetails{
Generation: int(intent.Generation),
UpToDate: intent.Status.UpToDate,
ObservedGeneration: int(intent.Status.ObservedGeneration),
},
)
}
return gqlStatuses, nil
}

func statusToGQLStatus(intent v2alpha1.ClientIntents) graphqlclient.ClientIntentStatusInput {
Expand Down
Loading
Loading