Connect reminder service to minder server to dispatch reminders #3630

Status: Open (wants to merge 1 commit into base: main; showing changes from all commits)
2 changes: 2 additions & 0 deletions internal/events/constants.go
@@ -47,4 +47,6 @@ const (
TopicQueueReconcileEntityDelete = "internal.entity.delete.event"
// TopicQueueReconcileEntityAdd is the topic for reconciling when an entity is added
TopicQueueReconcileEntityAdd = "internal.entity.add.event"
// TopicQueueRepoReminder is the topic for repo reminder events
TopicQueueRepoReminder = "repo.reminder.event"
)
67 changes: 67 additions & 0 deletions internal/reminder/messages/messages.go
@@ -0,0 +1,67 @@
// Copyright 2024 Stacklok, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package messages contains the messages used by the reminder service
package messages

import (
"encoding/json"
"fmt"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/go-playground/validator/v10"
"github.com/google/uuid"
)

// RepoReminderEvent is an event that is published by the reminder service to trigger repo reconciliation
type RepoReminderEvent struct {
// Project is the project that the event is relevant to
Project uuid.UUID `json:"project"`
// RepositoryID is id of the repository to be reconciled
RepositoryID int64 `json:"repository" validate:"gte=0"`
Member:

IMO, this should use the id field from the repositories table, not the repo_id from GitHub. That will help insulate this from GitHub-specific fields that are different in (for example) GitLab or BitBucket in the future.

Member:

Additionally, id is the primary key. It looks like our current indexes are set up off repo_id; sorry for not noticing this previously...

Member:

Looking further, it looks like we use the GitHub-matching repo_id in NewRepoReconcilerMessage already, so that argues towards using it here as well.

// ProviderID is the provider of the repository
ProviderID uuid.UUID `json:"provider"`
Comment on lines +29 to +34
Member:

My preference would be to order these Project > Provider > Repo, which is the ownership order.

}

// NewRepoReminderMessage creates a new repo reminder message
func NewRepoReminderMessage(providerId uuid.UUID, repoID int64, projectID uuid.UUID) (*message.Message, error) {
evt := &RepoReminderEvent{
RepositoryID: repoID,
Project: projectID,
ProviderID: providerId,
}

evtStr, err := json.Marshal(evt)
if err != nil {
return nil, fmt.Errorf("error marshalling repo reminder event: %w", err)
}

msg := message.NewMessage(uuid.New().String(), evtStr)
return msg, nil
}

// RepoReminderEventFromMessage creates a new repo reminder event from a message
func RepoReminderEventFromMessage(msg *message.Message) (*RepoReminderEvent, error) {
var evt RepoReminderEvent
if err := json.Unmarshal(msg.Payload, &evt); err != nil {
return nil, fmt.Errorf("error unmarshalling payload: %w", err)
}

validate := validator.New()
if err := validate.Struct(&evt); err != nil {
return nil, err
}
Comment on lines +61 to +64
Member:

What's the benefit of calling validate and checking that the numeric repo_id is non-negative? It seems like the caller of this method will still need to check whether or not repo_id actually exists in the database, so this seems like it adds some extra code without really accomplishing the necessary validation; the necessary validation requires a database connection, and would presumably return the actual row if needed.


return &evt, nil
}
64 changes: 56 additions & 8 deletions internal/reminder/reminder.go
@@ -28,6 +28,9 @@ import (

reminderconfig "github.com/stacklok/minder/internal/config/reminder"
"github.com/stacklok/minder/internal/db"
"github.com/stacklok/minder/internal/events"
"github.com/stacklok/minder/internal/events/common"
remindermessages "github.com/stacklok/minder/internal/reminder/messages"
)

// Interface is an interface over the reminder service
@@ -51,7 +54,7 @@ type reminder struct {
ticker *time.Ticker

eventPublisher message.Publisher
eventDBCloser driverCloser
eventDBCloser common.DriverCloser
}

// NewReminder creates a new reminder instance
@@ -144,19 +147,64 @@ func (r *reminder) sendReminders(ctx context.Context) []error {

logger.Info().Msgf("created repository batch of size: %d", len(repos))

// Update the reminder_last_sent for each repository to export as metrics
messages := make([]*message.Message, 0, len(repos))
errorSlice := make([]error, 0)

tx, err := r.store.BeginTransaction()
if err != nil {
return []error{err}
}

defer tx.Rollback()
Member:

Do we need all of the following database reads and writes to go into a single transaction (but not the getRepositoryBatch call above in the transaction, or the r.updateRepositoryCursor in the outer getRepositoryBatch call)?


qtx := r.store.GetQuerierWithTransaction(tx)

// TODO: Collect Metrics
// Potential metrics:
// - Gauge: Number of reminders in the current batch
// - UpDownCounter: Average reminders sent per batch
// - Histogram: reminder_last_sent time distribution
Comment on lines +162 to +166
Contributor Author:

How do we want to export the metrics? And some thoughts on adding these metrics?

Contributor:

Hmm, I wonder what metrics would be useful as a minder operator.

About the histogram, I think a histogram that shows me how out of date (how much before a cutoff) a repo was when it was selected would be useful.

I think the metrics would be interesting to fine-tune the algorithm, did you see some other value in the metrics?

Since reminder is a separate service, then I guess implementation-wise it should just have its own metrics server that could be scraped separately.

Member:

My $0.02 would be:

  1. Create a MeterProvider ala https://github.com/stacklok/minder/blob/main/internal/controlplane/server.go#L188.
  2. Wire the metrics into prometheus. Since this has no external interface, you can simply set up an internal http.Server to serve the metrics -- we could put other debug information there later.
  3. If you call otel.SetMeterProvider in the server setup, you don't need to worry about plumbing directly from the code recording the metric to the server; OTel will handle this in the backend (with some globals, but that's fine in this case). That means that the metrics code can do the simple thing from the OTel docs:
    var timeShift metric.Float64Histogram
    func init() {
      meter := otel.GetMeterProvider().Meter("reminder")  // Or could be ""
      var err error
      timeShift, err = meter.Float64Histogram("send_delay", metric.WithDescription("Delay in revisit after eligible"), metric.WithUnit("s"))
      if err != nil {
        panic(fmt.Sprintf("Couldn't register meter: %v", err))
      }
    }

    // At call site:
    timeShift.Record(ctx, sendDelay)  // No need for attributes

for _, repo := range repos {
logger.Debug().Str("repo", repo.ID.String()).
Time("previously", repo.ReminderLastSent.Time).Msg("updating reminder_last_sent")
err := r.store.UpdateReminderLastSentById(ctx, repo.ID)
repoReconcilerMessage, err := remindermessages.NewRepoReminderMessage(
repo.ProviderID, repo.RepoID, repo.ProjectID,
)
if err != nil {
errorSlice = append(errorSlice, err)
continue
}

logger.Debug().
Str("repo", repo.ID.String()).
Time("previously", repo.ReminderLastSent.Time).
Msg("updating reminder_last_sent")

err = qtx.UpdateReminderLastSentById(ctx, repo.ID)
Member:

Intentional that you update when the reminder is sent before you send the message?

... except that with transactions, this actually sort-of happens after the message was sent, too -- you may be taking out a lock on the repo row for the duration of the iteration. Transactions can make before/after behavior hard to reason about in some cases, so I'd be cautious about introducing them when they aren't needed / aren't producing specific guarantees.

if err != nil {
logger.Error().Err(err).Str("repo", repo.ID.String()).Msg("unable to update reminder_last_sent")
return []error{err}
errorSlice = append(errorSlice, err)
Comment on lines 183 to +184
Member:

In general:

Either handle an error (by logging, recovering, etc) OR propagate an error, but don't do both.

Logging the error and also propagating it often means that you'll end up with errors being double- or even triple-logged, as different parts of the stack log the error but then pass it up to be logged in the next part of the stack.

continue
}

messages = append(messages, repoReconcilerMessage)
}

if len(messages) != 0 {
err = r.eventPublisher.Publish(events.TopicQueueRepoReminder, messages...)
if err != nil {
errorSlice = append(errorSlice, fmt.Errorf("error publishing messages: %w", err))
} else {
Comment on lines +193 to +195
Member:

Rather than an if/else here, generally prefer to use early-return, particularly since this seems like a terminal case.

More generally, I'm not convinced that there's a large benefit to batching the Publish messages, rather than sending each in the previous part of the loop.

logger.Info().Msgf("sent %d reminders", len(messages))

// Commit the transaction i.e update reminder_last_sent
// only if the messages were sent successfully
if err = tx.Commit(); err != nil {
logger.Error().Err(err).Msg("unable to commit transaction")
errorSlice = append(errorSlice, err)
}
Comment on lines +198 to +203
Contributor Author:

The UpdateReminderLastSentById queries are part of a transaction that is committed only if the messages are published successfully. The commit could still fail, which would result in reminders being sent without reminder_last_sent being updated (better than the false positive, i.e. reminder_last_sent updated but reminders not sent).

Contributor:

I think this is fine. Did you consider to split sending the messages into a separate function or return earlier if len(messages) == 0? This might be a personal preference but I find it easier to read code with fewer indentation levels.

Something like:

if len(messages) == 0 {
   return errorSlice
}

// the rest of the code

Member:

It seems like the reminder_last_sent field is mostly diagnostic, because we're actually ticking forward on the (in-memory) repository cursor, rather than using reminder_last_sent to actually dictate when repos are reconsidered. I think this is correct, but that suggests to me that strict update correctness is less necessary.

(I'm not arguing against keeping this data, but it seems like the actual mechanism that prevents re-evaluation is when the reminder is received and a rule evaluation is complete, which could be minutes after the reminder is sent on a particularly bad day.)

}
// TODO: Send the actual reminders
}

return nil
return errorSlice
}

func (r *reminder) getRepositoryBatch(ctx context.Context) ([]db.Repository, error) {
6 changes: 3 additions & 3 deletions internal/reminder/sql_publisher.go
@@ -23,11 +23,11 @@ import (
watermillsql "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/rs/zerolog"
)

type driverCloser func()
"github.com/stacklok/minder/internal/events/common"
)

func (r *reminder) setupSQLPublisher(ctx context.Context) (message.Publisher, driverCloser, error) {
func (r *reminder) setupSQLPublisher(ctx context.Context) (message.Publisher, common.DriverCloser, error) {
logger := zerolog.Ctx(ctx)

db, _, err := r.cfg.EventConfig.Connection.GetDBConnection(ctx)
63 changes: 63 additions & 0 deletions internal/reminderprocessor/reminder_processor.go
@@ -0,0 +1,63 @@
//
// Copyright 2024 Stacklok, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package reminderprocessor processes the incoming reminders
package reminderprocessor

import (
"fmt"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/rs/zerolog/log"

"github.com/stacklok/minder/internal/events"
reconcilermessages "github.com/stacklok/minder/internal/reconcilers/messages"
remindermessages "github.com/stacklok/minder/internal/reminder/messages"
)

// ReminderProcessor processes the incoming reminders
type ReminderProcessor struct {
evt events.Interface
}

// NewReminderProcessor creates a new ReminderProcessor
func NewReminderProcessor(evt events.Interface) *ReminderProcessor {
return &ReminderProcessor{evt: evt}
}

// Register implements the Consumer interface.
func (rp *ReminderProcessor) Register(r events.Registrar) {
r.Register(events.TopicQueueRepoReminder, rp.reminderMessageHandler)
}

func (rp *ReminderProcessor) reminderMessageHandler(msg *message.Message) error {
evt, err := remindermessages.RepoReminderEventFromMessage(msg)
if err != nil {
return fmt.Errorf("error unmarshalling reminder event: %w", err)
}

log.Info().Msgf("Received reminder event: %v", evt)

repoReconcileMsg, err := reconcilermessages.NewRepoReconcilerMessage(evt.ProviderID, evt.RepositoryID, evt.Project)
if err != nil {
return fmt.Errorf("error creating repo reconcile event: %w", err)
}

// This is a non-fatal error, so we'll just log it and continue with the next ones
if err := rp.evt.Publish(events.TopicQueueReconcileRepoInit, repoReconcileMsg); err != nil {
log.Printf("error publishing reconciler event: %v", err)
}
return nil
}
Comment on lines +45 to +63
Member:

It feels like this is simply forwarding messages from one queue to another; why not simply sending the TopicQueueReconcileRepoInit message directly from reminder.go and skip this extra intermediary? (There might be a good reason, but we should encapsulate that with a comment if there is such a reason.)
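The suggested shape can be sketched as a self-contained toy: reminder builds the reconcile payload itself and publishes straight to the reconciler topic, with no intermediate reminder topic or forwarding consumer. The publisher interface, topic string, and event fields below are simplified stand-ins for the watermill publisher and reconciler message in this PR, not the real API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// publisher is a stand-in for the watermill message publisher.
type publisher interface {
	Publish(topic string, payload []byte) error
}

// logPublisher records which topics were published to.
type logPublisher struct{ published []string }

func (p *logPublisher) Publish(topic string, payload []byte) error {
	p.published = append(p.published, topic)
	return nil
}

// Placeholder value; the real TopicQueueReconcileRepoInit constant is not shown in this diff.
const topicReconcileRepoInit = "internal.repo.reconciler.event"

type repoReconcileEvent struct {
	Project  string `json:"project"`
	Repo     int64  `json:"repository"`
	Provider string `json:"provider"`
}

// publishReconcileDirect builds the reconcile event and publishes it
// directly, skipping the reminder topic and the forwarding processor.
func publishReconcileDirect(p publisher, project, provider string, repoID int64) error {
	evt := repoReconcileEvent{Project: project, Repo: repoID, Provider: provider}
	payload, err := json.Marshal(evt)
	if err != nil {
		return fmt.Errorf("error marshalling reconcile event: %w", err)
	}
	return p.Publish(topicReconcileRepoInit, payload)
}

func main() {
	p := &logPublisher{}
	if err := publishReconcileDirect(p, "proj-1", "prov-1", 42); err != nil {
		panic(err)
	}
	fmt.Println(p.published) // [internal.repo.reconciler.event]
}
```

The trade-off the comment hints at: the extra hop decouples the reminder service from reconciler message formats, but if no such reason exists, the direct publish removes a queue round-trip and one consumer.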

5 changes: 5 additions & 0 deletions internal/service/service.go
@@ -49,6 +49,7 @@ import (
"github.com/stacklok/minder/internal/providers/session"
provtelemetry "github.com/stacklok/minder/internal/providers/telemetry"
"github.com/stacklok/minder/internal/reconcilers"
"github.com/stacklok/minder/internal/reminderprocessor"
"github.com/stacklok/minder/internal/repositories/github"
"github.com/stacklok/minder/internal/repositories/github/webhooks"
"github.com/stacklok/minder/internal/ruletypes"
@@ -188,6 +189,10 @@ func AllInOneServerService(
im := installations.NewInstallationManager(ghProviders)
evt.ConsumeEvents(im)

// Processor would only work for sql driver as reminder publisher is sql based
reminderProcessor := reminderprocessor.NewReminderProcessor(evt)
evt.ConsumeEvents(reminderProcessor)

// Start the gRPC and HTTP server in separate goroutines
errg.Go(func() error {
return s.StartGRPCServer(ctx)