diff --git a/internal/events/constants.go b/internal/events/constants.go index e7e2d95961..9c96eccf70 100644 --- a/internal/events/constants.go +++ b/internal/events/constants.go @@ -47,4 +47,6 @@ const ( TopicQueueReconcileEntityDelete = "internal.entity.delete.event" // TopicQueueReconcileEntityAdd is the topic for reconciling when an entity is added TopicQueueReconcileEntityAdd = "internal.entity.add.event" + // TopicQueueRepoReminder is the topic for repo reminder events + TopicQueueRepoReminder = "repo.reminder.event" ) diff --git a/internal/reminder/messages/messages.go b/internal/reminder/messages/messages.go new file mode 100644 index 0000000000..cd67f1ebea --- /dev/null +++ b/internal/reminder/messages/messages.go @@ -0,0 +1,67 @@ +// Copyright 2024 Stacklok, Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package messages contains the messages used by the reminder service +package messages + +import ( + "encoding/json" + "fmt" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/go-playground/validator/v10" + "github.com/google/uuid" +) + +// RepoReminderEvent is an event that is published by the reminder service to trigger repo reconciliation +type RepoReminderEvent struct { + // Project is the project that the event is relevant to + Project uuid.UUID `json:"project"` + // RepositoryID is id of the repository to be reconciled + RepositoryID int64 `json:"repository" validate:"gte=0"` + // ProviderID is the provider of the repository + ProviderID uuid.UUID `json:"provider"` +} + +// NewRepoReminderMessage creates a new repo reminder message +func NewRepoReminderMessage(providerId uuid.UUID, repoID int64, projectID uuid.UUID) (*message.Message, error) { + evt := &RepoReminderEvent{ + RepositoryID: repoID, + Project: projectID, + ProviderID: providerId, + } + + evtStr, err := json.Marshal(evt) + if err != nil { + return nil, fmt.Errorf("error marshalling repo reminder event: %w", err) + } + + msg := message.NewMessage(uuid.New().String(), evtStr) + return msg, nil +} + +// RepoReminderEventFromMessage creates a new repo reminder event from a message +func RepoReminderEventFromMessage(msg *message.Message) (*RepoReminderEvent, error) { + var evt RepoReminderEvent + if err := json.Unmarshal(msg.Payload, &evt); err != nil { + return nil, fmt.Errorf("error unmarshalling payload: %w", err) + } + + validate := validator.New() + if err := validate.Struct(&evt); err != nil { + return nil, err + } + + return &evt, nil +} diff --git a/internal/reminder/reminder.go b/internal/reminder/reminder.go index 8b8bf76929..f6f8048a56 100644 --- a/internal/reminder/reminder.go +++ b/internal/reminder/reminder.go @@ -28,6 +28,9 @@ import ( reminderconfig "github.com/stacklok/minder/internal/config/reminder" "github.com/stacklok/minder/internal/db" + "github.com/stacklok/minder/internal/events" + "github.com/stacklok/minder/internal/events/common" + remindermessages "github.com/stacklok/minder/internal/reminder/messages" ) // Interface is an interface over the reminder service @@ -51,7 +54,7 @@ type reminder struct { ticker *time.Ticker eventPublisher message.Publisher - eventDBCloser driverCloser + eventDBCloser common.DriverCloser } // NewReminder creates a new reminder instance @@ -144,19 +147,64 @@ func (r *reminder) sendReminders(ctx context.Context) []error { logger.Info().Msgf("created repository batch of size: %d", len(repos)) - // Update the reminder_last_sent for each repository to export as metrics + messages := make([]*message.Message, 0, len(repos)) + errorSlice := make([]error, 0) + + tx, err := r.store.BeginTransaction() + if err != nil { + return []error{err} + } + + defer tx.Rollback() + + qtx := r.store.GetQuerierWithTransaction(tx) + + // TODO: Collect Metrics + // Potential metrics: + // - Gauge: Number of reminders in the current batch + // - UpDownCounter: Average reminders sent per batch + // - Histogram: reminder_last_sent time distribution for _, repo := range repos { - logger.Debug().Str("repo", repo.ID.String()). - Time("previously", repo.ReminderLastSent.Time).Msg("updating reminder_last_sent") - err := r.store.UpdateReminderLastSentById(ctx, repo.ID) + repoReconcilerMessage, err := remindermessages.NewRepoReminderMessage( + repo.ProviderID, repo.RepoID, repo.ProjectID, + ) + if err != nil { + errorSlice = append(errorSlice, err) + continue + } + + logger.Debug(). + Str("repo", repo.ID.String()). + Time("previously", repo.ReminderLastSent.Time). + Msg("updating reminder_last_sent") + + err = qtx.UpdateReminderLastSentById(ctx, repo.ID) if err != nil { logger.Error().Err(err).Str("repo", repo.ID.String()).Msg("unable to update reminder_last_sent") - return []error{err} + errorSlice = append(errorSlice, err) + continue + } + + messages = append(messages, repoReconcilerMessage) + } + + if len(messages) != 0 { + err = r.eventPublisher.Publish(events.TopicQueueRepoReminder, messages...) + if err != nil { + errorSlice = append(errorSlice, fmt.Errorf("error publishing messages: %w", err)) + } else { + logger.Info().Msgf("sent %d reminders", len(messages)) + + // Commit the transaction i.e update reminder_last_sent + // only if the messages were sent successfully + if err = tx.Commit(); err != nil { + logger.Error().Err(err).Msg("unable to commit transaction") + errorSlice = append(errorSlice, err) + } } - // TODO: Send the actual reminders } - return nil + return errorSlice } func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, error) { diff --git a/internal/reminder/sql_publisher.go b/internal/reminder/sql_publisher.go index 520ebc5544..f2f952166a 100644 --- a/internal/reminder/sql_publisher.go +++ b/internal/reminder/sql_publisher.go @@ -23,11 +23,11 @@ import ( watermillsql "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" "github.com/ThreeDotsLabs/watermill/message" "github.com/rs/zerolog" -) -type driverCloser func() + "github.com/stacklok/minder/internal/events/common" +) -func (r *reminder) setupSQLPublisher(ctx context.Context) (message.Publisher, driverCloser, error) { +func (r *reminder) setupSQLPublisher(ctx context.Context) (message.Publisher, common.DriverCloser, error) { logger := zerolog.Ctx(ctx) db, _, err := r.cfg.EventConfig.Connection.GetDBConnection(ctx) diff --git a/internal/reminderprocessor/reminder_processor.go b/internal/reminderprocessor/reminder_processor.go new file mode 100644 index 0000000000..ff814231b2 --- /dev/null +++ b/internal/reminderprocessor/reminder_processor.go @@ -0,0 +1,63 @@ +// +// Copyright 2024 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package reminderprocessor processes the incoming reminders +package reminderprocessor + +import ( + "fmt" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/rs/zerolog/log" + + "github.com/stacklok/minder/internal/events" + reconcilermessages "github.com/stacklok/minder/internal/reconcilers/messages" + remindermessages "github.com/stacklok/minder/internal/reminder/messages" +) + +// ReminderProcessor processes the incoming reminders +type ReminderProcessor struct { + evt events.Interface +} + +// NewReminderProcessor creates a new ReminderProcessor +func NewReminderProcessor(evt events.Interface) *ReminderProcessor { + return &ReminderProcessor{evt: evt} +} + +// Register implements the Consumer interface. +func (rp *ReminderProcessor) Register(r events.Registrar) { + r.Register(events.TopicQueueRepoReminder, rp.reminderMessageHandler) +} + +func (rp *ReminderProcessor) reminderMessageHandler(msg *message.Message) error { + evt, err := remindermessages.RepoReminderEventFromMessage(msg) + if err != nil { + return fmt.Errorf("error unmarshalling reminder event: %w", err) + } + + log.Info().Msgf("Received reminder event: %v", evt) + + repoReconcileMsg, err := reconcilermessages.NewRepoReconcilerMessage(evt.ProviderID, evt.RepositoryID, evt.Project) + if err != nil { + return fmt.Errorf("error creating repo reconcile event: %w", err) + } + + // This is a non-fatal error, so we'll just log it and continue with the next ones + if err := rp.evt.Publish(events.TopicQueueReconcileRepoInit, repoReconcileMsg); err != nil { + log.Printf("error publishing reconciler event: %v", err) + } + return nil +} diff --git a/internal/service/service.go b/internal/service/service.go index 44312c0e2f..9d55fd69a9 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -49,6 +49,7 @@ import ( "github.com/stacklok/minder/internal/providers/session" provtelemetry "github.com/stacklok/minder/internal/providers/telemetry" "github.com/stacklok/minder/internal/reconcilers" + "github.com/stacklok/minder/internal/reminderprocessor" "github.com/stacklok/minder/internal/repositories/github" "github.com/stacklok/minder/internal/repositories/github/webhooks" "github.com/stacklok/minder/internal/ruletypes" @@ -188,6 +189,10 @@ func AllInOneServerService( im := installations.NewInstallationManager(ghProviders) evt.ConsumeEvents(im) + // Processor would only work for sql driver as reminder publisher is sql based + reminderProcessor := reminderprocessor.NewReminderProcessor(evt) + evt.ConsumeEvents(reminderProcessor) + // Start the gRPC and HTTP server in separate goroutines errg.Go(func() error { return s.StartGRPCServer(ctx)